pica/server/session.cpp

219 lines
6.1 KiB
C++

//SPDX-FileCopyrightText: 2023 Yury Gubich <blue@macaw.me>
//SPDX-License-Identifier: GPL-3.0-or-later
#include "session.h"
#include "handler/poll.h"
Session::Session(
std::weak_ptr<TM::Scheduler> scheduler,
unsigned int id,
unsigned int owner,
const std::string& access,
const std::string& renew,
unsigned int timeout
):
id(id),
owner(owner),
scheduler(scheduler),
access(access),
renew(renew),
polling(nullptr),
timeoutId(TM::Scheduler::none),
timeout(timeout),
mtx(),
cache({
{"system", {
{"invalidate", true}
}}
})
{}
Session::~Session () {
if (timeoutId != TM::Scheduler::none) {
if (std::shared_ptr<TM::Scheduler> sch = scheduler.lock())
sch->cancel(timeoutId);
}
}
std::string Session::getAccessToken() const {
return access;
}
std::string Session::getRenewToken() const {
return renew;
}
void Session::accept(std::unique_ptr<Request> request) {
std::lock_guard lock(mtx);
std::shared_ptr<TM::Scheduler> sch = scheduler.lock();
if (polling) {
if (timeoutId != TM::Scheduler::none) {
if (sch)
sch->cancel(timeoutId);
timeoutId = TM::Scheduler::none;
}
Handler::Poll::error(*polling.get(), Handler::Poll::Result::replace, Response::Status::ok);
polling.reset();
}
std::map form = request->getForm();
auto clear = form.find("clearCache");
if (clear != form.end() && clear->second == "all")
cache.clear();
if (!cache.empty())
return sendUpdates(std::move(request));
if (!sch) {
std::cerr << "Was unable to schedule polling timeout, replying with an error" << std::endl;
Handler::Poll::error(*request.get(), Handler::Poll::Result::unknownError, Response::Status::internalError);
return;
}
timeoutId = sch->schedule(std::bind(&Session::onTimeout, this), timeout);
polling = std::move(request);
}
void Session::sendUpdates (std::unique_ptr<Request> request) {
Response& res = request->createResponse(Response::Status::ok);
nlohmann::json body = nlohmann::json::object();
body["result"] = Handler::Poll::Result::success;
nlohmann::json& data = body["data"] = nlohmann::json::object();
for (const auto& category : cache) {
nlohmann::json& cat = data[category.first] = nlohmann::json::object();
for (const auto& entry : category.second)
cat[entry.first] = entry.second;
}
res.setBody(body);
res.send();
}
void Session::onTimeout () {
std::lock_guard lock(mtx);
timeoutId = TM::Scheduler::none;
Handler::Poll::error(*polling.get(), Handler::Poll::Result::timeout, Response::Status::ok);
polling.reset();
}
void Session::assetAdded (const DB::Asset& asset) {
std::lock_guard lock(mtx);
std::map<std::string, nlohmann::json>& assets = cache["assets"];
auto addedItr = assets.find("added");
if (addedItr == assets.end())
addedItr = assets.emplace("added", nlohmann::json::array()).first;
addedItr->second.push_back(asset.toJSON());
checkUpdates();
}
void Session::assetChanged (const DB::Asset& asset) {
std::lock_guard lock(mtx);
std::map<std::string, nlohmann::json>& assets = cache["assets"];
auto itr = assets.find("changed");
if (itr == assets.end())
itr = assets.emplace("changed", nlohmann::json::array()).first;
removeByID(itr->second, asset.id);
itr->second.push_back(asset.toJSON());
checkUpdates();
}
void Session::assetRemoved (unsigned int assetId) {
std::lock_guard lock(mtx);
std::map<std::string, nlohmann::json>& assets = cache["assets"];
auto itr = assets.find("added");
if (itr != assets.end())
removeByID(itr->second, assetId);
else {
itr = assets.find("removed");
if (itr == assets.end())
itr = assets.emplace("removed", nlohmann::json::array()).first;
itr->second.push_back(assetId);
}
itr = assets.find("changed");
if (itr != assets.end())
removeByID(itr->second, assetId);
checkUpdates();
}
void Session::transactionAdded(const DB::Transaction& txn) {
std::lock_guard lock(mtx);
std::map<std::string, nlohmann::json>& txns = cache["transactions"];
auto itr = txns.find("changed");
if (itr == txns.end())
itr = txns.emplace("changed", nlohmann::json::array()).first;
removeByID(itr->second, txn.id);
itr->second.push_back(txn.toJSON());
checkUpdates();
}
void Session::transactionChanged(const DB::Transaction& txn) {
std::lock_guard lock(mtx);
std::map<std::string, nlohmann::json>& txns = cache["transactions"];
auto itr = txns.find("changed");
if (itr == txns.end())
itr = txns.emplace("changed", nlohmann::json::array()).first;
removeByID(itr->second, txn.id);
itr->second.push_back(txn.toJSON());
checkUpdates();
}
void Session::transactionRemoved(unsigned int txnId) {
std::lock_guard lock(mtx);
std::map<std::string, nlohmann::json>& txns = cache["transactions"];
auto itr = txns.find("added");
if (itr != txns.end())
removeByID(itr->second, txnId);
else {
itr = txns.find("removed");
if (itr == txns.end())
itr = txns.emplace("removed", nlohmann::json::array()).first;
itr->second.push_back(txnId);
}
itr = txns.find("changed");
if (itr != txns.end())
removeByID(itr->second, txnId);
checkUpdates();
}
void Session::removeByID(nlohmann::json& array, unsigned int id) {
array.erase(
std::remove_if(
array.begin(),
array.end(),
[id](const nlohmann::json& item) {
return item["id"].get<unsigned int>() == id;
}
),
array.end()
);
}
void Session::checkUpdates () {
std::shared_ptr<TM::Scheduler> sch = scheduler.lock();
if (polling) {
if (timeoutId != TM::Scheduler::none) {
if (sch)
sch->cancel(timeoutId);
timeoutId = TM::Scheduler::none;
}
sendUpdates(std::move(polling));
}
}