From d6f95d89fa4129baadde1a80a8050007a894c9a8 Mon Sep 17 00:00:00 2001 From: blue Date: Sun, 13 Apr 2025 21:12:24 +0300 Subject: [PATCH] Hopefully a bit cleaner solution --- component/loop.cpp | 103 +++++++++++++++++++++++++++------------------ component/loop.h | 8 +++- 2 files changed, 69 insertions(+), 42 deletions(-) diff --git a/component/loop.cpp b/component/loop.cpp index 6a965e9..ac7e69c 100644 --- a/component/loop.cpp +++ b/component/loop.cpp @@ -13,6 +13,8 @@ Loop::Loop(const Shared::Logger& logger): Shared::Loggable(logger, {"Loop"}), wakePipe(), handlers(), + handlersToAdd(), + descriptorsToRemove(), mutex(), running(false) { @@ -42,49 +44,26 @@ void Loop::run() { debug("entering the loop"); while (running) { - std::unique_lock lock(mutex); + syncHandlers(); fd_set readfds; - FD_ZERO(&readfds); - FD_SET(wakePipe[0], &readfds); - - int maxFD = wakePipe[0]; - for (const std::pair& pair : handlers) { - FD_SET(pair.first, &readfds); - maxFD = std::max(maxFD, pair.first); - } + int maxFD = setFDsAndFindMax(&readfds); int result = select(maxFD + 1, &readfds, nullptr, nullptr, nullptr); if (result < 0) { - drain(&readfds); - if (errno == EAGAIN) { - debug("woke up"); - continue; - }; - if (errno == EINTR) { debug("interrupted"); + // drain(&readfds); looks like it's not a good idea to drain here continue; }; fatal(std::string("select: ") + strerror(errno)); running = false; break; } - - if (drain(&readfds)) - continue; - - Handlers copy; - for (const std::pair& pair : handlers) - if (FD_ISSET(pair.first, &readfds)) - copy.insert(pair); - - lock.unlock(); - - if (runCallbacks(copy)) - continue; + drain(&readfds); + runCallbacks(&readfds); } debug("left the loop"); @@ -99,19 +78,38 @@ void Loop::stop() { } void Loop::addDescriptor(int descriptor, const Callback& handler) { - wake(); std::lock_guard lock(mutex); - if (!handlers.emplace(descriptor, handler).second) + if (handlers.count(descriptor) != 0) { warn("an attempt to add descriptor " + std::to_string(descriptor) + " for the second time"); + return; + } + + if (!handlersToAdd.emplace(descriptor, handler).second) { + warn("an attempt to add descriptor " + std::to_string(descriptor) + " for the second time"); + return; + } + + wake(); } void Loop::removeDescriptor(int descriptor) { - wake(); std::lock_guard lock(mutex); + + if (handlersToAdd.erase(descriptor) != 0) + return; - if (handlers.erase(descriptor) == 0) + if (handlers.count(descriptor) == 0) { warn("an attempt to remove unknown descriptor " + std::to_string(descriptor)); + return; + } + + if (!descriptorsToRemove.insert(descriptor).second) { + warn("an attempt to remove descriptor " + std::to_string(descriptor) + " for the second time"); + return; + } + + wake(); } void Loop::wake() { @@ -120,9 +118,9 @@ void Loop::wake() { error("failed to wake up event loop: " + std::string(strerror(errno))); } -bool Loop::drain(fd_set* readfds) { +void Loop::drain(fd_set* readfds) { if (!FD_ISSET(wakePipe[0], readfds)) - return false; + return; char buf[64]; while (true) { @@ -141,22 +139,47 @@ bool Loop::drain(fd_set* readfds) { if (r == 0) break; // pipe closed } - - return true; } -bool Loop::runCallbacks(const Handlers& copy) { - for (const std::pair& pair : copy) +void Loop::runCallbacks(fd_set* readfds) { + for (const std::pair& pair : handlers) { + if (!FD_ISSET(pair.first, readfds)) + continue; + try { pair.second(); - return true; } catch (const std::exception& e) { error("exception in event loop from handler with file descriptor " + std::to_string(pair.first) + ":\n" + e.what()); } catch (...) { error("unhandled throw in event loop from handler with file descriptor " + std::to_string(pair.first)); } + } +} - return false; +int Loop::setFDsAndFindMax(fd_set* readfds) { + FD_ZERO(readfds); + FD_SET(wakePipe[0], readfds); + + int maxFD = wakePipe[0]; + for (const std::pair& pair : handlers) { + FD_SET(pair.first, readfds); + maxFD = std::max(maxFD, pair.first); + } + + return maxFD; +} + +void Loop::syncHandlers() { + std::lock_guard lock(mutex); + + for (int descriptor : descriptorsToRemove) + handlers.erase(descriptor); + + for (const std::pair& pair : handlersToAdd) + handlers.insert(pair); + + descriptorsToRemove.clear(); + handlersToAdd.clear(); } void Loop::registerInstance(Loop* loop) { diff --git a/component/loop.h b/component/loop.h index 632a16d..fbe2382 100644 --- a/component/loop.h +++ b/component/loop.h @@ -29,13 +29,17 @@ public: private: void wake(); - bool drain(fd_set* readfds); - bool runCallbacks(const Handlers& copy); + void drain(fd_set* readfds); + void runCallbacks(fd_set* readfds); + int setFDsAndFindMax(fd_set* readfds); + void syncHandlers(); private: int wakePipe[2]; Handlers handlers; + Handlers handlersToAdd; + std::set descriptorsToRemove; std::mutex mutex; std::atomic running;