some ideas of updates delivery

This commit is contained in:
Blue 2024-01-13 20:57:42 -03:00
parent 4df8d4319e
commit 2e01fe8d67
Signed by: blue
GPG Key ID: 9B203B252A63EE38
6 changed files with 108 additions and 18 deletions

View File

@ -53,13 +53,11 @@ void Handler::AddAsset::handle (Request& request) {
asset.owner = session.owner; asset.owner = session.owner;
asset = srv->getDatabase()->addAsset(asset); asset = srv->getDatabase()->addAsset(asset);
nlohmann::json body = nlohmann::json::object();
body["asset"] = asset.toJSON();
Response& res = request.createResponse(Response::Status::ok); Response& res = request.createResponse(Response::Status::ok);
res.setBody(body);
res.send(); res.send();
session.assetAdded(asset);
} catch (const DB::NoSession& e) { } catch (const DB::NoSession& e) {
return error(request, Response::Status::unauthorized); return error(request, Response::Status::unauthorized);
} catch (const std::exception& e) { } catch (const std::exception& e) {

View File

@ -11,6 +11,7 @@ constexpr static const char* SERVER_NAME("SERVER_NAME");
constexpr static const char* CONTENT_TYPE("CONTENT_TYPE"); constexpr static const char* CONTENT_TYPE("CONTENT_TYPE");
constexpr static const char* CONTENT_LENGTH("CONTENT_LENGTH"); constexpr static const char* CONTENT_LENGTH("CONTENT_LENGTH");
constexpr static const char* AUTHORIZATION("HTTP_AUTHORIZATION"); constexpr static const char* AUTHORIZATION("HTTP_AUTHORIZATION");
constexpr static const char* QUERY_STRING("QUERY_STRING");
constexpr static const char* urlEncoded("application/x-www-form-urlencoded"); constexpr static const char* urlEncoded("application/x-www-form-urlencoded");
@ -33,7 +34,8 @@ Request::Request ():
state(State::initial), state(State::initial),
raw(), raw(),
response(nullptr), response(nullptr),
path() path(),
cachedMethod(Method::unknown)
{} {}
Request::~Request() { Request::~Request() {
@ -68,11 +70,16 @@ std::string_view Request::methodName() const {
} }
Request::Method Request::method() const { Request::Method Request::method() const {
if (cachedMethod != Method::unknown)
return cachedMethod;
std::string_view method = methodName(); std::string_view method = methodName();
for (const auto& pair : methods) { for (const auto& pair : methods) {
if (pair.first == method) if (pair.first == method) {
cachedMethod = pair.second;
return pair.second; return pair.second;
} }
}
return Request::Method::unknown; return Request::Method::unknown;
} }
@ -227,20 +234,36 @@ unsigned int Request::contentLength() const {
if (!active()) if (!active())
throw std::runtime_error("An attempt to read content length of a request in a wrong state"); throw std::runtime_error("An attempt to read content length of a request in a wrong state");
return atoi(FCGX_GetParam(CONTENT_LENGTH, raw.envp)); char* cl = FCGX_GetParam(CONTENT_LENGTH, raw.envp);
if (cl != nullptr)
return atoi(cl);
else
return 0;
} }
std::map<std::string, std::string> Request::getForm() const { std::map<std::string, std::string> Request::getForm() const {
if (!active()) if (!active())
throw std::runtime_error("An attempt to read form of a request in a wrong state"); throw std::runtime_error("An attempt to read form of a request in a wrong state");
switch (Request::method()) {
case Method::get:
return urlDecodeAndParse(FCGX_GetParam(QUERY_STRING, raw.envp));
break;
case Method::post:
return getFormPOST();
default:
return {};
}
}
std::map<std::string, std::string> Request::getFormPOST () const {
std::map<std::string, std::string> result; std::map<std::string, std::string> result;
std::string_view contentType(FCGX_GetParam(CONTENT_TYPE, raw.envp)); std::string_view contentType(FCGX_GetParam(CONTENT_TYPE, raw.envp));
if (contentType.empty()) if (contentType.empty())
return result; return result;
unsigned int length = contentLength();
if (contentType.find(urlEncoded) != std::string_view::npos) { if (contentType.find(urlEncoded) != std::string_view::npos) {
unsigned int length = contentLength();
std::string postData(length, '\0'); std::string postData(length, '\0');
FCGX_GetStr(&postData[0], length, raw.in); FCGX_GetStr(&postData[0], length, raw.in);
result = urlDecodeAndParse(postData); result = urlDecodeAndParse(postData);

View File

@ -69,10 +69,12 @@ private:
OStream getErrorStream(); OStream getErrorStream();
void responseIsComplete(); void responseIsComplete();
void terminate(); void terminate();
std::map<std::string, std::string> getFormPOST() const;
private: private:
State state; State state;
FCGX_Request raw; FCGX_Request raw;
std::unique_ptr<Response> response; std::unique_ptr<Response> response;
std::string path; std::string path;
mutable Method cachedMethod;
}; };

View File

@ -7,7 +7,7 @@ start_service() {
if (systemctl is-active --quiet $1) then if (systemctl is-active --quiet $1) then
echo "$1 is already running" echo "$1 is already running"
else else
echo "$1 is not running, going to use \"sudo systemctl start $1\"" echo "$1 is not running, going to use sudo to start it"
if (sudo systemctl start $1) then if (sudo systemctl start $1) then
echo "$1 started" echo "$1 started"
else else
@ -17,6 +17,7 @@ start_service() {
} }
if [ ! -d "/run/pica" ]; then if [ ! -d "/run/pica" ]; then
echo "reuired unix socket was not found, going to use sudo to create it"
sudo mkdir /run/pica sudo mkdir /run/pica
sudo chown $USER:$USER /run/pica sudo chown $USER:$USER /run/pica
fi fi

View File

@ -20,7 +20,9 @@ Session::Session(
renew(renew), renew(renew),
polling(nullptr), polling(nullptr),
timeoutId(TM::Scheduler::none), timeoutId(TM::Scheduler::none),
timeout(timeout) timeout(timeout),
mtx(),
cache()
{} {}
Session::~Session () { Session::~Session () {
@ -39,6 +41,7 @@ std::string Session::getRenewToken() const {
} }
void Session::accept(std::unique_ptr<Request> request) { void Session::accept(std::unique_ptr<Request> request) {
std::lock_guard lock(mtx);
std::shared_ptr<TM::Scheduler> sch = scheduler.lock(); std::shared_ptr<TM::Scheduler> sch = scheduler.lock();
if (polling) { if (polling) {
if (timeoutId != TM::Scheduler::none) { if (timeoutId != TM::Scheduler::none) {
@ -51,17 +54,67 @@ void Session::accept(std::unique_ptr<Request> request) {
polling.reset(); polling.reset();
} }
std::map form = request->getForm();
auto clear = form.find("clearCache");
if (clear != form.end() && clear->second == "all")
cache.clear();
if (!cache.empty())
return sendUpdates(std::move(request));
if (!sch) { if (!sch) {
std::cerr << "Was unable to schedule polling timeout, replying with an error" << std::endl; std::cerr << "Was unable to schedule polling timeout, replying with an error" << std::endl;
Handler::Poll::error(*request.get(), Handler::Poll::Result::unknownError, Response::Status::internalError); Handler::Poll::error(*request.get(), Handler::Poll::Result::unknownError, Response::Status::internalError);
return; return;
} }
timeoutId = sch->schedule(std::bind(&Session::onTimeout, this), timeout); timeoutId = sch->schedule(std::bind(&Session::onTimeout, this), timeout);
polling = std::move(request); polling = std::move(request);
} }
void Session::sendUpdates (std::unique_ptr<Request> request) {
Response& res = request->createResponse(Response::Status::ok);
nlohmann::json body = nlohmann::json::object();
body["result"] = Handler::Poll::Result::success;
nlohmann::json& data = body["data"] = nlohmann::json::object();
for (const auto& category : cache) {
nlohmann::json& cat = data[category.first] = nlohmann::json::object();
for (const auto& entry : category.second)
cat[entry.first] = entry.second;
}
res.setBody(body);
res.send();
}
void Session::onTimeout () { void Session::onTimeout () {
std::lock_guard lock(mtx);
timeoutId = TM::Scheduler::none; timeoutId = TM::Scheduler::none;
Handler::Poll::error(*polling.get(), Handler::Poll::Result::timeout, Response::Status::ok); Handler::Poll::error(*polling.get(), Handler::Poll::Result::timeout, Response::Status::ok);
polling.reset(); polling.reset();
} }
void Session::assetAdded (const DB::Asset& asset) {
std::lock_guard lock(mtx);
auto assets = cache["assets"];
auto addedItr = assets.find("added");
if (addedItr == assets.end())
addedItr = assets.emplace("added", nlohmann::json::array()).first;
addedItr->second.push_back(asset.toJSON());
checkUpdates();
}
void Session::checkUpdates () {
std::shared_ptr<TM::Scheduler> sch = scheduler.lock();
if (polling) {
if (timeoutId != TM::Scheduler::none) {
if (sch)
sch->cancel(timeoutId);
timeoutId = TM::Scheduler::none;
}
sendUpdates(std::move(polling));
}
}

View File

@ -4,10 +4,16 @@
#pragma once #pragma once
#include <string> #include <string>
#include <map>
#include <mutex>
#include <nlohmann/json.hpp>
#include "request/accepting.h" #include "request/accepting.h"
#include "taskmanager/scheduler.h" #include "taskmanager/scheduler.h"
#include "database/schema/asset.h"
class Session : public Accepting { class Session : public Accepting {
public: public:
Session ( Session (
@ -31,8 +37,12 @@ public:
const unsigned int id; const unsigned int id;
const unsigned int owner; const unsigned int owner;
void assetAdded (const DB::Asset& asset);
private: private:
void onTimeout (); void onTimeout ();
void sendUpdates (std::unique_ptr<Request> request);
void checkUpdates ();
private: private:
std::weak_ptr<TM::Scheduler> scheduler; std::weak_ptr<TM::Scheduler> scheduler;
@ -41,4 +51,7 @@ private:
std::unique_ptr<Request> polling; std::unique_ptr<Request> polling;
TM::Record::ID timeoutId; TM::Record::ID timeoutId;
TM::Scheduler::Delay timeout; TM::Scheduler::Delay timeout;
std::mutex mtx;
std::map<std::string, std::map<std::string, nlohmann::json>> cache;
}; };