From 0566f2bdd8e4639718ac04ebf41df92913d5c94e Mon Sep 17 00:00:00 2001 From: Michael Lemaire Date: Mon, 18 Aug 2014 15:20:04 +0200 Subject: [PATCH] Refactored ParallelWork for better performance --- src/interface/desktop/WidgetPreviewCanvas.cpp | 3 +- .../software/SoftwareCanvasRenderer.cpp | 4 +- src/system/ParallelWork.cpp | 163 +++++++++++------- src/system/ParallelWork.h | 33 ++-- src/system/Semaphore.cpp | 6 + src/system/Semaphore.h | 25 +++ src/system/Thread.h | 2 +- src/system/system.pro | 6 +- src/system/system_global.h | 1 + 9 files changed, 156 insertions(+), 87 deletions(-) create mode 100644 src/system/Semaphore.cpp create mode 100644 src/system/Semaphore.h diff --git a/src/interface/desktop/WidgetPreviewCanvas.cpp b/src/interface/desktop/WidgetPreviewCanvas.cpp index 47d989a..e1463f9 100644 --- a/src/interface/desktop/WidgetPreviewCanvas.cpp +++ b/src/interface/desktop/WidgetPreviewCanvas.cpp @@ -12,7 +12,7 @@ WidgetPreviewCanvas::WidgetPreviewCanvas(QWidget *parent) : pixbuf = new QImage(); inited = false; - startTimer(1000); + startTimer(500); } WidgetPreviewCanvas::~WidgetPreviewCanvas() @@ -65,5 +65,6 @@ void WidgetPreviewCanvas::timerEvent(QTimerEvent *) } canvas->getPreview()->updateLive(this); + update(); } } diff --git a/src/render/software/SoftwareCanvasRenderer.cpp b/src/render/software/SoftwareCanvasRenderer.cpp index 4007e32..4106ffd 100644 --- a/src/render/software/SoftwareCanvasRenderer.cpp +++ b/src/render/software/SoftwareCanvasRenderer.cpp @@ -78,8 +78,8 @@ void SoftwareCanvasRenderer::postProcess(CanvasPortion *portion, bool threaded) { // Subdivide in chunks int chunk_size = 32; - int chunks_x = portion->getWidth() / chunk_size + 1; - int chunks_y = portion->getHeight() / chunk_size + 1; + int chunks_x = (portion->getWidth() - 1) / chunk_size + 1; + int chunks_y = (portion->getHeight() - 1) / chunk_size + 1; int units = chunks_x * chunks_y; // Render chunks in parallel diff --git a/src/system/ParallelWork.cpp b/src/system/ParallelWork.cpp index 9fb57ef..a80bb0a 100644 --- a/src/system/ParallelWork.cpp +++ b/src/system/ParallelWork.cpp @@ -2,6 +2,8 @@ #include "Thread.h" #include "System.h" +#include "Semaphore.h" +#include "Mutex.h" #include "ParallelWorker.h" #include @@ -27,6 +29,56 @@ private: void* data; }; +/** + * Thread sub-class to perform units of work + */ +class ParallelWork::ParallelThread:public Thread +{ +public: + ParallelThread(ParallelWork *work): + Thread(), work(work), sem(0) + { + interrupted = false; + unit = -1; + } + + void feedUnit(int unit) + { + this->unit = unit; + sem.release(); + } + + void interrupt() + { + interrupted = true; + sem.release(); + } + +protected: + virtual void run() override + { + while (unit >= 0 or not interrupted) + { + // Wait for a unit (or interrupt) + sem.acquire(); + + // Process the unit + if (unit >= 0) + { + work->worker->processParallelUnit(unit); + work->returnThread(this); + unit = -1; + } + } + } + +private: + ParallelWork* work; + Semaphore sem; + int unit; + bool interrupted; +}; + ParallelWork::ParallelWork(ParallelWorker *worker, int units) { this->units = units; @@ -52,58 +104,12 @@ ParallelWork::~ParallelWork() } } -static void* _workerThreadCallback(ParallelWork::ParallelWorkerThread* thread) -{ - thread->result = thread->worker->processParallelUnit(thread->unit); - thread->status = ParallelWork::PARALLEL_WORKER_STATUS_DONE; - return NULL; -} - -static int _runNextWorker(ParallelWork::ParallelWorkerThread threads[], int thread_count, int unit) -{ - int i; - - while (1) - { - for (i = 0; i < thread_count; i++) - { - ParallelWork::ParallelWorkerThread* worker = threads + i; - if (worker->status == ParallelWork::PARALLEL_WORKER_STATUS_VOID) - { - worker->status = ParallelWork::PARALLEL_WORKER_STATUS_RUNNING; - worker->result = 0; - worker->unit = unit; - worker->thread = new Thread((ThreadFunction)_workerThreadCallback); - worker->thread->start(worker); - - return 0; - } - else if (worker->status == ParallelWork::PARALLEL_WORKER_STATUS_DONE) - { - int result = worker->result; - - worker->status = ParallelWork::PARALLEL_WORKER_STATUS_RUNNING; - worker->result = 0; - worker->unit = unit; - worker->thread->join(); - delete worker->thread; - worker->thread = new Thread((ThreadFunction)_workerThreadCallback); - worker->thread->start(worker); - - return result; - } - } - Thread::timeSleepMs(50); - } -} - int ParallelWork::perform(int thread_count) { - int i, done, result; + int i, done; assert(not running); - result = 0; - + // Get thread count if (thread_count <= 0) { thread_count = System::getCoreCount(); @@ -112,38 +118,65 @@ int ParallelWork::perform(int thread_count) { thread_count = PARALLEL_MAX_THREADS; } + this->thread_count = thread_count; running = 1; - /* Init workers */ + // Init threads + semaphore = new Semaphore(thread_count); + mutex = new Mutex(); + threads = new ParallelThread*[thread_count]; + available = new ParallelThread*[thread_count]; + available_offset = 0; + available_length = thread_count; for (i = 0; i < thread_count; i++) { - threads[i].status = PARALLEL_WORKER_STATUS_VOID; - threads[i].worker = worker; + threads[i] = available[i] = new ParallelThread(this); + threads[i]->start(); } - /* Perform run */ + // Perform all units for (done = 0; done < units; done++) { - if (_runNextWorker(threads, thread_count, done)) + // Take first available thread + semaphore->acquire(); + mutex->acquire(); + ParallelThread *thread = available[available_offset]; + available_offset++; + if (available_offset >= thread_count) { - result++; + available_offset = 0; } + available_length--; + mutex->release(); + + // Feed the unit to it + thread->feedUnit(done); } - /* Wait and clean up workers */ + // Wait for all threads to end, then cleanup for (i = 0; i < thread_count; i++) { - if (threads[i].status != PARALLEL_WORKER_STATUS_VOID) - { - threads[i].thread->join(); - delete threads[i].thread; - if (threads[i].result) - { - result++; - } - } + threads[i]->interrupt(); } + for (i = 0; i < thread_count; i++) + { + threads[i]->join(); + delete threads[i]; + } + delete[] threads; + delete[] available; + delete semaphore; + delete mutex; running = 0; - return result; + return done; +} + +void ParallelWork::returnThread(ParallelWork::ParallelThread *thread) +{ + mutex->acquire(); + available[(available_offset + available_length) % thread_count] = thread; + available_length++; + semaphore->release(); + mutex->release(); } diff --git a/src/system/ParallelWork.h b/src/system/ParallelWork.h index 9f4b756..3e6d923 100644 --- a/src/system/ParallelWork.h +++ b/src/system/ParallelWork.h @@ -13,21 +13,11 @@ class SYSTEMSHARED_EXPORT ParallelWork public: typedef int (*ParallelUnitFunction)(ParallelWork* work, int unit, void* data); - typedef enum - { - PARALLEL_WORKER_STATUS_VOID, - PARALLEL_WORKER_STATUS_RUNNING, - PARALLEL_WORKER_STATUS_DONE - } ParallelWorkerStatus; - - typedef struct - { - Thread* thread; - ParallelWorker* worker; - ParallelWorkerStatus status; - int unit; - int result; - } ParallelWorkerThread; + /** + * Obscure thread class. + */ + class ParallelThread; + friend class ParallelThread; public: /** @@ -58,11 +48,22 @@ public: */ int perform(int thread_count=-1); +private: + void returnThread(ParallelThread *thread); + +private: int units; int running; ParallelWorker *worker; bool worker_compat; - ParallelWorkerThread threads[PARALLEL_MAX_THREADS]; + + int thread_count; + Mutex* mutex; + Semaphore* semaphore; + ParallelThread** threads; + ParallelThread** available; + int available_offset; + int available_length; }; } diff --git a/src/system/Semaphore.cpp b/src/system/Semaphore.cpp new file mode 100644 index 0000000..65ca0c7 --- /dev/null +++ b/src/system/Semaphore.cpp @@ -0,0 +1,6 @@ +#include "Semaphore.h" + +Semaphore::Semaphore(int resources): + QSemaphore(resources) +{ +} diff --git a/src/system/Semaphore.h b/src/system/Semaphore.h new file mode 100644 index 0000000..659cee2 --- /dev/null +++ b/src/system/Semaphore.h @@ -0,0 +1,25 @@ +#ifndef SEMAPHORE_H +#define SEMAPHORE_H + +#include "system_global.h" + +#include + +namespace paysages +{ +namespace system +{ + +class Semaphore: private QSemaphore +{ +public: + Semaphore(int resources); + + inline void acquire() {QSemaphore::acquire();} + inline void release() {QSemaphore::release();} +}; + +} +} + +#endif // SEMAPHORE_H diff --git a/src/system/Thread.h b/src/system/Thread.h index 699e381..5c49233 100644 --- a/src/system/Thread.h +++ b/src/system/Thread.h @@ -29,7 +29,7 @@ public: * \brief Start the thread * \param data User data to pass to the threaded function */ - void start(void* data); + void start(void* data=0); /*! * \brief Wait for the thread to end, and collect its result. diff --git a/src/system/system.pro b/src/system/system.pro index 73a8234..2e2f3c1 100644 --- a/src/system/system.pro +++ b/src/system/system.pro @@ -26,7 +26,8 @@ SOURCES += \ PictureWriter.cpp \ Logs.cpp \ ParallelPool.cpp \ - ParallelWorker.cpp + ParallelWorker.cpp \ + Semaphore.cpp HEADERS += \ system_global.h \ @@ -42,7 +43,8 @@ HEADERS += \ PictureWriter.h \ Logs.h \ ParallelPool.h \ - ParallelWorker.h + ParallelWorker.h \ + Semaphore.h unix:!symbian { maemo5 { diff --git a/src/system/system_global.h b/src/system/system_global.h index c490cdf..3b003bc 100644 --- a/src/system/system_global.h +++ b/src/system/system_global.h @@ -21,6 +21,7 @@ namespace system { class ParallelWorker; class Thread; class Mutex; + class Semaphore; } } using namespace paysages::system;