libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

Coverage: 80.4% lines (394/490), 89.1% functions (41/46), 68.2% branches (206/302)
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include "src/detail/epoll/scheduler.hpp"
#include "src/detail/epoll/op.hpp"
#include "src/detail/make_err.hpp"
#include "src/detail/posix/resolver_service.hpp"
#include "src/detail/posix/signals.hpp"

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <chrono>
#include <limits>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

/*
epoll Scheduler - Single Reactor Model
======================================

This scheduler uses a thread coordination strategy to provide handler
parallelism and avoid the thundering herd problem. Instead of all
threads blocking on epoll_wait(), one thread becomes the "reactor"
while the others wait on a condition variable for handler work.

Thread Model
------------
- ONE thread runs epoll_wait() at a time (the reactor thread)
- OTHER threads wait on cond_ (condition variable) for handlers
- When work is posted, exactly one waiting thread wakes via notify_one()
- This matches Windows IOCP semantics, where N posted items wake N threads

Event Loop Structure (do_one)
-----------------------------
1. Lock the mutex, try to pop a handler from the queue
2. If a handler was popped: execute it (unlocked), return
3. If the queue is empty and no reactor is running: become the reactor
   - Run epoll_wait (unlocked), queue I/O completions, loop back
4. If the queue is empty and a reactor is running: wait on the condvar

The task_running_ flag ensures only one thread owns epoll_wait().
After the reactor queues I/O completions, it loops back to try getting
a handler, giving priority to handler execution over more I/O polling.
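
A condensed sketch of this loop (illustrative only; the real do_one
below also handles stop requests, timeouts, and work accounting):

    for (;;)
    {
        // mutex held here
        scheduler_op* op = completed_ops_.pop();
        if (op == &task_op_)            // reactor sentinel
        {
            run_task(lock, ctx);        // epoll_wait, queue completions
            completed_ops_.push(&task_op_);
            continue;
        }
        if (op)
        {
            lock.unlock();
            (*op)();                    // execute handler unlocked
            return 1;
        }
        wait_for_signal(lock);          // another thread owns the reactor
    }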

Signaling State (state_)
------------------------
The state_ variable encodes two pieces of information:
- Bit 0: signaled flag (1 = signaled, persists until cleared)
- Upper bits: waiter count (each waiter adds 2 before blocking)

This allows efficient coordination:
- Signalers only call notify when waiters exist (state_ > 1)
- Waiters check if already signaled before blocking (fast path)
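
In code form (these lines mirror the signal helpers further down):

    state_ |= 1;                        // signal: set bit 0
    bool have_waiters = state_ > 1;     // any waiter contributes 2

    state_ += 2;                        // waiter: register before blocking
    cond_.wait(lock);
    state_ -= 2;                        // waiter: deregister after waking

For example, two blocked waiters plus the signaled flag give state_ == 5.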

Wake Coordination (wake_one_thread_and_unlock)
----------------------------------------------
When posting work:
- If waiters exist (state_ > 1): signal and notify_one()
- Else if the reactor is running: interrupt it via an eventfd write
- Else: no-op (a thread will find the work when it checks the queue)

This avoids waking threads unnecessarily. With cascading wakes,
each handler execution wakes at most one additional thread if
more work exists in the queue.
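
As a sketch (mirroring wake_one_thread_and_unlock below):

    if (maybe_unlock_and_signal_one(lock))      // waiters exist: notify_one()
        return;
    if (task_running_ && !task_interrupted_)    // reactor blocked in epoll_wait
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();                    // eventfd write
    }
    else
        lock.unlock();                          // nothing to wake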

Work Counting
-------------
outstanding_work_ tracks pending operations. When it hits zero, run()
returns. Each operation increments on start, decrements on completion.
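
For example, posting three handlers moves outstanding_work_ from 0 to 3;
each completion decrements it, and the decrement that reaches zero
triggers stop() so run() can return.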

Timer Integration
-----------------
Timers are handled by timer_service. The reactor programs a timerfd
(registered with epoll) for the nearest timer expiry. When a new timer
is scheduled earlier than the current nearest, timer_service marks the
timerfd stale and calls interrupt_reactor() so the reactor reprograms
it before blocking again.
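
The reprogramming step at the top of run_task (see below):

    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();   // timerfd_settime() for the nearest expiry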
*/

namespace boost::corosio::detail {

struct scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
    {
    }
};

namespace {

corosio::detail::thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

} // namespace

void
descriptor_state::
operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    epoll_op* rd = nullptr;
    epoll_op* wr = nullptr;
    epoll_op* cn = nullptr;
    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            rd = std::exchange(read_op, nullptr);
            if (!rd)
                read_ready = true;
        }
        if (ev & EPOLLOUT)
        {
            cn = std::exchange(connect_op, nullptr);
            wr = std::exchange(write_op, nullptr);
            if (!cn && !wr)
                write_ready = true;
        }
        if (err && !(ev & (EPOLLIN | EPOLLOUT)))
        {
            rd = std::exchange(read_op, nullptr);
            wr = std::exchange(write_op, nullptr);
            cn = std::exchange(connect_op, nullptr);
        }
    }

    // Non-null after I/O means EAGAIN; re-register under lock below
    if (rd)
    {
        if (err)
            rd->complete(err, 0);
        else
            rd->perform_io();

        if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
        {
            rd->errn = 0;
        }
        else
        {
            local_ops.push(rd);
            rd = nullptr;
        }
    }

    if (cn)
    {
        if (err)
            cn->complete(err, 0);
        else
            cn->perform_io();
        local_ops.push(cn);
        cn = nullptr;
    }

    if (wr)
    {
        if (err)
            wr->complete(err, 0);
        else
            wr->perform_io();

        if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
        {
            wr->errn = 0;
        }
        else
        {
            local_ops.push(wr);
            wr = nullptr;
        }
    }

    if (rd || wr)
    {
        std::lock_guard lock(mutex);
        if (rd)
            read_op = rd;
        if (wr)
            write_op = wr;
    }

    // Execute first handler inline; the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}

epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) {
                auto* self = static_cast<epoll_scheduler*>(p);
                self->timerfd_stale_.store(true, std::memory_order_release);
                if (self->task_running_.load(std::memory_order_acquire))
                    self->interrupt_reactor();
            }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

epoll_scheduler::
~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}

void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (event_fd_ >= 0)
        interrupt_reactor();
}

void
epoll_scheduler::
post(capy::coro h) const
{
    struct post_handler final
        : scheduler_op
    {
        capy::coro h_;

        explicit
        post_handler(capy::coro h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
post(scheduler_op* h) const
{
    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(h);
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
on_work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
on_work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

bool
epoll_scheduler::
running_in_this_thread() const noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

void
epoll_scheduler::
stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

bool
epoll_scheduler::
stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

void
epoll_scheduler::
restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}

std::size_t
epoll_scheduler::
run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

std::size_t
epoll_scheduler::
wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

std::size_t
epoll_scheduler::
poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}

void
epoll_scheduler::
register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}

void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

void
epoll_scheduler::
work_started() const noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // signal_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        signal_all(lock);
        if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
        {
            task_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}

void
epoll_scheduler::
compensating_work_started() const noexcept
{
    auto* ctx = find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

void
epoll_scheduler::
post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
        std::memory_order_release, std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}

void
epoll_scheduler::
signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

bool
epoll_scheduler::
maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

void
epoll_scheduler::
unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
}

void
epoll_scheduler::
clear_signal() const
{
    state_ &= ~std::size_t(1);
}

void
epoll_scheduler::
wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

void
epoll_scheduler::
wait_for_signal_for(
    std::unique_lock<std::mutex>& lock,
    long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}

void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}

/** RAII guard for handler execution work accounting.

    A handler consumes 1 work item and may produce N new items via
    fast-path posts. Net change = N - 1:
    - If N > 1: add (N - 1) to the global count (more produced than consumed)
    - If N == 1: net zero, do nothing
    - If N < 1: call work_finished() (work consumed, may trigger stop)

    Also drains the private queue to the global queue for other threads
    to process.
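
    Example (illustrative): a handler that fast-path posts two
    continuations leaves private_outstanding_work == 2, so the guard adds
    2 - 1 == 1 to outstanding_work_; a handler that posts nothing leaves
    0, and the guard calls work_finished() for the item it consumed.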
*/
struct work_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~work_cleanup()
    {
        if (ctx)
        {
            long produced = ctx->private_outstanding_work;
            if (produced > 1)
                scheduler->outstanding_work_.fetch_add(
                    produced - 1, std::memory_order_relaxed);
            else if (produced < 1)
                scheduler->work_finished();
            // produced == 1: net zero, handler consumed what it produced
            ctx->private_outstanding_work = 0;

            if (!ctx->private_queue.empty())
            {
                lock->lock();
                scheduler->completed_ops_.splice(ctx->private_queue);
            }
        }
        else
        {
            // No thread context - slow-path op was already counted globally
            scheduler->work_finished();
        }
    }
};

/** RAII guard for reactor work accounting.

    The reactor only produces work, via timer/signal callbacks posting
    handlers. Unlike handler execution, which consumes 1 item, the
    reactor consumes nothing, so all produced work must be flushed to
    the global counter.
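
    Example (illustrative): a timerfd tick whose callbacks post three
    timer handlers leaves private_outstanding_work == 3; the guard adds
    all 3 to outstanding_work_ and splices the three ops onto
    completed_ops_ for any thread to run.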
*/
struct task_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~task_cleanup()
    {
        if (!ctx)
            return;

        if (ctx->private_outstanding_work > 0)
        {
            scheduler->outstanding_work_.fetch_add(
                ctx->private_outstanding_work, std::memory_order_relaxed);
            ctx->private_outstanding_work = 0;
        }

        if (!ctx->private_queue.empty())
        {
            if (!lock->owns_lock())
                lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
};

void
epoll_scheduler::
update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}

void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}

std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or the caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or the caller requested a
        // non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}

} // namespace boost::corosio::detail

#endif