diff --git a/handler/poll.cpp b/handler/poll.cpp index 5fb2c3b..c0e4d82 100644 --- a/handler/poll.cpp +++ b/handler/poll.cpp @@ -34,7 +34,7 @@ void Handler::Poll::handle (Request& request) { } } -void Handler::Poll::error(Request& request, Result result, Response::Status status) const { +void Handler::Poll::error(Request& request, Result result, Response::Status status) { Response& res = request.createResponse(status); nlohmann::json body = nlohmann::json::object(); body["result"] = result; diff --git a/handler/poll.h b/handler/poll.h index 656a159..580256e 100644 --- a/handler/poll.h +++ b/handler/poll.h @@ -23,7 +23,7 @@ public: unknownError }; - void error(Request& request, Result result, Response::Status status) const; + static void error(Request& request, Result result, Response::Status status); private: Server* server; diff --git a/server/session.cpp b/server/session.cpp index fb7d91b..efafd89 100644 --- a/server/session.cpp +++ b/server/session.cpp @@ -5,7 +5,13 @@ #include "handler/poll.h" -Session::Session(unsigned int id, const std::string& access, const std::string& renew): +Session::Session( + std::weak_ptr scheduler, + unsigned int id, + const std::string& access, + const std::string& renew +): + scheduler(scheduler), id(id), access(access), renew(renew), @@ -22,13 +28,17 @@ std::string Session::getRenewToken() const { void Session::accept(std::unique_ptr request) { if (polling) { - Response& res = request->createResponse(Response::Status::ok); - nlohmann::json body = nlohmann::json::object(); - body["result"] = Handler::Poll::Result::replace; - - res.setBody(body); - res.send(); + Handler::Poll::error(*request.get(), Handler::Poll::Result::replace, Response::Status::ok); + //TODO unschedule } + std::shared_ptr sch = scheduler.lock(); + if (!sch) { + std::cerr << "Was unable to schedule polling timeout, replying with an error" << std::endl; + Handler::Poll::error(*request.get(), Handler::Poll::Result::unknownError, Response::Status::internalError); + return; + } + sch->schedule(std::bind(&Session::onTimeout, this), TM::Scheduler::Delay(5000)); + polling = std::move(request); } diff --git a/server/session.h b/server/session.h index 0027562..14c43e9 100644 --- a/server/session.h +++ b/server/session.h @@ -6,10 +6,16 @@ #include #include "request/accepting.h" +#include "taskmanager/scheduler.h" class Session : public Accepting { public: - Session(unsigned int id, const std::string& access, const std::string& renew); + Session( + std::weak_ptr scheduler, + unsigned int id, + const std::string& access, + const std::string& renew + ); Session(const Session&) = delete; Session(Session&& other); Session& operator = (const Session&) = delete; @@ -20,6 +26,10 @@ public: void accept(std::unique_ptr request) override; private: + void onTimeout(); + +private: + std::weak_ptr scheduler; unsigned int id; std::string access; std::string renew; diff --git a/taskmanager/CMakeLists.txt b/taskmanager/CMakeLists.txt index 7a8c2c3..f45b98f 100644 --- a/taskmanager/CMakeLists.txt +++ b/taskmanager/CMakeLists.txt @@ -6,6 +6,7 @@ set(HEADERS job.h route.h scheduler.h + function.h ) set(SOURCES @@ -13,6 +14,7 @@ set(SOURCES job.cpp route.cpp scheduler.cpp + function.cpp ) target_sources(${PROJECT_NAME} PRIVATE ${SOURCES}) diff --git a/taskmanager/function.cpp b/taskmanager/function.cpp new file mode 100644 index 0000000..bf6d01c --- /dev/null +++ b/taskmanager/function.cpp @@ -0,0 +1,12 @@ +//SPDX-FileCopyrightText: 2024 Yury Gubich +//SPDX-License-Identifier: GPL-3.0-or-later + +#include "function.h" + +TM::Function::Function (const std::function& fn): + fn(fn) +{} + +void TM::Function::execute () { + fn(); +} diff --git a/taskmanager/function.h b/taskmanager/function.h new file mode 100644 index 0000000..c6ca024 --- /dev/null +++ b/taskmanager/function.h @@ -0,0 +1,20 @@ +//SPDX-FileCopyrightText: 2024 Yury Gubich +//SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include "functional" + +#include "job.h" + +namespace TM { +class Function : public Job { +public: + Function(const std::function& fn); + + void execute () override; + +private: + std::function fn; +}; +} diff --git a/taskmanager/scheduler.cpp b/taskmanager/scheduler.cpp index 45a8dc0..db781e3 100644 --- a/taskmanager/scheduler.cpp +++ b/taskmanager/scheduler.cpp @@ -4,8 +4,8 @@ #include "scheduler.h" TM::Scheduler::Scheduler (std::weak_ptr manager): - manager(manager), queue(), + manager(manager), mutex(), cond(), thread(nullptr), @@ -56,19 +56,21 @@ void TM::Scheduler::loop () { break; } - Task task = queue.top().second; - queue.pop(); + Record task = queue.pop(); lock.unlock(); - task(); + std::shared_ptr mngr = manager.lock(); + if (mngr) + mngr->schedule(std::move(task.second)); + lock.lock(); } } } -void TM::Scheduler::schedule (Task task, Delay delay) { +void TM::Scheduler::schedule (const std::function& task, Delay delay) { std::unique_lock lock(mutex); Time time = std::chrono::steady_clock::now() + delay; - queue.emplace(time, task); + queue.emplace(time, std::make_unique(task)); lock.unlock(); cond.notify_one(); diff --git a/taskmanager/scheduler.h b/taskmanager/scheduler.h index 40a0093..a952858 100644 --- a/taskmanager/scheduler.h +++ b/taskmanager/scheduler.h @@ -5,41 +5,41 @@ #include #include -#include #include #include #include #include #include "manager.h" +#include "function.h" #include "utils/helpers.h" namespace TM { class Scheduler { public: using Delay = std::chrono::milliseconds; - using Task = std::function; Scheduler (std::weak_ptr manager); ~Scheduler (); void start(); void stop(); - void schedule(Task task, Delay delay); + void schedule(const std::function& task, Delay delay); private: void loop(); private: + using Task = std::unique_ptr; using Time = std::chrono::time_point; using Record = std::pair; - std::weak_ptr manager; - std::priority_queue< + PriorityQueue< Record, std::vector, FirstGreater > queue; + std::weak_ptr manager; std::mutex mutex; std::condition_variable cond; std::unique_ptr thread; diff --git a/utils/helpers.cpp b/utils/helpers.cpp index 5e2aed2..1af821e 100644 --- a/utils/helpers.cpp +++ b/utils/helpers.cpp @@ -4,7 +4,6 @@ #include "helpers.h" #include "iostream" -#include #include #include "config.h" diff --git a/utils/helpers.h b/utils/helpers.h index 7991346..91a597d 100644 --- a/utils/helpers.h +++ b/utils/helpers.h @@ -5,6 +5,7 @@ #include #include +#include #define UNUSED(variable) (void)variable @@ -18,7 +19,51 @@ std::string extract(std::string& string, std::string::size_type begin, std::stri template struct FirstGreater { - bool operator () (T left, T right) { + bool operator () (const T& left, const T& right) { return std::get<0>(left) > std::get<0>(right); } }; + +template > +class PriorityQueue { +public: + explicit PriorityQueue(const Compare& compare = Compare()): + container(), + compare(compare) + {} + + const Type& top () const { + return container.front(); + } + + bool empty () const { + return container.empty(); + } + + template + void emplace (Args&&... args) { + container.emplace_back(std::forward(args)...); + std::push_heap(container.begin(), container.end(), compare); + } + + void push (const Type& element) { + container.push_back(element); + std::push_heap(container.begin(), container.end(), compare); + } + + void push (Type&& element) { + container.push_back(std::move(element)); + std::push_heap(container.begin(), container.end(), compare); + } + + Type pop () { + std::pop_heap(container.begin(), container.end(), compare); + Type result = std::move(container.back()); + container.pop_back(); + return result; + } + +private: + Container container; + Compare compare; +};