Fixed parallel queue issues

This commit is contained in:
Michaël Lemaire 2013-07-07 00:30:14 +02:00
parent ea24656b20
commit 8c8fb185cd

View file

@ -142,7 +142,7 @@ int parallelWorkPerform(ParallelWork* work, int workers)
} }
#define QUEUE_SIZE 200 #define QUEUE_SIZE 1000
typedef enum typedef enum
{ {
@ -188,7 +188,7 @@ static void* _queueThreadCallback(ParallelQueue* queue)
job = queue->jobs + queue->jobs_index_pending; job = queue->jobs + queue->jobs_index_pending;
if (job->state == JOB_STATE_PENDING) if (job->state == JOB_STATE_PENDING)
{ {
if (queue->jobs_index_pending == QUEUE_SIZE - 1) if (queue->jobs_index_pending >= QUEUE_SIZE - 1)
{ {
queue->jobs_index_pending = 0; queue->jobs_index_pending = 0;
} }
@ -218,11 +218,14 @@ static void* _queueThreadCallback(ParallelQueue* queue)
else else
{ {
job->state = JOB_STATE_FREE; job->state = JOB_STATE_FREE;
queue->jobs_count--;
} }
mutexRelease(queue->lock); mutexRelease(queue->lock);
} }
else
timeSleepMs(50); {
timeSleepMs(50);
}
} }
return NULL; return NULL;
} }
@ -239,6 +242,18 @@ ParallelQueue* parallelQueueCreate(int collect)
queue->stopping = 0; queue->stopping = 0;
queue->lock = mutexCreate(); queue->lock = mutexCreate();
queue->jobs = malloc(sizeof(ParallelJob) * QUEUE_SIZE);
for (i = 0; i < QUEUE_SIZE; i++)
{
queue->jobs[i].state = JOB_STATE_FREE;
}
queue->jobs_count = 0;
queue->jobs_index_free = 0;
queue->jobs_index_collect = 0;
queue->jobs_index_pending = 0;
queue->jobs_next_id = 1;
/* Start workers */
queue->workers_count = systemGetCoreCount(); queue->workers_count = systemGetCoreCount();
queue->workers = malloc(sizeof(Thread*) * queue->workers_count); queue->workers = malloc(sizeof(Thread*) * queue->workers_count);
for (i = 0; i < queue->workers_count; i++) for (i = 0; i < queue->workers_count; i++)
@ -246,13 +261,6 @@ ParallelQueue* parallelQueueCreate(int collect)
queue->workers[i] = threadCreate((ThreadFunction)_queueThreadCallback, queue); queue->workers[i] = threadCreate((ThreadFunction)_queueThreadCallback, queue);
} }
queue->jobs = malloc(sizeof(ParallelJob) * QUEUE_SIZE);
queue->jobs_count = 0;
queue->jobs_index_free = 0;
queue->jobs_index_collect = 0;
queue->jobs_index_pending = 0;
queue->jobs_next_id = 1;
return queue; return queue;
} }
@ -261,6 +269,7 @@ void parallelQueueDelete(ParallelQueue* queue)
parallelQueueInterrupt(queue); parallelQueueInterrupt(queue);
assert(!queue->collect || queue->jobs[queue->jobs_index_collect].state != JOB_STATE_TOCOLLECT); assert(!queue->collect || queue->jobs[queue->jobs_index_collect].state != JOB_STATE_TOCOLLECT);
assert(queue->jobs_count == 0);
mutexDestroy(queue->lock); mutexDestroy(queue->lock);
free(queue->jobs); free(queue->jobs);
@ -315,7 +324,7 @@ int parallelQueueAddJob(ParallelQueue* queue, FuncParallelJob func_process, void
return 0; return 0;
} }
queue->jobs[queue->jobs_index_free] = job; queue->jobs[queue->jobs_index_free] = job;
if (queue->jobs_index_free == QUEUE_SIZE - 1) if (queue->jobs_index_free >= QUEUE_SIZE - 1)
{ {
queue->jobs_index_free = 0; queue->jobs_index_free = 0;
} }