#include "taskmanager.h" TaskManager::TaskManager(const std::shared_ptr& logger) : Loggable(logger), running(false), stopping(false), maxThreads(std::thread::hardware_concurrency()), activeThreads(0), tasks(), mutex(), loopConditional(), waitConditional(), threads() { threads.reserve(maxThreads); } TaskManager::~TaskManager() { stop(); } void TaskManager::queue(const Job& job) { std::unique_lock lock(mutex); tasks.emplace(job, std::nullopt); lock.unlock(); loopConditional.notify_one(); } void TaskManager::queue(const Job& job, const Result& result) { std::unique_lock lock(mutex); tasks.emplace(job, result); lock.unlock(); loopConditional.notify_one(); } void TaskManager::start() { std::lock_guard lock(mutex); if (running) return; debug("Starting " + std::to_string(maxThreads) + " threads"); for (uint32_t i = 0; i < maxThreads; ++i) threads.emplace_back(&TaskManager::loop, this); running = true; } void TaskManager::stop() { std::unique_lock lock(mutex); if (!running) return; debug("Stopping task manager"); stopping = true; lock.unlock(); loopConditional.notify_all(); for (std::thread& thread : threads) thread.join(); threads.clear(); running = false; } void TaskManager::loop() { //debug("Thread " + std::to_string(std::this_thread::get_id()) + " entered the loop"); debug("Thread entered the loop"); while (true) { Task task; std::unique_lock lock(mutex); while (!stopping && tasks.empty()) loopConditional.wait(lock); if (stopping) break; ++activeThreads; task = tasks.front(); tasks.pop(); //debug("Thread took a task"); lock.unlock(); executeTask(task); lock.lock(); --activeThreads; lock.unlock(); waitConditional.notify_all(); } debug("Thread left the loop"); } void TaskManager::executeTask(const Task& task) const { try { task.first(); } catch (const std::exception& e) { if (task.second.has_value()) task.second.value()(&e); return; } try { if (task.second.has_value()) task.second.value()(std::nullopt); } catch (...) {} } bool TaskManager::busy() const { std::lock_guard lock(mutex); return activeThreads == 0; } void TaskManager::wait() const { std::unique_lock lock(mutex); while (activeThreads != 0 || !tasks.empty()) waitConditional.wait(lock); }