From 8c8fb185cdeceb50617f970a4119ac44cb7858ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C3=ABl=20Lemaire?= Date: Sun, 7 Jul 2013 00:30:14 +0200 Subject: [PATCH] Fixed parallel queue issues --- src/rendering/tools/parallel.c | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/rendering/tools/parallel.c b/src/rendering/tools/parallel.c index ca87b58..a06296a 100644 --- a/src/rendering/tools/parallel.c +++ b/src/rendering/tools/parallel.c @@ -142,7 +142,7 @@ int parallelWorkPerform(ParallelWork* work, int workers) } -#define QUEUE_SIZE 200 +#define QUEUE_SIZE 1000 typedef enum { @@ -188,7 +188,7 @@ static void* _queueThreadCallback(ParallelQueue* queue) job = queue->jobs + queue->jobs_index_pending; if (job->state == JOB_STATE_PENDING) { - if (queue->jobs_index_pending == QUEUE_SIZE - 1) + if (queue->jobs_index_pending >= QUEUE_SIZE - 1) { queue->jobs_index_pending = 0; } @@ -218,11 +218,14 @@ static void* _queueThreadCallback(ParallelQueue* queue) else { job->state = JOB_STATE_FREE; + queue->jobs_count--; } mutexRelease(queue->lock); } - - timeSleepMs(50); + else + { + timeSleepMs(50); + } } return NULL; } @@ -239,6 +242,18 @@ ParallelQueue* parallelQueueCreate(int collect) queue->stopping = 0; queue->lock = mutexCreate(); + queue->jobs = malloc(sizeof(ParallelJob) * QUEUE_SIZE); + for (i = 0; i < QUEUE_SIZE; i++) + { + queue->jobs[i].state = JOB_STATE_FREE; + } + queue->jobs_count = 0; + queue->jobs_index_free = 0; + queue->jobs_index_collect = 0; + queue->jobs_index_pending = 0; + queue->jobs_next_id = 1; + + /* Start workers */ queue->workers_count = systemGetCoreCount(); queue->workers = malloc(sizeof(Thread*) * queue->workers_count); for (i = 0; i < queue->workers_count; i++) @@ -246,13 +261,6 @@ ParallelQueue* parallelQueueCreate(int collect) queue->workers[i] = threadCreate((ThreadFunction)_queueThreadCallback, queue); } - queue->jobs = malloc(sizeof(ParallelJob) * QUEUE_SIZE); - queue->jobs_count = 0; - queue->jobs_index_free = 0; - queue->jobs_index_collect = 0; - queue->jobs_index_pending = 0; - queue->jobs_next_id = 1; - return queue; } @@ -261,6 +269,7 @@ void parallelQueueDelete(ParallelQueue* queue) parallelQueueInterrupt(queue); assert(!queue->collect || queue->jobs[queue->jobs_index_collect].state != JOB_STATE_TOCOLLECT); + assert(queue->jobs_count == 0); mutexDestroy(queue->lock); free(queue->jobs); @@ -315,7 +324,7 @@ int parallelQueueAddJob(ParallelQueue* queue, FuncParallelJob func_process, void return 0; } queue->jobs[queue->jobs_index_free] = job; - if (queue->jobs_index_free == QUEUE_SIZE - 1) + if (queue->jobs_index_free >= QUEUE_SIZE - 1) { queue->jobs_index_free = 0; }