From ec63dc655b0fce65ee63e18ce4703f3c6e0190df Mon Sep 17 00:00:00 2001 From: blue Date: Tue, 19 Sep 2023 21:25:38 -0300 Subject: [PATCH] basic taskManager (thread pool) --- src2/CMakeLists.txt | 2 ++ src2/taskmanager.cpp | 80 ++++++++++++++++++++++++++++++++++++++++++++ src2/taskmanager.h | 34 +++++++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 src2/taskmanager.cpp create mode 100644 src2/taskmanager.h diff --git a/src2/CMakeLists.txt b/src2/CMakeLists.txt index f28def2..b500e03 100644 --- a/src2/CMakeLists.txt +++ b/src2/CMakeLists.txt @@ -1,11 +1,13 @@ set(SOURCES collection.cpp loggable.cpp + taskmanager.cpp ) set(HEADERS collection.h loggable.h + taskmanager.h ) target_sources(${PROJECT_NAME} PRIVATE ${SOURCES}) diff --git a/src2/taskmanager.cpp b/src2/taskmanager.cpp new file mode 100644 index 0000000..03d9b3e --- /dev/null +++ b/src2/taskmanager.cpp @@ -0,0 +1,80 @@ +#include "taskmanager.h" + +TaskManager::TaskManager() : + running(false), + stopping(false), + maxThreads(std::thread::hardware_concurrency()), + activeThreads(0), + queue(), + queueMutex(), + loopConditional(), + waitConditional(), + threads() +{ + threads.reserve(maxThreads); +} + +TaskManager::~TaskManager() { + stop(); +} + +void TaskManager::start() { + std::lock_guard lock(queueMutex); + if (running) + return; + + for (uint32_t i = 0; i < maxThreads; ++i) + threads.emplace_back(&TaskManager::loop, this); + + running = true; +} + +void TaskManager::stop() { + std::unique_lock lock(queueMutex); + if (!running) + return; + + stopping = true; + lock.unlock(); + + loopConditional.notify_all(); + for (std::thread& thread : threads) + thread.join(); + + threads.clear(); +} + +void TaskManager::loop() { + while (true) { + Job job; + std::unique_lock lock(queueMutex); + while (!stopping && queue.empty()) + loopConditional.wait(lock); + + if (stopping) + return; + + ++activeThreads; + job = queue.front(); + queue.pop(); + lock.unlock(); + + //do the job + lock.lock(); + --activeThreads; + lock.unlock(); + waitConditional.notify_all(); + } + +} + +bool TaskManager::busy() { + std::lock_guard lock(queueMutex); + return activeThreads == 0; +} + +void TaskManager::wait() { + std::unique_lock lock(queueMutex); + while (activeThreads != 0) + waitConditional.wait(lock); +} diff --git a/src2/taskmanager.h b/src2/taskmanager.h new file mode 100644 index 0000000..87c8a6b --- /dev/null +++ b/src2/taskmanager.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +class TaskManager { +public: + typedef std::string Job; + TaskManager(); + ~TaskManager(); + + void start(); + void stop(); + void wait(); + bool busy(); + +private: + void loop(); + +private: + bool running; + bool stopping; + uint32_t maxThreads; + uint32_t activeThreads; + std::queue queue; + std::mutex queueMutex; + std::condition_variable loopConditional; + std::condition_variable waitConditional; + std::vector threads; +};