From 2e01fe8d67a72940b80511f4367dca87de6b3961 Mon Sep 17 00:00:00 2001 From: blue Date: Sat, 13 Jan 2024 20:57:42 -0300 Subject: [PATCH] some ideas of updates delivery --- handler/addasset.cpp | 6 ++--- request/request.cpp | 31 +++++++++++++++++++++---- request/request.h | 2 ++ run.sh.in | 3 ++- server/session.cpp | 55 +++++++++++++++++++++++++++++++++++++++++++- server/session.h | 29 ++++++++++++++++------- 6 files changed, 108 insertions(+), 18 deletions(-) diff --git a/handler/addasset.cpp b/handler/addasset.cpp index fd63886..adc5e09 100644 --- a/handler/addasset.cpp +++ b/handler/addasset.cpp @@ -53,13 +53,11 @@ void Handler::AddAsset::handle (Request& request) { asset.owner = session.owner; asset = srv->getDatabase()->addAsset(asset); - nlohmann::json body = nlohmann::json::object(); - body["asset"] = asset.toJSON(); - Response& res = request.createResponse(Response::Status::ok); - res.setBody(body); res.send(); + session.assetAdded(asset); + } catch (const DB::NoSession& e) { return error(request, Response::Status::unauthorized); } catch (const std::exception& e) { diff --git a/request/request.cpp b/request/request.cpp index eedbc72..e6a055d 100644 --- a/request/request.cpp +++ b/request/request.cpp @@ -11,6 +11,7 @@ constexpr static const char* SERVER_NAME("SERVER_NAME"); constexpr static const char* CONTENT_TYPE("CONTENT_TYPE"); constexpr static const char* CONTENT_LENGTH("CONTENT_LENGTH"); constexpr static const char* AUTHORIZATION("HTTP_AUTHORIZATION"); +constexpr static const char* QUERY_STRING("QUERY_STRING"); constexpr static const char* urlEncoded("application/x-www-form-urlencoded"); @@ -33,7 +34,8 @@ Request::Request (): state(State::initial), raw(), response(nullptr), - path() + path(), + cachedMethod(Method::unknown) {} Request::~Request() { @@ -68,10 +70,15 @@ std::string_view Request::methodName() const { } Request::Method Request::method() const { + if (cachedMethod != Method::unknown) + return cachedMethod; + std::string_view method = methodName(); for (const auto& pair : methods) { - if (pair.first == method) + if (pair.first == method) { + cachedMethod = pair.second; return pair.second; + } } return Request::Method::unknown; @@ -227,20 +234,36 @@ unsigned int Request::contentLength() const { if (!active()) throw std::runtime_error("An attempt to read content length of a request in a wrong state"); - return atoi(FCGX_GetParam(CONTENT_LENGTH, raw.envp)); + char* cl = FCGX_GetParam(CONTENT_LENGTH, raw.envp); + if (cl != nullptr) + return atoi(cl); + else + return 0; } std::map Request::getForm() const { if (!active()) throw std::runtime_error("An attempt to read form of a request in a wrong state"); + switch (Request::method()) { + case Method::get: + return urlDecodeAndParse(FCGX_GetParam(QUERY_STRING, raw.envp)); + break; + case Method::post: + return getFormPOST(); + default: + return {}; + } +} + +std::map Request::getFormPOST () const { std::map result; std::string_view contentType(FCGX_GetParam(CONTENT_TYPE, raw.envp)); if (contentType.empty()) return result; - unsigned int length = contentLength(); if (contentType.find(urlEncoded) != std::string_view::npos) { + unsigned int length = contentLength(); std::string postData(length, '\0'); FCGX_GetStr(&postData[0], length, raw.in); result = urlDecodeAndParse(postData); diff --git a/request/request.h b/request/request.h index 1cef22c..55f2650 100644 --- a/request/request.h +++ b/request/request.h @@ -69,10 +69,12 @@ private: OStream getErrorStream(); void responseIsComplete(); void terminate(); + std::map getFormPOST() const; private: State state; FCGX_Request raw; std::unique_ptr response; std::string path; + mutable Method cachedMethod; }; diff --git a/run.sh.in b/run.sh.in index 7ee8ac3..be5f016 100644 --- a/run.sh.in +++ b/run.sh.in @@ -7,7 +7,7 @@ start_service() { if (systemctl is-active --quiet $1) then echo "$1 is already running" else - echo "$1 is not running, going to use \"sudo systemctl start $1\"" + echo "$1 is not running, going to use sudo to start it" if (sudo systemctl start $1) then echo "$1 started" else @@ -17,6 +17,7 @@ start_service() { } if [ ! -d "/run/pica" ]; then + echo "reuired unix socket was not found, going to use sudo to create it" sudo mkdir /run/pica sudo chown $USER:$USER /run/pica fi diff --git a/server/session.cpp b/server/session.cpp index a5bd4ce..ef13c2a 100644 --- a/server/session.cpp +++ b/server/session.cpp @@ -20,7 +20,9 @@ Session::Session( renew(renew), polling(nullptr), timeoutId(TM::Scheduler::none), - timeout(timeout) + timeout(timeout), + mtx(), + cache() {} Session::~Session () { @@ -39,6 +41,7 @@ std::string Session::getRenewToken() const { } void Session::accept(std::unique_ptr request) { + std::lock_guard lock(mtx); std::shared_ptr sch = scheduler.lock(); if (polling) { if (timeoutId != TM::Scheduler::none) { @@ -51,17 +54,67 @@ void Session::accept(std::unique_ptr request) { polling.reset(); } + std::map form = request->getForm(); + auto clear = form.find("clearCache"); + if (clear != form.end() && clear->second == "all") + cache.clear(); + + if (!cache.empty()) + return sendUpdates(std::move(request)); + if (!sch) { std::cerr << "Was unable to schedule polling timeout, replying with an error" << std::endl; Handler::Poll::error(*request.get(), Handler::Poll::Result::unknownError, Response::Status::internalError); return; } + timeoutId = sch->schedule(std::bind(&Session::onTimeout, this), timeout); polling = std::move(request); } +void Session::sendUpdates (std::unique_ptr request) { + Response& res = request->createResponse(Response::Status::ok); + nlohmann::json body = nlohmann::json::object(); + body["result"] = Handler::Poll::Result::success; + nlohmann::json& data = body["data"] = nlohmann::json::object(); + + for (const auto& category : cache) { + nlohmann::json& cat = data[category.first] = nlohmann::json::object(); + for (const auto& entry : category.second) + cat[entry.first] = entry.second; + } + + res.setBody(body); + res.send(); +} + void Session::onTimeout () { + std::lock_guard lock(mtx); timeoutId = TM::Scheduler::none; Handler::Poll::error(*polling.get(), Handler::Poll::Result::timeout, Response::Status::ok); polling.reset(); } + +void Session::assetAdded (const DB::Asset& asset) { + std::lock_guard lock(mtx); + auto assets = cache["assets"]; + auto addedItr = assets.find("added"); + if (addedItr == assets.end()) + addedItr = assets.emplace("added", nlohmann::json::array()).first; + + addedItr->second.push_back(asset.toJSON()); + checkUpdates(); +} + +void Session::checkUpdates () { + std::shared_ptr sch = scheduler.lock(); + if (polling) { + if (timeoutId != TM::Scheduler::none) { + if (sch) + sch->cancel(timeoutId); + + timeoutId = TM::Scheduler::none; + } + sendUpdates(std::move(polling)); + } +} diff --git a/server/session.h b/server/session.h index 9ba2899..04bfc6e 100644 --- a/server/session.h +++ b/server/session.h @@ -4,13 +4,19 @@ #pragma once #include +#include +#include + +#include #include "request/accepting.h" #include "taskmanager/scheduler.h" +#include "database/schema/asset.h" + class Session : public Accepting { public: - Session( + Session ( std::weak_ptr scheduler, unsigned int id, unsigned int owner, @@ -18,21 +24,25 @@ public: const std::string& renew, unsigned int timeout ); - Session(const Session&) = delete; - Session(Session&& other) = delete; - ~Session(); + Session (const Session&) = delete; + Session (Session&& other) = delete; + ~Session (); Session& operator = (const Session&) = delete; Session& operator = (Session&& other) = delete; - std::string getAccessToken() const; - std::string getRenewToken() const; - void accept(std::unique_ptr request) override; + std::string getAccessToken () const; + std::string getRenewToken () const; + void accept (std::unique_ptr request) override; const unsigned int id; const unsigned int owner; + void assetAdded (const DB::Asset& asset); + private: - void onTimeout(); + void onTimeout (); + void sendUpdates (std::unique_ptr request); + void checkUpdates (); private: std::weak_ptr scheduler; @@ -41,4 +51,7 @@ private: std::unique_ptr polling; TM::Record::ID timeoutId; TM::Scheduler::Delay timeout; + std::mutex mtx; + + std::map> cache; };