beginning of the scheduler

This commit is contained in:
Blue 2023-12-31 14:10:04 -03:00
parent f1a2006b4b
commit 26114aad5f
Signed by: blue
GPG Key ID: 9B203B252A63EE38
6 changed files with 157 additions and 0 deletions

View File

@ -38,6 +38,7 @@ Server::Server():
router(std::make_shared<Router>()), router(std::make_shared<Router>()),
pool(DB::Pool::create()), pool(DB::Pool::create()),
taskManager(std::make_shared<TM::Manager>()), taskManager(std::make_shared<TM::Manager>()),
scheduler(std::make_shared<TM::Scheduler>(taskManager)),
sessions() sessions()
{ {
std::cout << "Startig pica..." << std::endl; std::cout << "Startig pica..." << std::endl;
@ -65,6 +66,28 @@ void Server::run(int socketDescriptor) {
router->addRoute(std::make_unique<Handler::Login>(shared_from_this())); router->addRoute(std::make_unique<Handler::Login>(shared_from_this()));
taskManager->start(); taskManager->start();
scheduler->start();
scheduler->schedule([]() {
std::cout << "5000" << std::endl;
}, TM::Scheduler::Delay(5000));
scheduler->schedule([]() {
std::cout << "2000" << std::endl;
}, TM::Scheduler::Delay(2000));
scheduler->schedule([]() {
std::cout << "6000" << std::endl;
}, TM::Scheduler::Delay(6000));
std::this_thread::sleep_for(std::chrono::milliseconds(500));
scheduler->schedule([]() {
std::cout << "2000 + 500" << std::endl;
}, TM::Scheduler::Delay(2000));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
scheduler->schedule([]() {
std::cout << "1000 + 600" << std::endl;
}, TM::Scheduler::Delay(1000));
while (!terminating) { while (!terminating) {
std::unique_ptr<Request> request = std::make_unique<Request>(); std::unique_ptr<Request> request = std::make_unique<Request>();

View File

@ -26,6 +26,7 @@
#include "utils/helpers.h" #include "utils/helpers.h"
#include "config.h" #include "config.h"
#include "taskmanager/manager.h" #include "taskmanager/manager.h"
#include "taskmanager/scheduler.h"
class Server : public std::enable_shared_from_this<Server> { class Server : public std::enable_shared_from_this<Server> {
public: public:
@ -52,5 +53,6 @@ private:
std::shared_ptr<Router> router; std::shared_ptr<Router> router;
std::shared_ptr<DB::Pool> pool; std::shared_ptr<DB::Pool> pool;
std::shared_ptr<TM::Manager> taskManager; std::shared_ptr<TM::Manager> taskManager;
std::shared_ptr<TM::Scheduler> scheduler;
Sessions sessions; Sessions sessions;
}; };

View File

@ -5,12 +5,14 @@ set(HEADERS
manager.h manager.h
job.h job.h
route.h route.h
scheduler.h
) )
set(SOURCES set(SOURCES
manager.cpp manager.cpp
job.cpp job.cpp
route.cpp route.cpp
scheduler.cpp
) )
target_sources(${PROJECT_NAME} PRIVATE ${SOURCES}) target_sources(${PROJECT_NAME} PRIVATE ${SOURCES})

75
taskmanager/scheduler.cpp Normal file
View File

@ -0,0 +1,75 @@
//SPDX-FileCopyrightText: 2023 Yury Gubich <blue@macaw.me>
//SPDX-License-Identifier: GPL-3.0-or-later
#include "scheduler.h"
TM::Scheduler::Scheduler (std::weak_ptr<Manager> manager):
manager(manager),
queue(),
mutex(),
cond(),
thread(nullptr),
running(false)
{}
TM::Scheduler::~Scheduler () {
stop();
}
void TM::Scheduler::start () {
std::unique_lock lock(mutex);
if (running)
return;
running = true;
thread = std::make_unique<std::thread>(&Scheduler::loop, this);
}
void TM::Scheduler::stop () {
std::unique_lock lock(mutex);
if (!running)
return;
running = false;
lock.unlock();
cond.notify_all();
thread->join();
lock.lock();
thread.reset();
}
void TM::Scheduler::loop () {
while (running) {
std::unique_lock<std::mutex> lock(mutex);
if (queue.empty()) {
cond.wait(lock);
continue;
}
Time currentTime = std::chrono::steady_clock::now();
while (!queue.empty()) {
Time nextScheduledTime = queue.top().first;
if (nextScheduledTime > currentTime) {
cond.wait_until(lock, nextScheduledTime);
break;
}
Task task = queue.top().second;
queue.pop();
lock.unlock();
task();
lock.lock();
}
}
}
void TM::Scheduler::schedule (Task task, Delay delay) {
std::unique_lock lock(mutex);
Time time = std::chrono::steady_clock::now() + delay;
queue.emplace(time, task);
lock.unlock();
cond.notify_one();
}

48
taskmanager/scheduler.h Normal file
View File

@ -0,0 +1,48 @@
//SPDX-FileCopyrightText: 2023 Yury Gubich <blue@macaw.me>
//SPDX-License-Identifier: GPL-3.0-or-later
#pragma once
#include <memory>
#include <thread>
#include <queue>
#include <chrono>
#include <functional>
#include <mutex>
#include <condition_variable>
#include "manager.h"
#include "utils/helpers.h"
namespace TM {
class Scheduler {
public:
using Delay = std::chrono::milliseconds;
using Task = std::function<void()>;
Scheduler (std::weak_ptr<Manager> manager);
~Scheduler ();
void start();
void stop();
void schedule(Task task, Delay delay);
private:
void loop();
private:
using Time = std::chrono::time_point<std::chrono::steady_clock>;
using Record = std::pair<Time, Task>;
std::weak_ptr<Manager> manager;
std::priority_queue<
Record,
std::vector<Record>,
FirstGreater<Record>
> queue;
std::mutex mutex;
std::condition_variable cond;
std::unique_ptr<std::thread> thread;
bool running;
};
}

View File

@ -15,3 +15,10 @@ void ltrim(std::string& string);
void rtrim(std::string& string); void rtrim(std::string& string);
void trim(std::string& string); void trim(std::string& string);
std::string extract(std::string& string, std::string::size_type begin, std::string::size_type end); std::string extract(std::string& string, std::string::size_type begin, std::string::size_type end);
template <class T>
struct FirstGreater {
bool operator () (T left, T right) {
return std::get<0>(left) > std::get<0>(right);
}
};