diff --git a/component/CMakeLists.txt b/component/CMakeLists.txt index ba46907..5089020 100644 --- a/component/CMakeLists.txt +++ b/component/CMakeLists.txt @@ -3,6 +3,7 @@ set(SOURCES actor.cpp router.cpp core.cpp + loop.cpp ) set(HEADERS @@ -10,6 +11,7 @@ set(HEADERS actor.h router.h core.h + loop.h ) target_sources(${EXEC_NAME} PRIVATE ${SOURCES}) diff --git a/component/core.cpp b/component/core.cpp index 5f2dfa9..e65ca51 100644 --- a/component/core.cpp +++ b/component/core.cpp @@ -9,7 +9,8 @@ Core::Core(const std::string& configPath): config(configPath), logger(config.getLogLevel()), router(logger), - initialized(false) + initialized(false), + connection() {} void Core::send(const std::string& jid, const std::string& body) { diff --git a/component/loop.cpp b/component/loop.cpp new file mode 100644 index 0000000..6a965e9 --- /dev/null +++ b/component/loop.cpp @@ -0,0 +1,185 @@ +// SPDX-FileCopyrightText: 2024 Yury Gubich +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "loop.h" + +#include +#include +#include +#include +#include + +Loop::Loop(const Shared::Logger& logger): + Shared::Loggable(logger, {"Loop"}), + wakePipe(), + handlers(), + mutex(), + running(false) +{ + if (pipe(wakePipe) < 0) { + fatal(std::string("pipe: ") + strerror(errno)); + std::exit(1); + } + + fcntl(wakePipe[0], F_SETFL, O_NONBLOCK); + fcntl(wakePipe[1], F_SETFL, O_NONBLOCK); + + registerInstance(this); +} + +Loop::~Loop() { + unregisterInstance(this); + + close(wakePipe[0]); + close(wakePipe[1]); +} + +void Loop::run() { + if (running.exchange(true)) { + warn("an attempt to run already running event loop, skipping"); + return; + } + + debug("entering the loop"); + while (running) { + std::unique_lock lock(mutex); + + fd_set readfds; + FD_ZERO(&readfds); + FD_SET(wakePipe[0], &readfds); + + int maxFD = wakePipe[0]; + for (const std::pair& pair : handlers) { + FD_SET(pair.first, &readfds); + maxFD = std::max(maxFD, pair.first); + } + + int result = select(maxFD + 1, &readfds, nullptr, nullptr, nullptr); + + if (result < 0) { + drain(&readfds); + if (errno == EAGAIN) { + debug("woke up"); + continue; + }; + + if (errno == EINTR) { + debug("interrupted"); + continue; + }; + fatal(std::string("select: ") + strerror(errno)); + running = false; + break; + } + + + if (drain(&readfds)) + continue; + + Handlers copy; + for (const std::pair& pair : handlers) + if (FD_ISSET(pair.first, &readfds)) + copy.insert(pair); + + lock.unlock(); + + if (runCallbacks(copy)) + continue; + } + + debug("left the loop"); +} + +void Loop::stop() { + if (!running.exchange(false)) + return; + + debug("stopping the loop"); + wake(); +} + +void Loop::addDescriptor(int descriptor, const Callback& handler) { + wake(); + std::lock_guard lock(mutex); + + if (!handlers.emplace(descriptor, handler).second) + warn("an attempt to add descriptor " + std::to_string(descriptor) + " for the second time"); +} + +void Loop::removeDescriptor(int descriptor) { + wake(); + std::lock_guard lock(mutex); + + if (handlers.erase(descriptor) == 0) + warn("an attempt to remove unknown descriptor " + std::to_string(descriptor)); +} + +void Loop::wake() { + char w = 'w'; + if (write(wakePipe[1], &w, 1) < 0 && errno != EAGAIN) + error("failed to wake up event loop: " + std::string(strerror(errno))); +} + +bool Loop::drain(fd_set* readfds) { + if (!FD_ISSET(wakePipe[0], readfds)) + return false; + + char buf[64]; + while (true) { + ssize_t r = read(wakePipe[0], buf, sizeof(buf)); + if (r > 0) + continue; + + if (r == -1) { + if (errno == EAGAIN) + break; + + error("failed to drain wake pipe: " + std::string(strerror(errno))); + break; + } + + if (r == 0) + break; // pipe closed + } + + return true; +} + +bool Loop::runCallbacks(const Handlers& copy) { + for (const std::pair& pair : copy) + try { + pair.second(); + return true; + } catch (const std::exception& e) { + error("exception in event loop from handler with file descriptor " + std::to_string(pair.first) + ":\n" + e.what()); + } catch (...) { + error("unhandled throw in event loop from handler with file descriptor " + std::to_string(pair.first)); + } + + return false; +} + +void Loop::registerInstance(Loop* loop) { + std::lock_guard lock(instanceMutex); + if (instances.size() == 0) { + std::signal(SIGINT, signalHandler); + std::signal(SIGTERM, signalHandler); + } + + instances.insert(loop); +} + +void Loop::unregisterInstance(Loop* loop) { + std::lock_guard lock(instanceMutex); + instances.erase(loop); + + if (instances.size() == 0) { + std::signal(SIGINT, SIG_DFL); + std::signal(SIGTERM, SIG_DFL); + } +} + +void Loop::signalHandler(int signum) { + for (Loop* loop : instances) + loop->stop(); +} diff --git a/component/loop.h b/component/loop.h new file mode 100644 index 0000000..632a16d --- /dev/null +++ b/component/loop.h @@ -0,0 +1,50 @@ +// SPDX-FileCopyrightText: 2024 Yury Gubich +// SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include +#include +#include +#include +#include + +#include + +#include "shared/loggable.h" +#include "shared/logger.h" + +class Loop : Shared::Loggable { +public: + typedef std::function Callback; + typedef std::map Handlers; + + Loop(const Shared::Logger& logger); + ~Loop(); + + void run(); + void stop(); + void addDescriptor(int descriptor, const Callback& handler); + void removeDescriptor(int descriptor); + +private: + void wake(); + bool drain(fd_set* readfds); + bool runCallbacks(const Handlers& copy); + +private: + + int wakePipe[2]; + Handlers handlers; + std::mutex mutex; + std::atomic running; + +private: + static void registerInstance(Loop* loop); + static void unregisterInstance(Loop* loop); + static void signalHandler(int signum); + + static inline std::set instances; + static inline std::mutex instanceMutex; + +}; diff --git a/connection/connection.cpp b/connection/connection.cpp index 7390563..c9f0e3e 100644 --- a/connection/connection.cpp +++ b/connection/connection.cpp @@ -3,6 +3,8 @@ #include "connection.h" +#include "gloox/connectiontcpclient.h" + Connection::Connection(const std::shared_ptr& core): Shared::Loggable(core->logger, {"Connection"}), state(initial), @@ -53,14 +55,26 @@ void Connection::deinitialize() { state = initial; } -void Connection::connect() { - if (state != disconnected) - return; +int Connection::connect() { + if (state != disconnected) { + std::string message("an attempt to call connect in the wrong state"); + fatal(message); + throw std::runtime_error(message); + } debug("connecting"); - state = connected; - gloox->connect(true); - state = disconnected; + state = connecting; + gloox->connect(false); + + return static_cast(gloox->connectionImpl())->socket(); +} + +void Connection::disconnect() { + if (state != connecting || state != connected) + return; + + gloox->disconnect(); + state = disconnecting; } void Connection::send(const std::string& jid, const std::string& body) { @@ -87,6 +101,10 @@ void Connection::publish(const std::string& service, const std::string& node, co pubsub->publishItem(service, node, list, nullptr, this); } +void Connection::processMessages() { + gloox->recv(); +} + std::string Connection::errorTypeToString(gloox::StanzaErrorType err) { switch (err) { case gloox::StanzaErrorTypeAuth: @@ -131,10 +149,12 @@ void Connection::handleItemPublication(const std::string& id, const gloox::JID& } void Connection::onConnect() { + state = connected; info("connection established"); } void Connection::onDisconnect(gloox::ConnectionError e) { + state = disconnected; std::string error; switch (e) { diff --git a/connection/connection.h b/connection/connection.h index 0e4395e..6693232 100644 --- a/connection/connection.h +++ b/connection/connection.h @@ -28,7 +28,9 @@ public: enum State { initial, disconnected, - connected + connecting, + connected, + disconnecting }; public: @@ -37,9 +39,11 @@ public: void initialize(); void deinitialize(); - void connect(); + int connect(); + void disconnect(); void send(const std::string& jid, const std::string& body); void publish(const std::string& service, const std::string& node, const std::string& title, const std::string& body); + void processMessages(); static std::string errorTypeToString(gloox::StanzaErrorType err); diff --git a/jay.cpp b/jay.cpp index 476865e..fce86b0 100644 --- a/jay.cpp +++ b/jay.cpp @@ -28,7 +28,9 @@ static const std::map< Jay::Jay(const std::string& configPath): core(std::make_shared(configPath)), connection(std::make_shared(core)), - modules() + modules(), + loop(core->logger), + initialized(false) {} Jay::~Jay() { @@ -41,13 +43,25 @@ bool Jay::isConfigValid() const { void Jay::run() { initialize(); - connection->connect(); + + int fd = connection->connect(); + loop.addDescriptor(fd, std::bind(&Connection::processMessages, connection.get())); + + loop.run(); + + connection->disconnect(); + loop.removeDescriptor(fd); } void Jay::initialize() { + if (initialized) + return; + createModules(); core->initialize(connection); connection->initialize(); + + initialized = true; } void Jay::createModules() { diff --git a/jay.h b/jay.h index 0f3c694..ec85592 100644 --- a/jay.h +++ b/jay.h @@ -15,6 +15,7 @@ #include "shared/definitions.h" #include "component/core.h" +#include "component/loop.h" #include "connection/connection.h" #include "module/module.h" @@ -35,4 +36,6 @@ private: std::shared_ptr core; std::shared_ptr connection; std::vector> modules; + Loop loop; + bool initialized; }; diff --git a/shared/loggable.cpp b/shared/loggable.cpp index 52dbcf0..7828e28 100644 --- a/shared/loggable.cpp +++ b/shared/loggable.cpp @@ -27,3 +27,7 @@ void Shared::Loggable::warn(const std::string& message) const { void Shared::Loggable::error(const std::string& message) const { logger.log(Logger::error, message, domain); } + +void Shared::Loggable::fatal(const std::string& message) const { + logger.log(Logger::fatal, message, domain); +} diff --git a/shared/loggable.h b/shared/loggable.h index 48d9eb7..d65adee 100644 --- a/shared/loggable.h +++ b/shared/loggable.h @@ -24,6 +24,7 @@ protected: void info(const std::string& message) const; void warn(const std::string& message) const; void error(const std::string& message) const; + void fatal(const std::string& message) const; }; } \ No newline at end of file