some thoughts about scheduling

This commit is contained in:
Blue 2024-01-02 22:11:56 -03:00
parent 26114aad5f
commit 544db92b6e
Signed by: blue
GPG Key ID: 9B203B252A63EE38
11 changed files with 123 additions and 23 deletions

View File

@ -34,7 +34,7 @@ void Handler::Poll::handle (Request& request) {
} }
} }
void Handler::Poll::error(Request& request, Result result, Response::Status status) const { void Handler::Poll::error(Request& request, Result result, Response::Status status) {
Response& res = request.createResponse(status); Response& res = request.createResponse(status);
nlohmann::json body = nlohmann::json::object(); nlohmann::json body = nlohmann::json::object();
body["result"] = result; body["result"] = result;

View File

@ -23,7 +23,7 @@ public:
unknownError unknownError
}; };
void error(Request& request, Result result, Response::Status status) const; static void error(Request& request, Result result, Response::Status status);
private: private:
Server* server; Server* server;

View File

@ -5,7 +5,13 @@
#include "handler/poll.h" #include "handler/poll.h"
Session::Session(unsigned int id, const std::string& access, const std::string& renew): Session::Session(
std::weak_ptr<TM::Scheduler> scheduler,
unsigned int id,
const std::string& access,
const std::string& renew
):
scheduler(scheduler),
id(id), id(id),
access(access), access(access),
renew(renew), renew(renew),
@ -22,13 +28,17 @@ std::string Session::getRenewToken() const {
void Session::accept(std::unique_ptr<Request> request) { void Session::accept(std::unique_ptr<Request> request) {
if (polling) { if (polling) {
Response& res = request->createResponse(Response::Status::ok); Handler::Poll::error(*request.get(), Handler::Poll::Result::replace, Response::Status::ok);
nlohmann::json body = nlohmann::json::object(); //TODO unschedule
body["result"] = Handler::Poll::Result::replace;
res.setBody(body);
res.send();
} }
std::shared_ptr<TM::Scheduler> sch = scheduler.lock();
if (!sch) {
std::cerr << "Was unable to schedule polling timeout, replying with an error" << std::endl;
Handler::Poll::error(*request.get(), Handler::Poll::Result::unknownError, Response::Status::internalError);
return;
}
sch->schedule(std::bind(&Session::onTimeout, this), TM::Scheduler::Delay(5000));
polling = std::move(request); polling = std::move(request);
} }

View File

@ -6,10 +6,16 @@
#include <string> #include <string>
#include "request/accepting.h" #include "request/accepting.h"
#include "taskmanager/scheduler.h"
class Session : public Accepting { class Session : public Accepting {
public: public:
Session(unsigned int id, const std::string& access, const std::string& renew); Session(
std::weak_ptr<TM::Scheduler> scheduler,
unsigned int id,
const std::string& access,
const std::string& renew
);
Session(const Session&) = delete; Session(const Session&) = delete;
Session(Session&& other); Session(Session&& other);
Session& operator = (const Session&) = delete; Session& operator = (const Session&) = delete;
@ -20,6 +26,10 @@ public:
void accept(std::unique_ptr<Request> request) override; void accept(std::unique_ptr<Request> request) override;
private: private:
void onTimeout();
private:
std::weak_ptr<TM::Scheduler> scheduler;
unsigned int id; unsigned int id;
std::string access; std::string access;
std::string renew; std::string renew;

View File

@ -6,6 +6,7 @@ set(HEADERS
job.h job.h
route.h route.h
scheduler.h scheduler.h
function.h
) )
set(SOURCES set(SOURCES
@ -13,6 +14,7 @@ set(SOURCES
job.cpp job.cpp
route.cpp route.cpp
scheduler.cpp scheduler.cpp
function.cpp
) )
target_sources(${PROJECT_NAME} PRIVATE ${SOURCES}) target_sources(${PROJECT_NAME} PRIVATE ${SOURCES})

12
taskmanager/function.cpp Normal file
View File

@ -0,0 +1,12 @@
//SPDX-FileCopyrightText: 2024 Yury Gubich <blue@macaw.me>
//SPDX-License-Identifier: GPL-3.0-or-later
#include "function.h"
TM::Function::Function (const std::function<void()>& fn):
fn(fn)
{}
void TM::Function::execute () {
fn();
}

20
taskmanager/function.h Normal file
View File

@ -0,0 +1,20 @@
//SPDX-FileCopyrightText: 2024 Yury Gubich <blue@macaw.me>
//SPDX-License-Identifier: GPL-3.0-or-later
#pragma once
#include "functional"
#include "job.h"
namespace TM {
class Function : public Job {
public:
Function(const std::function<void()>& fn);
void execute () override;
private:
std::function<void()> fn;
};
}

View File

@ -4,8 +4,8 @@
#include "scheduler.h" #include "scheduler.h"
TM::Scheduler::Scheduler (std::weak_ptr<Manager> manager): TM::Scheduler::Scheduler (std::weak_ptr<Manager> manager):
manager(manager),
queue(), queue(),
manager(manager),
mutex(), mutex(),
cond(), cond(),
thread(nullptr), thread(nullptr),
@ -56,19 +56,21 @@ void TM::Scheduler::loop () {
break; break;
} }
Task task = queue.top().second; Record task = queue.pop();
queue.pop();
lock.unlock(); lock.unlock();
task(); std::shared_ptr<Manager> mngr = manager.lock();
if (mngr)
mngr->schedule(std::move(task.second));
lock.lock(); lock.lock();
} }
} }
} }
void TM::Scheduler::schedule (Task task, Delay delay) { void TM::Scheduler::schedule (const std::function<void()>& task, Delay delay) {
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
Time time = std::chrono::steady_clock::now() + delay; Time time = std::chrono::steady_clock::now() + delay;
queue.emplace(time, task); queue.emplace(time, std::make_unique<Function>(task));
lock.unlock(); lock.unlock();
cond.notify_one(); cond.notify_one();

View File

@ -5,41 +5,41 @@
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <queue>
#include <chrono> #include <chrono>
#include <functional> #include <functional>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include "manager.h" #include "manager.h"
#include "function.h"
#include "utils/helpers.h" #include "utils/helpers.h"
namespace TM { namespace TM {
class Scheduler { class Scheduler {
public: public:
using Delay = std::chrono::milliseconds; using Delay = std::chrono::milliseconds;
using Task = std::function<void()>;
Scheduler (std::weak_ptr<Manager> manager); Scheduler (std::weak_ptr<Manager> manager);
~Scheduler (); ~Scheduler ();
void start(); void start();
void stop(); void stop();
void schedule(Task task, Delay delay); void schedule(const std::function<void()>& task, Delay delay);
private: private:
void loop(); void loop();
private: private:
using Task = std::unique_ptr<Function>;
using Time = std::chrono::time_point<std::chrono::steady_clock>; using Time = std::chrono::time_point<std::chrono::steady_clock>;
using Record = std::pair<Time, Task>; using Record = std::pair<Time, Task>;
std::weak_ptr<Manager> manager; PriorityQueue<
std::priority_queue<
Record, Record,
std::vector<Record>, std::vector<Record>,
FirstGreater<Record> FirstGreater<Record>
> queue; > queue;
std::weak_ptr<Manager> manager;
std::mutex mutex; std::mutex mutex;
std::condition_variable cond; std::condition_variable cond;
std::unique_ptr<std::thread> thread; std::unique_ptr<std::thread> thread;

View File

@ -4,7 +4,6 @@
#include "helpers.h" #include "helpers.h"
#include "iostream" #include "iostream"
#include <algorithm>
#include <functional> #include <functional>
#include "config.h" #include "config.h"

View File

@ -5,6 +5,7 @@
#include <filesystem> #include <filesystem>
#include <string> #include <string>
#include <algorithm>
#define UNUSED(variable) (void)variable #define UNUSED(variable) (void)variable
@ -18,7 +19,51 @@ std::string extract(std::string& string, std::string::size_type begin, std::stri
template <class T> template <class T>
struct FirstGreater { struct FirstGreater {
bool operator () (T left, T right) { bool operator () (const T& left, const T& right) {
return std::get<0>(left) > std::get<0>(right); return std::get<0>(left) > std::get<0>(right);
} }
}; };
template <class Type, class Container, class Compare = std::less<Type>>
class PriorityQueue {
public:
explicit PriorityQueue(const Compare& compare = Compare()):
container(),
compare(compare)
{}
const Type& top () const {
return container.front();
}
bool empty () const {
return container.empty();
}
template<class... Args>
void emplace (Args&&... args) {
container.emplace_back(std::forward<Args>(args)...);
std::push_heap(container.begin(), container.end(), compare);
}
void push (const Type& element) {
container.push_back(element);
std::push_heap(container.begin(), container.end(), compare);
}
void push (Type&& element) {
container.push_back(std::move(element));
std::push_heap(container.begin(), container.end(), compare);
}
Type pop () {
std::pop_heap(container.begin(), container.end(), compare);
Type result = std::move(container.back());
container.pop_back();
return result;
}
private:
Container container;
Compare compare;
};