//SPDX-FileCopyrightText: 2023 Yury Gubich //SPDX-License-Identifier: GPL-3.0-or-later #include "session.h" #include "handler/poll.h" Session::Session( std::weak_ptr scheduler, unsigned int id, unsigned int owner, const std::string& access, const std::string& renew, unsigned int timeout ): id(id), owner(owner), scheduler(scheduler), access(access), renew(renew), polling(nullptr), timeoutId(TM::Scheduler::none), timeout(timeout), mtx(), cache({ {"system", { {"invalidate", true} }} }) {} Session::~Session () { if (timeoutId != TM::Scheduler::none) { if (std::shared_ptr sch = scheduler.lock()) sch->cancel(timeoutId); } } std::string Session::getAccessToken() const { return access; } std::string Session::getRenewToken() const { return renew; } void Session::accept(std::unique_ptr request) { std::lock_guard lock(mtx); std::shared_ptr sch = scheduler.lock(); if (polling) { if (timeoutId != TM::Scheduler::none) { if (sch) sch->cancel(timeoutId); timeoutId = TM::Scheduler::none; } Handler::Poll::error(*polling.get(), Handler::Poll::Result::replace, Response::Status::ok); polling.reset(); } std::map form = request->getForm(); auto clear = form.find("clearCache"); if (clear != form.end() && clear->second == "all") cache.clear(); if (!cache.empty()) return sendUpdates(std::move(request)); if (!sch) { std::cerr << "Was unable to schedule polling timeout, replying with an error" << std::endl; Handler::Poll::error(*request.get(), Handler::Poll::Result::unknownError, Response::Status::internalError); return; } timeoutId = sch->schedule(std::bind(&Session::onTimeout, this), timeout); polling = std::move(request); } void Session::sendUpdates (std::unique_ptr request) { Response& res = request->createResponse(Response::Status::ok); nlohmann::json body = nlohmann::json::object(); body["result"] = Handler::Poll::Result::success; nlohmann::json& data = body["data"] = nlohmann::json::object(); for (const auto& category : cache) { nlohmann::json& cat = data[category.first] = nlohmann::json::object(); for (const auto& entry : category.second) cat[entry.first] = entry.second; } res.setBody(body); res.send(); } void Session::onTimeout () { std::lock_guard lock(mtx); timeoutId = TM::Scheduler::none; Handler::Poll::error(*polling.get(), Handler::Poll::Result::timeout, Response::Status::ok); polling.reset(); } void Session::assetAdded (const DB::Asset& asset) { std::lock_guard lock(mtx); std::map& assets = cache["assets"]; auto addedItr = assets.find("added"); if (addedItr == assets.end()) addedItr = assets.emplace("added", nlohmann::json::array()).first; addedItr->second.push_back(asset.toJSON()); checkUpdates(); } void Session::assetChanged (const DB::Asset& asset) { std::lock_guard lock(mtx); std::map& assets = cache["assets"]; auto itr = assets.find("changed"); if (itr == assets.end()) itr = assets.emplace("changed", nlohmann::json::array()).first; removeByID(itr->second, asset.id); itr->second.push_back(asset.toJSON()); checkUpdates(); } void Session::assetRemoved (unsigned int assetId) { std::lock_guard lock(mtx); std::map& assets = cache["assets"]; auto itr = assets.find("added"); if (itr != assets.end()) removeByID(itr->second, assetId); else { itr = assets.find("removed"); if (itr == assets.end()) itr = assets.emplace("removed", nlohmann::json::array()).first; itr->second.push_back(assetId); } itr = assets.find("changed"); if (itr != assets.end()) removeByID(itr->second, assetId); checkUpdates(); } void Session::removeByID(nlohmann::json& array, unsigned int id) { array.erase( std::remove_if( array.begin(), array.end(), [id](const nlohmann::json& item) { return item["id"].get() == id; } ), array.end() ); } void Session::checkUpdates () { std::shared_ptr sch = scheduler.lock(); if (polling) { if (timeoutId != TM::Scheduler::none) { if (sch) sch->cancel(timeoutId); timeoutId = TM::Scheduler::none; } sendUpdates(std::move(polling)); } }