diff --git a/server/server.cpp b/server/server.cpp index dbb99f1..c4a2f60 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -38,6 +38,7 @@ Server::Server(): router(std::make_shared()), pool(DB::Pool::create()), taskManager(std::make_shared()), + scheduler(std::make_shared(taskManager)), sessions() { std::cout << "Startig pica..." << std::endl; @@ -65,6 +66,28 @@ void Server::run(int socketDescriptor) { router->addRoute(std::make_unique(shared_from_this())); taskManager->start(); + scheduler->start(); + + scheduler->schedule([]() { + std::cout << "5000" << std::endl; + }, TM::Scheduler::Delay(5000)); + scheduler->schedule([]() { + std::cout << "2000" << std::endl; + }, TM::Scheduler::Delay(2000)); + scheduler->schedule([]() { + std::cout << "6000" << std::endl; + }, TM::Scheduler::Delay(6000)); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + scheduler->schedule([]() { + std::cout << "2000 + 500" << std::endl; + }, TM::Scheduler::Delay(2000)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + scheduler->schedule([]() { + std::cout << "1000 + 600" << std::endl; + }, TM::Scheduler::Delay(1000)); while (!terminating) { std::unique_ptr request = std::make_unique(); diff --git a/server/server.h b/server/server.h index 72f7fe6..4402cfb 100644 --- a/server/server.h +++ b/server/server.h @@ -26,6 +26,7 @@ #include "utils/helpers.h" #include "config.h" #include "taskmanager/manager.h" +#include "taskmanager/scheduler.h" class Server : public std::enable_shared_from_this { public: @@ -52,5 +53,6 @@ private: std::shared_ptr router; std::shared_ptr pool; std::shared_ptr taskManager; + std::shared_ptr scheduler; Sessions sessions; }; diff --git a/taskmanager/CMakeLists.txt b/taskmanager/CMakeLists.txt index 2149fd4..7a8c2c3 100644 --- a/taskmanager/CMakeLists.txt +++ b/taskmanager/CMakeLists.txt @@ -5,12 +5,14 @@ set(HEADERS manager.h job.h route.h + scheduler.h ) set(SOURCES manager.cpp job.cpp route.cpp + scheduler.cpp ) target_sources(${PROJECT_NAME} PRIVATE ${SOURCES}) diff --git a/taskmanager/scheduler.cpp b/taskmanager/scheduler.cpp new file mode 100644 index 0000000..45a8dc0 --- /dev/null +++ b/taskmanager/scheduler.cpp @@ -0,0 +1,75 @@ +//SPDX-FileCopyrightText: 2023 Yury Gubich +//SPDX-License-Identifier: GPL-3.0-or-later + +#include "scheduler.h" + +TM::Scheduler::Scheduler (std::weak_ptr manager): + manager(manager), + queue(), + mutex(), + cond(), + thread(nullptr), + running(false) +{} + +TM::Scheduler::~Scheduler () { + stop(); +} + +void TM::Scheduler::start () { + std::unique_lock lock(mutex); + if (running) + return; + + running = true; + thread = std::make_unique(&Scheduler::loop, this); +} + +void TM::Scheduler::stop () { + std::unique_lock lock(mutex); + if (!running) + return; + + running = false; + + lock.unlock(); + cond.notify_all(); + thread->join(); + + lock.lock(); + thread.reset(); +} + +void TM::Scheduler::loop () { + while (running) { + std::unique_lock lock(mutex); + if (queue.empty()) { + cond.wait(lock); + continue; + } + + Time currentTime = std::chrono::steady_clock::now(); + while (!queue.empty()) { + Time nextScheduledTime = queue.top().first; + if (nextScheduledTime > currentTime) { + cond.wait_until(lock, nextScheduledTime); + break; + } + + Task task = queue.top().second; + queue.pop(); + lock.unlock(); + task(); + lock.lock(); + } + } +} + +void TM::Scheduler::schedule (Task task, Delay delay) { + std::unique_lock lock(mutex); + Time time = std::chrono::steady_clock::now() + delay; + queue.emplace(time, task); + + lock.unlock(); + cond.notify_one(); +} diff --git a/taskmanager/scheduler.h b/taskmanager/scheduler.h new file mode 100644 index 0000000..40a0093 --- /dev/null +++ b/taskmanager/scheduler.h @@ -0,0 +1,48 @@ +//SPDX-FileCopyrightText: 2023 Yury Gubich +//SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "manager.h" +#include "utils/helpers.h" + +namespace TM { +class Scheduler { +public: + using Delay = std::chrono::milliseconds; + using Task = std::function; + + Scheduler (std::weak_ptr manager); + ~Scheduler (); + + void start(); + void stop(); + void schedule(Task task, Delay delay); + +private: + void loop(); + +private: + using Time = std::chrono::time_point; + using Record = std::pair; + + std::weak_ptr manager; + std::priority_queue< + Record, + std::vector, + FirstGreater + > queue; + std::mutex mutex; + std::condition_variable cond; + std::unique_ptr thread; + bool running; +}; +} diff --git a/utils/helpers.h b/utils/helpers.h index 2c00151..7991346 100644 --- a/utils/helpers.h +++ b/utils/helpers.h @@ -15,3 +15,10 @@ void ltrim(std::string& string); void rtrim(std::string& string); void trim(std::string& string); std::string extract(std::string& string, std::string::size_type begin, std::string::size_type end); + +template +struct FirstGreater { + bool operator () (T left, T right) { + return std::get<0>(left) > std::get<0>(right); + } +};