Hopefully a bit cleaner solution

This commit is contained in:
Blue 2025-04-13 21:12:24 +03:00
parent 69e8098cce
commit d6f95d89fa
Signed by: blue
GPG Key ID: 9B203B252A63EE38
2 changed files with 69 additions and 42 deletions

View File

@ -13,6 +13,8 @@ Loop::Loop(const Shared::Logger& logger):
Shared::Loggable(logger, {"Loop"}),
wakePipe(),
handlers(),
handlersToAdd(),
descriptorsToRemove(),
mutex(),
running(false)
{
@ -42,49 +44,26 @@ void Loop::run() {
debug("entering the loop");
while (running) {
std::unique_lock lock(mutex);
syncHandlers();
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(wakePipe[0], &readfds);
int maxFD = wakePipe[0];
for (const std::pair<const int, Callback>& pair : handlers) {
FD_SET(pair.first, &readfds);
maxFD = std::max(maxFD, pair.first);
}
int maxFD = setFDsAndFindMax(&readfds);
int result = select(maxFD + 1, &readfds, nullptr, nullptr, nullptr);
if (result < 0) {
drain(&readfds);
if (errno == EAGAIN) {
debug("woke up");
continue;
};
if (errno == EINTR) {
debug("interrupted");
// drain(&readfds); looks like it's not a good idea to drain here
continue;
};
fatal(std::string("select: ") + strerror(errno));
running = false;
break;
}
if (drain(&readfds))
continue;
Handlers copy;
for (const std::pair<const int, Callback>& pair : handlers)
if (FD_ISSET(pair.first, &readfds))
copy.insert(pair);
lock.unlock();
if (runCallbacks(copy))
continue;
drain(&readfds);
runCallbacks(&readfds);
}
debug("left the loop");
@ -99,19 +78,38 @@ void Loop::stop() {
}
void Loop::addDescriptor(int descriptor, const Callback& handler) {
wake();
std::lock_guard lock(mutex);
if (!handlers.emplace(descriptor, handler).second)
if (handlers.count(descriptor) != 0) {
warn("an attempt to add descriptor " + std::to_string(descriptor) + " for the second time");
return;
}
if (!handlersToAdd.emplace(descriptor, handler).second) {
warn("an attempt to add descriptor " + std::to_string(descriptor) + " for the second time");
return;
}
wake();
}
void Loop::removeDescriptor(int descriptor) {
wake();
std::lock_guard lock(mutex);
if (handlersToAdd.erase(descriptor) != 0)
return;
if (handlers.erase(descriptor) == 0)
if (handlers.count(descriptor) == 0) {
warn("an attempt to remove unknown descriptor " + std::to_string(descriptor));
return;
}
if (!descriptorsToRemove.insert(descriptor).second) {
warn("an attempt to remove descriptor " + std::to_string(descriptor) + " for the second time");
return;
}
wake();
}
void Loop::wake() {
@ -120,9 +118,9 @@ void Loop::wake() {
error("failed to wake up event loop: " + std::string(strerror(errno)));
}
bool Loop::drain(fd_set* readfds) {
void Loop::drain(fd_set* readfds) {
if (!FD_ISSET(wakePipe[0], readfds))
return false;
return;
char buf[64];
while (true) {
@ -141,22 +139,47 @@ bool Loop::drain(fd_set* readfds) {
if (r == 0)
break; // pipe closed
}
return true;
}
bool Loop::runCallbacks(const Handlers& copy) {
for (const std::pair<const int, Callback>& pair : copy)
void Loop::runCallbacks(fd_set* readfds) {
for (const std::pair<const int, Callback>& pair : handlers) {
if (!FD_ISSET(pair.first, readfds))
continue;
try {
pair.second();
return true;
} catch (const std::exception& e) {
error("exception in event loop from handler with file descriptor " + std::to_string(pair.first) + ":\n" + e.what());
} catch (...) {
error("unhandled throw in event loop from handler with file descriptor " + std::to_string(pair.first));
}
}
}
return false;
int Loop::setFDsAndFindMax(fd_set* readfds) {
FD_ZERO(readfds);
FD_SET(wakePipe[0], readfds);
int maxFD = wakePipe[0];
for (const std::pair<const int, Callback>& pair : handlers) {
FD_SET(pair.first, readfds);
maxFD = std::max(maxFD, pair.first);
}
return maxFD;
}
void Loop::syncHandlers() {
std::lock_guard lock(mutex);
for (int descriptor : descriptorsToRemove)
handlers.erase(descriptor);
for (const std::pair<const int, Callback>& pair : handlersToAdd)
handlers.insert(pair);
descriptorsToRemove.clear();
handlersToAdd.clear();
}
void Loop::registerInstance(Loop* loop) {

View File

@ -29,13 +29,17 @@ public:
private:
void wake();
bool drain(fd_set* readfds);
bool runCallbacks(const Handlers& copy);
void drain(fd_set* readfds);
void runCallbacks(fd_set* readfds);
int setFDsAndFindMax(fd_set* readfds);
void syncHandlers();
private:
int wakePipe[2];
Handlers handlers;
Handlers handlersToAdd;
std::set<int> descriptorsToRemove;
std::mutex mutex;
std::atomic<bool> running;