paysages3d/src/system/ParallelWork.cpp

188 lines
3.9 KiB
C++
Raw Normal View History

#include "ParallelWork.h"
#include "Thread.h"
#include "System.h"
#include "Semaphore.h"
#include "Mutex.h"
2014-08-18 10:17:16 +00:00
#include "ParallelWorker.h"
#include <cassert>
2014-08-18 10:17:16 +00:00
/**
* Compatibility class for code that uses ParallelUnitFunction.
*/
class ParallelWorkerCompat:public ParallelWorker
{
public:
ParallelWorkerCompat(ParallelWork *work, ParallelWork::ParallelUnitFunction func, void* data):
ParallelWorker(), work(work), func(func), data(data)
{
}
virtual int processParallelUnit(int unit)
{
return func(work, unit, data);
}
private:
ParallelWork* work;
ParallelWork::ParallelUnitFunction func;
void* data;
};
/**
* Thread sub-class to perform units of work
*/
class ParallelWork::ParallelThread:public Thread
{
public:
ParallelThread(ParallelWork *work):
Thread(), work(work), sem(0)
{
interrupted = false;
unit = -1;
}
void feedUnit(int unit)
{
this->unit = unit;
sem.release();
}
void interrupt()
{
interrupted = true;
sem.release();
}
protected:
virtual void run() override
{
while (unit >= 0 or not interrupted)
{
// Wait for a unit (or interrupt)
sem.acquire();
// Process the unit
if (unit >= 0)
{
work->worker->processParallelUnit(unit);
unit = -1;
2014-08-18 14:25:44 +00:00
work->returnThread(this);
}
}
}
private:
ParallelWork* work;
Semaphore sem;
int unit;
bool interrupted;
};
2014-08-18 10:17:16 +00:00
ParallelWork::ParallelWork(ParallelWorker *worker, int units)
{
this->units = units;
this->running = 0;
this->worker = worker;
this->worker_compat = false;
}
ParallelWork::ParallelWork(ParallelUnitFunction func, int units, void* data)
{
this->units = units;
this->running = 0;
2014-08-18 10:17:16 +00:00
this->worker = new ParallelWorkerCompat(this, func, data);
this->worker_compat = true;
}
ParallelWork::~ParallelWork()
{
assert(not running);
2014-08-18 10:17:16 +00:00
if (worker_compat)
{
delete worker;
}
}
2014-08-18 10:17:16 +00:00
int ParallelWork::perform(int thread_count)
{
int i, done;
assert(not running);
// Get thread count
2014-08-18 10:17:16 +00:00
if (thread_count <= 0)
{
2014-08-18 10:17:16 +00:00
thread_count = System::getCoreCount();
}
2014-08-18 10:17:16 +00:00
if (thread_count > PARALLEL_MAX_THREADS)
{
2014-08-18 10:17:16 +00:00
thread_count = PARALLEL_MAX_THREADS;
}
this->thread_count = thread_count;
running = 1;
// Init threads
semaphore = new Semaphore(thread_count);
mutex = new Mutex();
threads = new ParallelThread*[thread_count];
available = new ParallelThread*[thread_count];
available_offset = 0;
available_length = thread_count;
2014-08-18 10:17:16 +00:00
for (i = 0; i < thread_count; i++)
{
threads[i] = available[i] = new ParallelThread(this);
threads[i]->start();
}
// Perform all units
for (done = 0; done < units; done++)
{
// Take first available thread
semaphore->acquire();
mutex->acquire();
ParallelThread *thread = available[available_offset];
available_offset++;
if (available_offset >= thread_count)
{
available_offset = 0;
}
available_length--;
mutex->release();
// Feed the unit to it
thread->feedUnit(done);
}
// Wait for all threads to end, then cleanup
2014-08-18 10:17:16 +00:00
for (i = 0; i < thread_count; i++)
{
threads[i]->interrupt();
}
for (i = 0; i < thread_count; i++)
{
threads[i]->join();
delete threads[i];
}
delete[] threads;
delete[] available;
delete semaphore;
delete mutex;
running = 0;
return done;
}
2014-08-19 07:18:55 +00:00
void ParallelWork::interrupt()
{
worker->interrupt();
}
void ParallelWork::returnThread(ParallelWork::ParallelThread *thread)
{
mutex->acquire();
available[(available_offset + available_length) % thread_count] = thread;
available_length++;
semaphore->release();
mutex->release();
}