From ea24656b20268b80662c7f5fbc38360d89f1b814 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C3=ABl=20Lemaire?= Date: Thu, 4 Jul 2013 17:36:41 +0200 Subject: [PATCH] raster_threading: Added ParallelQueue system as helper It will be used to push tessellated quads to render area. --- src/rendering/tools/parallel.c | 195 +++++++++++++++++++++++++++++++++ src/rendering/tools/parallel.h | 79 ++++++++++++- 2 files changed, 271 insertions(+), 3 deletions(-) diff --git a/src/rendering/tools/parallel.c b/src/rendering/tools/parallel.c index 73fddc1..ca87b58 100644 --- a/src/rendering/tools/parallel.c +++ b/src/rendering/tools/parallel.c @@ -140,3 +140,198 @@ int parallelWorkPerform(ParallelWork* work, int workers) work->running = 0; return result; } + + +#define QUEUE_SIZE 200 + +typedef enum +{ + JOB_STATE_FREE, + JOB_STATE_PENDING, + JOB_STATE_PROCESSING, + JOB_STATE_TOCOLLECT +} EnumJobState; + +typedef struct +{ + EnumJobState state; + int id; + FuncParallelJob process; + void* data; +} ParallelJob; + +struct ParallelQueue +{ + int collect; + volatile int stopping; + Mutex* lock; + + int workers_count; + Thread** workers; + + ParallelJob* jobs; + int jobs_count; /** Number of jobs in queue (all status except JOB_STATE_FREE) */ + int jobs_index_free; /** Index of next free position */ + int jobs_index_collect; /** Index of first job to collect */ + int jobs_index_pending; /** Index of first pending job to process */ + int jobs_next_id; +}; + +static void* _queueThreadCallback(ParallelQueue* queue) +{ + ParallelJob* job; + + while (!queue->stopping) + { + /* Try to take a job */ + mutexAcquire(queue->lock); + job = queue->jobs + queue->jobs_index_pending; + if (job->state == JOB_STATE_PENDING) + { + if (queue->jobs_index_pending == QUEUE_SIZE - 1) + { + queue->jobs_index_pending = 0; + } + else + { + queue->jobs_index_pending++; + } + job->state = JOB_STATE_PROCESSING; + } + else + { + job = NULL; + } + mutexRelease(queue->lock); + + if (job) + { + /* Process the job */ + job->process(queue, job->id, job->data, 0); + + mutexAcquire(queue->lock); + if (queue->collect) + { + job->state = JOB_STATE_TOCOLLECT; + /* TODO jobs_index_collect ? */ + } + else + { + job->state = JOB_STATE_FREE; + } + mutexRelease(queue->lock); + } + + timeSleepMs(50); + } + return NULL; +} + +ParallelQueue* parallelQueueCreate(int collect) +{ + int i; + + assert(!collect); /* Not fully implemented yet ! */ + + ParallelQueue* queue = malloc(sizeof(ParallelQueue)); + + queue->collect = collect; + queue->stopping = 0; + queue->lock = mutexCreate(); + + queue->workers_count = systemGetCoreCount(); + queue->workers = malloc(sizeof(Thread*) * queue->workers_count); + for (i = 0; i < queue->workers_count; i++) + { + queue->workers[i] = threadCreate((ThreadFunction)_queueThreadCallback, queue); + } + + queue->jobs = malloc(sizeof(ParallelJob) * QUEUE_SIZE); + queue->jobs_count = 0; + queue->jobs_index_free = 0; + queue->jobs_index_collect = 0; + queue->jobs_index_pending = 0; + queue->jobs_next_id = 1; + + return queue; +} + +void parallelQueueDelete(ParallelQueue* queue) +{ + parallelQueueInterrupt(queue); + + assert(!queue->collect || queue->jobs[queue->jobs_index_collect].state != JOB_STATE_TOCOLLECT); + + mutexDestroy(queue->lock); + free(queue->jobs); + free(queue->workers); + free(queue); +} + +void parallelQueueInterrupt(ParallelQueue* queue) +{ + int i; + + if (!queue->stopping) + { + queue->stopping = 1; + + for (i = 0; i < queue->workers_count; i++) + { + threadJoin(queue->workers[i]); + } + } +} + +int parallelQueueAddJob(ParallelQueue* queue, FuncParallelJob func_process, void* data) +{ + if (queue->stopping) + { + return 0; + } + + /* Wait for a free slot */ + while (queue->jobs[queue->jobs_index_free].state != JOB_STATE_FREE) + { + timeSleepMs(50); + if (queue->stopping) + { + return 0; + } + } + + /* Prepare the job */ + ParallelJob job; + job.state = JOB_STATE_PENDING; + job.id = queue->jobs_next_id++; + job.process = func_process; + job.data = data; + + /* Add the job to the queue */ + mutexAcquire(queue->lock); + if (queue->stopping) + { + mutexRelease(queue->lock); + return 0; + } + queue->jobs[queue->jobs_index_free] = job; + if (queue->jobs_index_free == QUEUE_SIZE - 1) + { + queue->jobs_index_free = 0; + } + else + { + queue->jobs_index_free++; + } + queue->jobs_count++; + assert(queue->jobs_count <= QUEUE_SIZE); + mutexRelease(queue->lock); + + return job.id; +} + +int parallelQueueCollectJobs(FuncParallelJob func_collect) +{ + /* TODO */ + return 0; +} diff --git a/src/rendering/tools/parallel.h b/src/rendering/tools/parallel.h index 6c5fca1..98613e7 100644 --- a/src/rendering/tools/parallel.h +++ b/src/rendering/tools/parallel.h @@ -16,13 +16,86 @@ extern "C" { typedef struct ParallelWork ParallelWork; typedef int (*ParallelUnitFunction)(ParallelWork* work, int unit, void* data); -/*void parallelInit(); -void parallelQuit();*/ - +/** + * Create a parallel work handler. + * + * This will spawn an optimal number of threads to process a given number of work units. + * @param func The callback that will be called from threads to process one unit. + * @param units Number of units to handle. + * @param data Custom data that will be passed to the callback. + * @return The newly allocated handler. + */ ParallelWork* parallelWorkCreate(ParallelUnitFunction func, int units, void* data); + +/** + * Delete a parallel work handler. + * + * The work must be terminated or fully interrupted before calling this. + * @param work The handler to free. + */ void parallelWorkDelete(ParallelWork* work); + +/** + * Start working on the units. + * + * @param work The handler. + * @param workers Number of threads to spaws, -1 for an optimal number. + */ int parallelWorkPerform(ParallelWork* work, int workers); + + +typedef struct ParallelQueue ParallelQueue; +typedef int (*FuncParallelJob)(ParallelQueue* queue, int job_id, void* data, int stopping); + +/** + * Create a parallel processing queue. + * + * This queue will use parallel workers to process jobs added to it. + * @param collect True to collect finished jobs and wait for a call to parallelQueueCollectJobs, False to discard finished jobs. + * @return The newly allocated queue. + */ +ParallelQueue* parallelQueueCreate(int collect); + +/** + * Delete a parallel queue. + * + * This will interrupt the queue. + * If the queue is in collect mode, you should call parallelQueueInterrupt, then parallelQueueCollectJobs, before calling this. + * @param queue The queue to free. + */ +void parallelQueueDelete(ParallelQueue* queue); + +/** + * Interrupt the queue processing. + * + * This will wait for running jobs to end, cancel pending jobs (still calling their callbacks with stopping=1) and + * refuse future jobs. + * @param queue The queue to interrupt. + */ +void parallelQueueInterrupt(ParallelQueue* queue); + +/** + * Add a job to the queue. + * + * Don't call this method concurrently from several threads. + * @param queue The queue. + * @param func_process The function that will be called for the job processing. + * @param data The data that will be passed to the callback. + * @return The job ID, 0 if the queue doesn't accept jobs. + */ +int parallelQueueAddJob(ParallelQueue* queue, FuncParallelJob func_process, void* data); + +/** + * Collect finished jobs. + * + * The callback func_collect will be called sequentially for each finished job, from the caller thread (not parallel threads). + * Don't call this method concurrently from several threads. + * @param func_collect The callback for collect. + * @return The number of collected jobs. + */ +int parallelQueueCollectJobs(FuncParallelJob func_collect); + #ifdef __cplusplus } #endif