Refactored ParallelWork for better performance

This commit is contained in:
Michaël Lemaire 2014-08-18 15:20:04 +02:00
parent 2aeecdec62
commit 0566f2bdd8
9 changed files with 156 additions and 87 deletions

View file

@ -12,7 +12,7 @@ WidgetPreviewCanvas::WidgetPreviewCanvas(QWidget *parent) :
pixbuf = new QImage();
inited = false;
startTimer(1000);
startTimer(500);
}
WidgetPreviewCanvas::~WidgetPreviewCanvas()
@ -65,5 +65,6 @@ void WidgetPreviewCanvas::timerEvent(QTimerEvent *)
}
canvas->getPreview()->updateLive(this);
update();
}
}

View file

@ -78,8 +78,8 @@ void SoftwareCanvasRenderer::postProcess(CanvasPortion *portion, bool threaded)
{
// Subdivide in chunks
int chunk_size = 32;
int chunks_x = portion->getWidth() / chunk_size + 1;
int chunks_y = portion->getHeight() / chunk_size + 1;
int chunks_x = (portion->getWidth() - 1) / chunk_size + 1;
int chunks_y = (portion->getHeight() - 1) / chunk_size + 1;
int units = chunks_x * chunks_y;
// Render chunks in parallel

View file

@ -2,6 +2,8 @@
#include "Thread.h"
#include "System.h"
#include "Semaphore.h"
#include "Mutex.h"
#include "ParallelWorker.h"
#include <cassert>
@ -27,6 +29,56 @@ private:
void* data;
};
/**
* Thread sub-class to perform units of work
*/
class ParallelWork::ParallelThread:public Thread
{
public:
ParallelThread(ParallelWork *work):
Thread(), work(work), sem(0)
{
interrupted = false;
unit = -1;
}
void feedUnit(int unit)
{
this->unit = unit;
sem.release();
}
void interrupt()
{
interrupted = true;
sem.release();
}
protected:
virtual void run() override
{
while (unit >= 0 or not interrupted)
{
// Wait for a unit (or interrupt)
sem.acquire();
// Process the unit
if (unit >= 0)
{
work->worker->processParallelUnit(unit);
work->returnThread(this);
unit = -1;
}
}
}
private:
ParallelWork* work;
Semaphore sem;
int unit;
bool interrupted;
};
ParallelWork::ParallelWork(ParallelWorker *worker, int units)
{
this->units = units;
@ -52,58 +104,12 @@ ParallelWork::~ParallelWork()
}
}
static void* _workerThreadCallback(ParallelWork::ParallelWorkerThread* thread)
{
thread->result = thread->worker->processParallelUnit(thread->unit);
thread->status = ParallelWork::PARALLEL_WORKER_STATUS_DONE;
return NULL;
}
static int _runNextWorker(ParallelWork::ParallelWorkerThread threads[], int thread_count, int unit)
{
int i;
while (1)
{
for (i = 0; i < thread_count; i++)
{
ParallelWork::ParallelWorkerThread* worker = threads + i;
if (worker->status == ParallelWork::PARALLEL_WORKER_STATUS_VOID)
{
worker->status = ParallelWork::PARALLEL_WORKER_STATUS_RUNNING;
worker->result = 0;
worker->unit = unit;
worker->thread = new Thread((ThreadFunction)_workerThreadCallback);
worker->thread->start(worker);
return 0;
}
else if (worker->status == ParallelWork::PARALLEL_WORKER_STATUS_DONE)
{
int result = worker->result;
worker->status = ParallelWork::PARALLEL_WORKER_STATUS_RUNNING;
worker->result = 0;
worker->unit = unit;
worker->thread->join();
delete worker->thread;
worker->thread = new Thread((ThreadFunction)_workerThreadCallback);
worker->thread->start(worker);
return result;
}
}
Thread::timeSleepMs(50);
}
}
int ParallelWork::perform(int thread_count)
{
int i, done, result;
int i, done;
assert(not running);
result = 0;
// Get thread count
if (thread_count <= 0)
{
thread_count = System::getCoreCount();
@ -112,38 +118,65 @@ int ParallelWork::perform(int thread_count)
{
thread_count = PARALLEL_MAX_THREADS;
}
this->thread_count = thread_count;
running = 1;
/* Init workers */
// Init threads
semaphore = new Semaphore(thread_count);
mutex = new Mutex();
threads = new ParallelThread*[thread_count];
available = new ParallelThread*[thread_count];
available_offset = 0;
available_length = thread_count;
for (i = 0; i < thread_count; i++)
{
threads[i].status = PARALLEL_WORKER_STATUS_VOID;
threads[i].worker = worker;
threads[i] = available[i] = new ParallelThread(this);
threads[i]->start();
}
/* Perform run */
// Perform all units
for (done = 0; done < units; done++)
{
if (_runNextWorker(threads, thread_count, done))
// Take first available thread
semaphore->acquire();
mutex->acquire();
ParallelThread *thread = available[available_offset];
available_offset++;
if (available_offset >= thread_count)
{
result++;
available_offset = 0;
}
available_length--;
mutex->release();
// Feed the unit to it
thread->feedUnit(done);
}
/* Wait and clean up workers */
// Wait for all threads to end, then cleanup
for (i = 0; i < thread_count; i++)
{
if (threads[i].status != PARALLEL_WORKER_STATUS_VOID)
{
threads[i].thread->join();
delete threads[i].thread;
if (threads[i].result)
{
result++;
}
}
threads[i]->interrupt();
}
for (i = 0; i < thread_count; i++)
{
threads[i]->join();
delete threads[i];
}
delete[] threads;
delete[] available;
delete semaphore;
delete mutex;
running = 0;
return result;
return done;
}
void ParallelWork::returnThread(ParallelWork::ParallelThread *thread)
{
mutex->acquire();
available[(available_offset + available_length) % thread_count] = thread;
available_length++;
semaphore->release();
mutex->release();
}

View file

@ -13,21 +13,11 @@ class SYSTEMSHARED_EXPORT ParallelWork
public:
typedef int (*ParallelUnitFunction)(ParallelWork* work, int unit, void* data);
typedef enum
{
PARALLEL_WORKER_STATUS_VOID,
PARALLEL_WORKER_STATUS_RUNNING,
PARALLEL_WORKER_STATUS_DONE
} ParallelWorkerStatus;
typedef struct
{
Thread* thread;
ParallelWorker* worker;
ParallelWorkerStatus status;
int unit;
int result;
} ParallelWorkerThread;
/**
* Obscure thread class.
*/
class ParallelThread;
friend class ParallelThread;
public:
/**
@ -58,11 +48,22 @@ public:
*/
int perform(int thread_count=-1);
private:
void returnThread(ParallelThread *thread);
private:
int units;
int running;
ParallelWorker *worker;
bool worker_compat;
ParallelWorkerThread threads[PARALLEL_MAX_THREADS];
int thread_count;
Mutex* mutex;
Semaphore* semaphore;
ParallelThread** threads;
ParallelThread** available;
int available_offset;
int available_length;
};
}

6
src/system/Semaphore.cpp Normal file
View file

@ -0,0 +1,6 @@
#include "Semaphore.h"
Semaphore::Semaphore(int resources):
QSemaphore(resources)
{
}

25
src/system/Semaphore.h Normal file
View file

@ -0,0 +1,25 @@
#ifndef SEMAPHORE_H
#define SEMAPHORE_H
#include "system_global.h"
#include <QSemaphore>
namespace paysages
{
namespace system
{
class Semaphore: private QSemaphore
{
public:
Semaphore(int resources);
inline void acquire() {QSemaphore::acquire();}
inline void release() {QSemaphore::release();}
};
}
}
#endif // SEMAPHORE_H

View file

@ -29,7 +29,7 @@ public:
* \brief Start the thread
* \param data User data to pass to the threaded function
*/
void start(void* data);
void start(void* data=0);
/*!
* \brief Wait for the thread to end, and collect its result.

View file

@ -26,7 +26,8 @@ SOURCES += \
PictureWriter.cpp \
Logs.cpp \
ParallelPool.cpp \
ParallelWorker.cpp
ParallelWorker.cpp \
Semaphore.cpp
HEADERS += \
system_global.h \
@ -42,7 +43,8 @@ HEADERS += \
PictureWriter.h \
Logs.h \
ParallelPool.h \
ParallelWorker.h
ParallelWorker.h \
Semaphore.h
unix:!symbian {
maemo5 {

View file

@ -21,6 +21,7 @@ namespace system {
class ParallelWorker;
class Thread;
class Mutex;
class Semaphore;
}
}
using namespace paysages::system;