An event loop

This commit is contained in:
Blue 2025-04-12 13:12:38 +03:00
parent ce29081a5f
commit 69e8098cce
Signed by: blue
GPG Key ID: 9B203B252A63EE38
10 changed files with 295 additions and 11 deletions

View File

@ -3,6 +3,7 @@ set(SOURCES
actor.cpp actor.cpp
router.cpp router.cpp
core.cpp core.cpp
loop.cpp
) )
set(HEADERS set(HEADERS
@ -10,6 +11,7 @@ set(HEADERS
actor.h actor.h
router.h router.h
core.h core.h
loop.h
) )
target_sources(${EXEC_NAME} PRIVATE ${SOURCES}) target_sources(${EXEC_NAME} PRIVATE ${SOURCES})

View File

@ -9,7 +9,8 @@ Core::Core(const std::string& configPath):
config(configPath), config(configPath),
logger(config.getLogLevel()), logger(config.getLogLevel()),
router(logger), router(logger),
initialized(false) initialized(false),
connection()
{} {}
void Core::send(const std::string& jid, const std::string& body) { void Core::send(const std::string& jid, const std::string& body) {

185
component/loop.cpp Normal file
View File

@ -0,0 +1,185 @@
// SPDX-FileCopyrightText: 2024 Yury Gubich <blue@macaw.me>
// SPDX-License-Identifier: GPL-3.0-or-later
#include "loop.h"
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <csignal>
Loop::Loop(const Shared::Logger& logger):
Shared::Loggable(logger, {"Loop"}),
wakePipe(),
handlers(),
mutex(),
running(false)
{
if (pipe(wakePipe) < 0) {
fatal(std::string("pipe: ") + strerror(errno));
std::exit(1);
}
fcntl(wakePipe[0], F_SETFL, O_NONBLOCK);
fcntl(wakePipe[1], F_SETFL, O_NONBLOCK);
registerInstance(this);
}
Loop::~Loop() {
unregisterInstance(this);
close(wakePipe[0]);
close(wakePipe[1]);
}
void Loop::run() {
if (running.exchange(true)) {
warn("an attempt to run already running event loop, skipping");
return;
}
debug("entering the loop");
while (running) {
std::unique_lock lock(mutex);
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(wakePipe[0], &readfds);
int maxFD = wakePipe[0];
for (const std::pair<const int, Callback>& pair : handlers) {
FD_SET(pair.first, &readfds);
maxFD = std::max(maxFD, pair.first);
}
int result = select(maxFD + 1, &readfds, nullptr, nullptr, nullptr);
if (result < 0) {
drain(&readfds);
if (errno == EAGAIN) {
debug("woke up");
continue;
};
if (errno == EINTR) {
debug("interrupted");
continue;
};
fatal(std::string("select: ") + strerror(errno));
running = false;
break;
}
if (drain(&readfds))
continue;
Handlers copy;
for (const std::pair<const int, Callback>& pair : handlers)
if (FD_ISSET(pair.first, &readfds))
copy.insert(pair);
lock.unlock();
if (runCallbacks(copy))
continue;
}
debug("left the loop");
}
void Loop::stop() {
if (!running.exchange(false))
return;
debug("stopping the loop");
wake();
}
void Loop::addDescriptor(int descriptor, const Callback& handler) {
wake();
std::lock_guard lock(mutex);
if (!handlers.emplace(descriptor, handler).second)
warn("an attempt to add descriptor " + std::to_string(descriptor) + " for the second time");
}
void Loop::removeDescriptor(int descriptor) {
wake();
std::lock_guard lock(mutex);
if (handlers.erase(descriptor) == 0)
warn("an attempt to remove unknown descriptor " + std::to_string(descriptor));
}
void Loop::wake() {
char w = 'w';
if (write(wakePipe[1], &w, 1) < 0 && errno != EAGAIN)
error("failed to wake up event loop: " + std::string(strerror(errno)));
}
bool Loop::drain(fd_set* readfds) {
if (!FD_ISSET(wakePipe[0], readfds))
return false;
char buf[64];
while (true) {
ssize_t r = read(wakePipe[0], buf, sizeof(buf));
if (r > 0)
continue;
if (r == -1) {
if (errno == EAGAIN)
break;
error("failed to drain wake pipe: " + std::string(strerror(errno)));
break;
}
if (r == 0)
break; // pipe closed
}
return true;
}
bool Loop::runCallbacks(const Handlers& copy) {
for (const std::pair<const int, Callback>& pair : copy)
try {
pair.second();
return true;
} catch (const std::exception& e) {
error("exception in event loop from handler with file descriptor " + std::to_string(pair.first) + ":\n" + e.what());
} catch (...) {
error("unhandled throw in event loop from handler with file descriptor " + std::to_string(pair.first));
}
return false;
}
void Loop::registerInstance(Loop* loop) {
std::lock_guard lock(instanceMutex);
if (instances.size() == 0) {
std::signal(SIGINT, signalHandler);
std::signal(SIGTERM, signalHandler);
}
instances.insert(loop);
}
void Loop::unregisterInstance(Loop* loop) {
std::lock_guard lock(instanceMutex);
instances.erase(loop);
if (instances.size() == 0) {
std::signal(SIGINT, SIG_DFL);
std::signal(SIGTERM, SIG_DFL);
}
}
void Loop::signalHandler(int signum) {
for (Loop* loop : instances)
loop->stop();
}

50
component/loop.h Normal file
View File

@ -0,0 +1,50 @@
// SPDX-FileCopyrightText: 2024 Yury Gubich <blue@macaw.me>
// SPDX-License-Identifier: GPL-3.0-or-later
#pragma once
#include <set>
#include <map>
#include <functional>
#include <mutex>
#include <atomic>
#include <sys/select.h>
#include "shared/loggable.h"
#include "shared/logger.h"
class Loop : Shared::Loggable {
public:
typedef std::function<void()> Callback;
typedef std::map<int, Callback> Handlers;
Loop(const Shared::Logger& logger);
~Loop();
void run();
void stop();
void addDescriptor(int descriptor, const Callback& handler);
void removeDescriptor(int descriptor);
private:
void wake();
bool drain(fd_set* readfds);
bool runCallbacks(const Handlers& copy);
private:
int wakePipe[2];
Handlers handlers;
std::mutex mutex;
std::atomic<bool> running;
private:
static void registerInstance(Loop* loop);
static void unregisterInstance(Loop* loop);
static void signalHandler(int signum);
static inline std::set<Loop*> instances;
static inline std::mutex instanceMutex;
};

View File

@ -3,6 +3,8 @@
#include "connection.h" #include "connection.h"
#include "gloox/connectiontcpclient.h"
Connection::Connection(const std::shared_ptr<Core>& core): Connection::Connection(const std::shared_ptr<Core>& core):
Shared::Loggable(core->logger, {"Connection"}), Shared::Loggable(core->logger, {"Connection"}),
state(initial), state(initial),
@ -53,14 +55,26 @@ void Connection::deinitialize() {
state = initial; state = initial;
} }
void Connection::connect() { int Connection::connect() {
if (state != disconnected) if (state != disconnected) {
return; std::string message("an attempt to call connect in the wrong state");
fatal(message);
throw std::runtime_error(message);
}
debug("connecting"); debug("connecting");
state = connected; state = connecting;
gloox->connect(true); gloox->connect(false);
state = disconnected;
return static_cast<gloox::ConnectionTCPClient*>(gloox->connectionImpl())->socket();
}
void Connection::disconnect() {
if (state != connecting || state != connected)
return;
gloox->disconnect();
state = disconnecting;
} }
void Connection::send(const std::string& jid, const std::string& body) { void Connection::send(const std::string& jid, const std::string& body) {
@ -87,6 +101,10 @@ void Connection::publish(const std::string& service, const std::string& node, co
pubsub->publishItem(service, node, list, nullptr, this); pubsub->publishItem(service, node, list, nullptr, this);
} }
void Connection::processMessages() {
gloox->recv();
}
std::string Connection::errorTypeToString(gloox::StanzaErrorType err) { std::string Connection::errorTypeToString(gloox::StanzaErrorType err) {
switch (err) { switch (err) {
case gloox::StanzaErrorTypeAuth: case gloox::StanzaErrorTypeAuth:
@ -131,10 +149,12 @@ void Connection::handleItemPublication(const std::string& id, const gloox::JID&
} }
void Connection::onConnect() { void Connection::onConnect() {
state = connected;
info("connection established"); info("connection established");
} }
void Connection::onDisconnect(gloox::ConnectionError e) { void Connection::onDisconnect(gloox::ConnectionError e) {
state = disconnected;
std::string error; std::string error;
switch (e) { switch (e) {

View File

@ -28,7 +28,9 @@ public:
enum State { enum State {
initial, initial,
disconnected, disconnected,
connected connecting,
connected,
disconnecting
}; };
public: public:
@ -37,9 +39,11 @@ public:
void initialize(); void initialize();
void deinitialize(); void deinitialize();
void connect(); int connect();
void disconnect();
void send(const std::string& jid, const std::string& body); void send(const std::string& jid, const std::string& body);
void publish(const std::string& service, const std::string& node, const std::string& title, const std::string& body); void publish(const std::string& service, const std::string& node, const std::string& title, const std::string& body);
void processMessages();
static std::string errorTypeToString(gloox::StanzaErrorType err); static std::string errorTypeToString(gloox::StanzaErrorType err);

18
jay.cpp
View File

@ -28,7 +28,9 @@ static const std::map<
Jay::Jay(const std::string& configPath): Jay::Jay(const std::string& configPath):
core(std::make_shared<Core>(configPath)), core(std::make_shared<Core>(configPath)),
connection(std::make_shared<Connection>(core)), connection(std::make_shared<Connection>(core)),
modules() modules(),
loop(core->logger),
initialized(false)
{} {}
Jay::~Jay() { Jay::~Jay() {
@ -41,13 +43,25 @@ bool Jay::isConfigValid() const {
void Jay::run() { void Jay::run() {
initialize(); initialize();
connection->connect();
int fd = connection->connect();
loop.addDescriptor(fd, std::bind(&Connection::processMessages, connection.get()));
loop.run();
connection->disconnect();
loop.removeDescriptor(fd);
} }
void Jay::initialize() { void Jay::initialize() {
if (initialized)
return;
createModules(); createModules();
core->initialize(connection); core->initialize(connection);
connection->initialize(); connection->initialize();
initialized = true;
} }
void Jay::createModules() { void Jay::createModules() {

3
jay.h
View File

@ -15,6 +15,7 @@
#include "shared/definitions.h" #include "shared/definitions.h"
#include "component/core.h" #include "component/core.h"
#include "component/loop.h"
#include "connection/connection.h" #include "connection/connection.h"
#include "module/module.h" #include "module/module.h"
@ -35,4 +36,6 @@ private:
std::shared_ptr<Core> core; std::shared_ptr<Core> core;
std::shared_ptr<Connection> connection; std::shared_ptr<Connection> connection;
std::vector<std::shared_ptr<Module::Module>> modules; std::vector<std::shared_ptr<Module::Module>> modules;
Loop loop;
bool initialized;
}; };

View File

@ -27,3 +27,7 @@ void Shared::Loggable::warn(const std::string& message) const {
void Shared::Loggable::error(const std::string& message) const { void Shared::Loggable::error(const std::string& message) const {
logger.log(Logger::error, message, domain); logger.log(Logger::error, message, domain);
} }
void Shared::Loggable::fatal(const std::string& message) const {
logger.log(Logger::fatal, message, domain);
}

View File

@ -24,6 +24,7 @@ protected:
void info(const std::string& message) const; void info(const std::string& message) const;
void warn(const std::string& message) const; void warn(const std::string& message) const;
void error(const std::string& message) const; void error(const std::string& message) const;
void fatal(const std::string& message) const;
}; };
} }