raster_threading: Added ParallelQueue system as helper

It will be used to push tessellated quads to render area.
This commit is contained in:
Michaël Lemaire 2013-07-04 17:36:41 +02:00
parent e90c8be5a4
commit ea24656b20
2 changed files with 271 additions and 3 deletions

View file

@ -140,3 +140,198 @@ int parallelWorkPerform(ParallelWork* work, int workers)
work->running = 0; work->running = 0;
return result; return result;
} }
#define QUEUE_SIZE 200
typedef enum
{
JOB_STATE_FREE,
JOB_STATE_PENDING,
JOB_STATE_PROCESSING,
JOB_STATE_TOCOLLECT
} EnumJobState;
typedef struct
{
EnumJobState state;
int id;
FuncParallelJob process;
void* data;
} ParallelJob;
struct ParallelQueue
{
int collect;
volatile int stopping;
Mutex* lock;
int workers_count;
Thread** workers;
ParallelJob* jobs;
int jobs_count; /** Number of jobs in queue (all status except JOB_STATE_FREE) */
int jobs_index_free; /** Index of next free position */
int jobs_index_collect; /** Index of first job to collect */
int jobs_index_pending; /** Index of first pending job to process */
int jobs_next_id;
};
static void* _queueThreadCallback(ParallelQueue* queue)
{
ParallelJob* job;
while (!queue->stopping)
{
/* Try to take a job */
mutexAcquire(queue->lock);
job = queue->jobs + queue->jobs_index_pending;
if (job->state == JOB_STATE_PENDING)
{
if (queue->jobs_index_pending == QUEUE_SIZE - 1)
{
queue->jobs_index_pending = 0;
}
else
{
queue->jobs_index_pending++;
}
job->state = JOB_STATE_PROCESSING;
}
else
{
job = NULL;
}
mutexRelease(queue->lock);
if (job)
{
/* Process the job */
job->process(queue, job->id, job->data, 0);
mutexAcquire(queue->lock);
if (queue->collect)
{
job->state = JOB_STATE_TOCOLLECT;
/* TODO jobs_index_collect ? */
}
else
{
job->state = JOB_STATE_FREE;
}
mutexRelease(queue->lock);
}
timeSleepMs(50);
}
return NULL;
}
ParallelQueue* parallelQueueCreate(int collect)
{
int i;
assert(!collect); /* Not fully implemented yet ! */
ParallelQueue* queue = malloc(sizeof(ParallelQueue));
queue->collect = collect;
queue->stopping = 0;
queue->lock = mutexCreate();
queue->workers_count = systemGetCoreCount();
queue->workers = malloc(sizeof(Thread*) * queue->workers_count);
for (i = 0; i < queue->workers_count; i++)
{
queue->workers[i] = threadCreate((ThreadFunction)_queueThreadCallback, queue);
}
queue->jobs = malloc(sizeof(ParallelJob) * QUEUE_SIZE);
queue->jobs_count = 0;
queue->jobs_index_free = 0;
queue->jobs_index_collect = 0;
queue->jobs_index_pending = 0;
queue->jobs_next_id = 1;
return queue;
}
void parallelQueueDelete(ParallelQueue* queue)
{
parallelQueueInterrupt(queue);
assert(!queue->collect || queue->jobs[queue->jobs_index_collect].state != JOB_STATE_TOCOLLECT);
mutexDestroy(queue->lock);
free(queue->jobs);
free(queue->workers);
free(queue);
}
void parallelQueueInterrupt(ParallelQueue* queue)
{
int i;
if (!queue->stopping)
{
queue->stopping = 1;
for (i = 0; i < queue->workers_count; i++)
{
threadJoin(queue->workers[i]);
}
}
}
int parallelQueueAddJob(ParallelQueue* queue, FuncParallelJob func_process, void* data)
{
if (queue->stopping)
{
return 0;
}
/* Wait for a free slot */
while (queue->jobs[queue->jobs_index_free].state != JOB_STATE_FREE)
{
timeSleepMs(50);
if (queue->stopping)
{
return 0;
}
}
/* Prepare the job */
ParallelJob job;
job.state = JOB_STATE_PENDING;
job.id = queue->jobs_next_id++;
job.process = func_process;
job.data = data;
/* Add the job to the queue */
mutexAcquire(queue->lock);
if (queue->stopping)
{
mutexRelease(queue->lock);
return 0;
}
queue->jobs[queue->jobs_index_free] = job;
if (queue->jobs_index_free == QUEUE_SIZE - 1)
{
queue->jobs_index_free = 0;
}
else
{
queue->jobs_index_free++;
}
queue->jobs_count++;
assert(queue->jobs_count <= QUEUE_SIZE);
mutexRelease(queue->lock);
return job.id;
}
int parallelQueueCollectJobs(FuncParallelJob func_collect)
{
/* TODO */
return 0;
}

View file

@ -16,13 +16,86 @@ extern "C" {
typedef struct ParallelWork ParallelWork; typedef struct ParallelWork ParallelWork;
typedef int (*ParallelUnitFunction)(ParallelWork* work, int unit, void* data); typedef int (*ParallelUnitFunction)(ParallelWork* work, int unit, void* data);
/*void parallelInit(); /**
void parallelQuit();*/ * Create a parallel work handler.
*
* This will spawn an optimal number of threads to process a given number of work units.
* @param func The callback that will be called from threads to process one unit.
* @param units Number of units to handle.
* @param data Custom data that will be passed to the callback.
* @return The newly allocated handler.
*/
ParallelWork* parallelWorkCreate(ParallelUnitFunction func, int units, void* data); ParallelWork* parallelWorkCreate(ParallelUnitFunction func, int units, void* data);
/**
* Delete a parallel work handler.
*
* The work must be terminated or fully interrupted before calling this.
* @param work The handler to free.
*/
void parallelWorkDelete(ParallelWork* work); void parallelWorkDelete(ParallelWork* work);
/**
* Start working on the units.
*
* @param work The handler.
* @param workers Number of threads to spaws, -1 for an optimal number.
*/
int parallelWorkPerform(ParallelWork* work, int workers); int parallelWorkPerform(ParallelWork* work, int workers);
typedef struct ParallelQueue ParallelQueue;
typedef int (*FuncParallelJob)(ParallelQueue* queue, int job_id, void* data, int stopping);
/**
* Create a parallel processing queue.
*
* This queue will use parallel workers to process jobs added to it.
* @param collect True to collect finished jobs and wait for a call to parallelQueueCollectJobs, False to discard finished jobs.
* @return The newly allocated queue.
*/
ParallelQueue* parallelQueueCreate(int collect);
/**
* Delete a parallel queue.
*
* This will interrupt the queue.
* If the queue is in collect mode, you should call parallelQueueInterrupt, then parallelQueueCollectJobs, before calling this.
* @param queue The queue to free.
*/
void parallelQueueDelete(ParallelQueue* queue);
/**
* Interrupt the queue processing.
*
* This will wait for running jobs to end, cancel pending jobs (still calling their callbacks with stopping=1) and
* refuse future jobs.
* @param queue The queue to interrupt.
*/
void parallelQueueInterrupt(ParallelQueue* queue);
/**
* Add a job to the queue.
*
* Don't call this method concurrently from several threads.
* @param queue The queue.
* @param func_process The function that will be called for the job processing.
* @param data The data that will be passed to the callback.
* @return The job ID, 0 if the queue doesn't accept jobs.
*/
int parallelQueueAddJob(ParallelQueue* queue, FuncParallelJob func_process, void* data);
/**
* Collect finished jobs.
*
* The callback func_collect will be called sequentially for each finished job, from the caller thread (not parallel threads).
* Don't call this method concurrently from several threads.
* @param func_collect The callback for collect.
* @return The number of collected jobs.
*/
int parallelQueueCollectJobs(FuncParallelJob func_collect);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif