libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

67.2% Lines (317/472) 91.7% Functions (33/36) 50.4% Branches (134/266)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/resume_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 void
34 104 epoll_op::canceller::
35 operator()() const noexcept
36 {
37 104 op->cancel();
38 104 }
39
40 void
41 epoll_connect_op::
42 cancel() noexcept
43 {
44 if (socket_impl_)
45 socket_impl_->cancel_single_op(*this);
46 else
47 request_cancel();
48 }
49
50 void
51 98 epoll_read_op::
52 cancel() noexcept
53 {
54
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (socket_impl_)
55 98 socket_impl_->cancel_single_op(*this);
56 else
57 request_cancel();
58 98 }
59
60 void
61 epoll_write_op::
62 cancel() noexcept
63 {
64 if (socket_impl_)
65 socket_impl_->cancel_single_op(*this);
66 else
67 request_cancel();
68 }
69
70 void
71 2617 epoll_connect_op::
72 operator()()
73 {
74 2617 stop_cb.reset();
75
76
3/4
✓ Branch 0 taken 2615 times.
✓ Branch 1 taken 2 times.
✓ Branch 3 taken 2615 times.
✗ Branch 4 not taken.
2617 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
77
78 // Cache endpoints on successful connect
79
3/4
✓ Branch 0 taken 2615 times.
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2615 times.
✗ Branch 3 not taken.
2617 if (success && socket_impl_)
80 {
81 // Query local endpoint via getsockname (may fail, but remote is always known)
82 2615 endpoint local_ep;
83 2615 sockaddr_in local_addr{};
84 2615 socklen_t local_len = sizeof(local_addr);
85
1/2
✓ Branch 1 taken 2615 times.
✗ Branch 2 not taken.
2615 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
86 2615 local_ep = from_sockaddr_in(local_addr);
87 // Always cache remote endpoint; local may be default if getsockname failed
88 2615 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
89 }
90
91
1/2
✓ Branch 0 taken 2617 times.
✗ Branch 1 not taken.
2617 if (ec_out)
92 {
93
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2617 times.
2617 if (cancelled.load(std::memory_order_acquire))
94 *ec_out = capy::error::canceled;
95
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2615 times.
2617 else if (errn != 0)
96 2 *ec_out = make_err(errn);
97 else
98 2615 *ec_out = {};
99 }
100
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2617 times.
2617 if (bytes_out)
102 *bytes_out = bytes_transferred;
103
104 // Move to stack before resuming. See epoll_op::operator()() for rationale.
105 2617 capy::executor_ref saved_ex( std::move( ex ) );
106 2617 capy::coro saved_h( std::move( h ) );
107 2617 auto prevent_premature_destruction = std::move(impl_ptr);
108
1/1
✓ Branch 1 taken 2617 times.
2617 resume_coro(saved_ex, saved_h);
109 2617 }
110
111 5243 epoll_socket_impl::
112 5243 epoll_socket_impl(epoll_socket_service& svc) noexcept
113 5243 : svc_(svc)
114 {
115 5243 }
116
117 5243 epoll_socket_impl::
118 ~epoll_socket_impl() = default;
119
120 void
121 5243 epoll_socket_impl::
122 release()
123 {
124 5243 close_socket();
125 5243 svc_.destroy_impl(*this);
126 5243 }
127
128 std::coroutine_handle<>
129 2617 epoll_socket_impl::
130 connect(
131 std::coroutine_handle<> h,
132 capy::executor_ref ex,
133 endpoint ep,
134 std::stop_token token,
135 std::error_code* ec)
136 {
137 2617 auto& op = conn_;
138 2617 op.reset();
139 2617 op.h = h;
140 2617 op.ex = ex;
141 2617 op.ec_out = ec;
142 2617 op.fd = fd_;
143 2617 op.target_endpoint = ep; // Store target for endpoint caching
144 2617 op.start(token, this);
145
146 2617 sockaddr_in addr = detail::to_sockaddr_in(ep);
147
1/1
✓ Branch 1 taken 2617 times.
2617 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
148
149
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2617 times.
2617 if (result == 0)
150 {
151 // Sync success - cache endpoints immediately
152 // Remote is always known; local may fail but we still cache remote
153 sockaddr_in local_addr{};
154 socklen_t local_len = sizeof(local_addr);
155 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
156 local_endpoint_ = detail::from_sockaddr_in(local_addr);
157 remote_endpoint_ = ep;
158
159 op.complete(0, 0);
160 op.impl_ptr = shared_from_this();
161 svc_.post(&op);
162 // completion is always posted to scheduler queue, never inline.
163 return std::noop_coroutine();
164 }
165
166
1/2
✓ Branch 0 taken 2617 times.
✗ Branch 1 not taken.
2617 if (errno == EINPROGRESS)
167 {
168 2617 svc_.work_started();
169
1/1
✓ Branch 1 taken 2617 times.
2617 op.impl_ptr = shared_from_this();
170
171 2617 bool perform_now = false;
172 {
173
1/1
✓ Branch 1 taken 2617 times.
2617 std::lock_guard lock(desc_state_.mutex);
174
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2617 times.
2617 if (desc_state_.write_ready)
175 {
176 desc_state_.write_ready = false;
177 perform_now = true;
178 }
179 else
180 {
181 2617 desc_state_.connect_op = &op;
182 }
183 2617 }
184
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2617 times.
2617 if (perform_now)
186 {
187 op.perform_io();
188 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
189 {
190 op.errn = 0;
191 std::lock_guard lock(desc_state_.mutex);
192 desc_state_.connect_op = &op;
193 }
194 else
195 {
196 svc_.post(&op);
197 svc_.work_finished();
198 }
199 return std::noop_coroutine();
200 }
201
202
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2617 times.
2617 if (op.cancelled.load(std::memory_order_acquire))
203 {
204 epoll_op* claimed = nullptr;
205 {
206 std::lock_guard lock(desc_state_.mutex);
207 if (desc_state_.connect_op == &op)
208 claimed = std::exchange(desc_state_.connect_op, nullptr);
209 }
210 if (claimed)
211 {
212 svc_.post(claimed);
213 svc_.work_finished();
214 }
215 }
216 // completion is always posted to scheduler queue, never inline.
217 2617 return std::noop_coroutine();
218 }
219
220 op.complete(errno, 0);
221 op.impl_ptr = shared_from_this();
222 svc_.post(&op);
223 // completion is always posted to scheduler queue, never inline.
224 return std::noop_coroutine();
225 }
226
227 void
228 167 epoll_socket_impl::
229 do_read_io()
230 {
231 167 auto& op = rd_;
232
233 ssize_t n;
234 do {
235 167 n = ::readv(fd_, op.iovecs, op.iovec_count);
236
2/4
✓ Branch 0 taken 167 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 167 times.
167 } while (n < 0 && errno == EINTR);
237
238
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 167 times.
167 if (n > 0)
239 {
240 {
241 std::lock_guard lock(desc_state_.mutex);
242 desc_state_.read_ready = false;
243 }
244 op.complete(0, static_cast<std::size_t>(n));
245 svc_.post(&op);
246 return;
247 }
248
249
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 167 times.
167 if (n == 0)
250 {
251 {
252 std::lock_guard lock(desc_state_.mutex);
253 desc_state_.read_ready = false;
254 }
255 op.complete(0, 0);
256 svc_.post(&op);
257 return;
258 }
259
260
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 167 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
167 if (errno == EAGAIN || errno == EWOULDBLOCK)
261 {
262 167 svc_.work_started();
263
264 167 bool perform_now = false;
265 {
266
1/1
✓ Branch 1 taken 167 times.
167 std::lock_guard lock(desc_state_.mutex);
267
2/2
✓ Branch 0 taken 76 times.
✓ Branch 1 taken 91 times.
167 if (desc_state_.read_ready)
268 {
269 76 desc_state_.read_ready = false;
270 76 perform_now = true;
271 }
272 else
273 {
274 91 desc_state_.read_op = &op;
275 }
276 167 }
277
278
2/2
✓ Branch 0 taken 76 times.
✓ Branch 1 taken 91 times.
167 if (perform_now)
279 {
280 76 op.perform_io();
281
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 76 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
76 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
282 {
283 76 op.errn = 0;
284
1/1
✓ Branch 1 taken 76 times.
76 std::lock_guard lock(desc_state_.mutex);
285 76 desc_state_.read_op = &op;
286 76 }
287 else
288 {
289 svc_.post(&op);
290 svc_.work_finished();
291 }
292 76 return;
293 }
294
295
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 91 times.
91 if (op.cancelled.load(std::memory_order_acquire))
296 {
297 epoll_op* claimed = nullptr;
298 {
299 std::lock_guard lock(desc_state_.mutex);
300 if (desc_state_.read_op == &op)
301 claimed = std::exchange(desc_state_.read_op, nullptr);
302 }
303 if (claimed)
304 {
305 svc_.post(claimed);
306 svc_.work_finished();
307 }
308 }
309 91 return;
310 }
311
312 op.complete(errno, 0);
313 svc_.post(&op);
314 }
315
316 void
317 epoll_socket_impl::
318 do_write_io()
319 {
320 auto& op = wr_;
321
322 msghdr msg{};
323 msg.msg_iov = op.iovecs;
324 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
325
326 ssize_t n;
327 do {
328 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
329 } while (n < 0 && errno == EINTR);
330
331 if (n > 0)
332 {
333 {
334 std::lock_guard lock(desc_state_.mutex);
335 desc_state_.write_ready = false;
336 }
337 op.complete(0, static_cast<std::size_t>(n));
338 svc_.post(&op);
339 return;
340 }
341
342 if (errno == EAGAIN || errno == EWOULDBLOCK)
343 {
344 svc_.work_started();
345
346 bool perform_now = false;
347 {
348 std::lock_guard lock(desc_state_.mutex);
349 if (desc_state_.write_ready)
350 {
351 desc_state_.write_ready = false;
352 perform_now = true;
353 }
354 else
355 {
356 desc_state_.write_op = &op;
357 }
358 }
359
360 if (perform_now)
361 {
362 op.perform_io();
363 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
364 {
365 op.errn = 0;
366 std::lock_guard lock(desc_state_.mutex);
367 desc_state_.write_op = &op;
368 }
369 else
370 {
371 svc_.post(&op);
372 svc_.work_finished();
373 }
374 return;
375 }
376
377 if (op.cancelled.load(std::memory_order_acquire))
378 {
379 epoll_op* claimed = nullptr;
380 {
381 std::lock_guard lock(desc_state_.mutex);
382 if (desc_state_.write_op == &op)
383 claimed = std::exchange(desc_state_.write_op, nullptr);
384 }
385 if (claimed)
386 {
387 svc_.post(claimed);
388 svc_.work_finished();
389 }
390 }
391 return;
392 }
393
394 op.complete(errno ? errno : EIO, 0);
395 svc_.post(&op);
396 }
397
398 std::coroutine_handle<>
399 74020 epoll_socket_impl::
400 read_some(
401 std::coroutine_handle<> h,
402 capy::executor_ref ex,
403 io_buffer_param param,
404 std::stop_token token,
405 std::error_code* ec,
406 std::size_t* bytes_out)
407 {
408 74020 auto& op = rd_;
409 74020 op.reset();
410 74020 op.h = h;
411 74020 op.ex = ex;
412 74020 op.ec_out = ec;
413 74020 op.bytes_out = bytes_out;
414 74020 op.fd = fd_;
415 74020 op.start(token, this);
416
1/1
✓ Branch 1 taken 74020 times.
74020 op.impl_ptr = shared_from_this();
417
418 // Must prepare buffers before initiator runs
419 74020 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
420 74020 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
421
422
6/8
✓ Branch 0 taken 74019 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 74019 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 74019 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 74019 times.
74020 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
423 {
424 1 op.empty_buffer_read = true;
425 1 op.complete(0, 0);
426
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
427 1 return std::noop_coroutine();
428 }
429
430
2/2
✓ Branch 0 taken 74019 times.
✓ Branch 1 taken 74019 times.
148038 for (int i = 0; i < op.iovec_count; ++i)
431 {
432 74019 op.iovecs[i].iov_base = bufs[i].data();
433 74019 op.iovecs[i].iov_len = bufs[i].size();
434 }
435
436 // Speculative read: bypass initiator when data is ready
437 ssize_t n;
438 do {
439
1/1
✓ Branch 1 taken 74019 times.
74019 n = ::readv(fd_, op.iovecs, op.iovec_count);
440
3/4
✓ Branch 0 taken 167 times.
✓ Branch 1 taken 73852 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 167 times.
74019 } while (n < 0 && errno == EINTR);
441
442
2/2
✓ Branch 0 taken 73847 times.
✓ Branch 1 taken 172 times.
74019 if (n > 0)
443 {
444 73847 op.complete(0, static_cast<std::size_t>(n));
445
1/1
✓ Branch 1 taken 73847 times.
73847 svc_.post(&op);
446 73847 return std::noop_coroutine();
447 }
448
449
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 167 times.
172 if (n == 0)
450 {
451 5 op.complete(0, 0);
452
1/1
✓ Branch 1 taken 5 times.
5 svc_.post(&op);
453 5 return std::noop_coroutine();
454 }
455
456
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 167 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
167 if (errno != EAGAIN && errno != EWOULDBLOCK)
457 {
458 op.complete(errno, 0);
459 svc_.post(&op);
460 return std::noop_coroutine();
461 }
462
463 // EAGAIN — full async path
464
1/1
✓ Branch 1 taken 167 times.
167 return read_initiator_.start<&epoll_socket_impl::do_read_io>(this);
465 }
466
467 std::coroutine_handle<>
468 73900 epoll_socket_impl::
469 write_some(
470 std::coroutine_handle<> h,
471 capy::executor_ref ex,
472 io_buffer_param param,
473 std::stop_token token,
474 std::error_code* ec,
475 std::size_t* bytes_out)
476 {
477 73900 auto& op = wr_;
478 73900 op.reset();
479 73900 op.h = h;
480 73900 op.ex = ex;
481 73900 op.ec_out = ec;
482 73900 op.bytes_out = bytes_out;
483 73900 op.fd = fd_;
484 73900 op.start(token, this);
485
1/1
✓ Branch 1 taken 73900 times.
73900 op.impl_ptr = shared_from_this();
486
487 73900 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
488 73900 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
489
490
6/8
✓ Branch 0 taken 73899 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 73899 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 73899 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 73899 times.
73900 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
491 {
492 1 op.complete(0, 0);
493
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
494 1 return std::noop_coroutine();
495 }
496
497
2/2
✓ Branch 0 taken 73899 times.
✓ Branch 1 taken 73899 times.
147798 for (int i = 0; i < op.iovec_count; ++i)
498 {
499 73899 op.iovecs[i].iov_base = bufs[i].data();
500 73899 op.iovecs[i].iov_len = bufs[i].size();
501 }
502
503 // Speculative write: bypass initiator when buffer space is ready
504 73899 msghdr msg{};
505 73899 msg.msg_iov = op.iovecs;
506 73899 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
507
508 ssize_t n;
509 do {
510
1/1
✓ Branch 1 taken 73899 times.
73899 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
511
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 73898 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
73899 } while (n < 0 && errno == EINTR);
512
513
2/2
✓ Branch 0 taken 73898 times.
✓ Branch 1 taken 1 time.
73899 if (n > 0)
514 {
515 73898 op.complete(0, static_cast<std::size_t>(n));
516
1/1
✓ Branch 1 taken 73898 times.
73898 svc_.post(&op);
517 73898 return std::noop_coroutine();
518 }
519
520
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (n == 0)
521 {
522 op.complete(0, 0);
523 svc_.post(&op);
524 return std::noop_coroutine();
525 }
526
527
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
1 if (errno != EAGAIN && errno != EWOULDBLOCK)
528 {
529 1 op.complete(errno, 0);
530
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
531 1 return std::noop_coroutine();
532 }
533
534 // EAGAIN — full async path
535 return write_initiator_.start<&epoll_socket_impl::do_write_io>(this);
536 }
537
538 std::error_code
539 3 epoll_socket_impl::
540 shutdown(tcp_socket::shutdown_type what) noexcept
541 {
542 int how;
543
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
544 {
545 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
546 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
547 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
548 default:
549 return make_err(EINVAL);
550 }
551
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
552 return make_err(errno);
553 3 return {};
554 }
555
556 std::error_code
557 5 epoll_socket_impl::
558 set_no_delay(bool value) noexcept
559 {
560
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
561
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
562 return make_err(errno);
563 5 return {};
564 }
565
566 bool
567 5 epoll_socket_impl::
568 no_delay(std::error_code& ec) const noexcept
569 {
570 5 int flag = 0;
571 5 socklen_t len = sizeof(flag);
572
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
573 {
574 ec = make_err(errno);
575 return false;
576 }
577 5 ec = {};
578 5 return flag != 0;
579 }
580
581 std::error_code
582 4 epoll_socket_impl::
583 set_keep_alive(bool value) noexcept
584 {
585
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
586
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
587 return make_err(errno);
588 4 return {};
589 }
590
591 bool
592 4 epoll_socket_impl::
593 keep_alive(std::error_code& ec) const noexcept
594 {
595 4 int flag = 0;
596 4 socklen_t len = sizeof(flag);
597
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
598 {
599 ec = make_err(errno);
600 return false;
601 }
602 4 ec = {};
603 4 return flag != 0;
604 }
605
606 std::error_code
607 1 epoll_socket_impl::
608 set_receive_buffer_size(int size) noexcept
609 {
610
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
611 return make_err(errno);
612 1 return {};
613 }
614
615 int
616 3 epoll_socket_impl::
617 receive_buffer_size(std::error_code& ec) const noexcept
618 {
619 3 int size = 0;
620 3 socklen_t len = sizeof(size);
621
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
622 {
623 ec = make_err(errno);
624 return 0;
625 }
626 3 ec = {};
627 3 return size;
628 }
629
630 std::error_code
631 1 epoll_socket_impl::
632 set_send_buffer_size(int size) noexcept
633 {
634
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
635 return make_err(errno);
636 1 return {};
637 }
638
639 int
640 3 epoll_socket_impl::
641 send_buffer_size(std::error_code& ec) const noexcept
642 {
643 3 int size = 0;
644 3 socklen_t len = sizeof(size);
645
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
646 {
647 ec = make_err(errno);
648 return 0;
649 }
650 3 ec = {};
651 3 return size;
652 }
653
654 std::error_code
655 8 epoll_socket_impl::
656 set_linger(bool enabled, int timeout) noexcept
657 {
658
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 7 times.
8 if (timeout < 0)
659 1 return make_err(EINVAL);
660 struct ::linger lg;
661
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 1 time.
7 lg.l_onoff = enabled ? 1 : 0;
662 7 lg.l_linger = timeout;
663
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
7 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
664 return make_err(errno);
665 7 return {};
666 }
667
668 tcp_socket::linger_options
669 3 epoll_socket_impl::
670 linger(std::error_code& ec) const noexcept
671 {
672 3 struct ::linger lg{};
673 3 socklen_t len = sizeof(lg);
674
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
675 {
676 ec = make_err(errno);
677 return {};
678 }
679 3 ec = {};
680 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
681 }
682
683 void
684 7965 epoll_socket_impl::
685 cancel() noexcept
686 {
687 7965 std::shared_ptr<epoll_socket_impl> self;
688 try {
689
1/1
✓ Branch 1 taken 7965 times.
7965 self = shared_from_this();
690 } catch (const std::bad_weak_ptr&) {
691 return;
692 }
693
694 7965 conn_.request_cancel();
695 7965 rd_.request_cancel();
696 7965 wr_.request_cancel();
697
698 7965 epoll_op* conn_claimed = nullptr;
699 7965 epoll_op* rd_claimed = nullptr;
700 7965 epoll_op* wr_claimed = nullptr;
701 {
702 7965 std::lock_guard lock(desc_state_.mutex);
703
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7965 times.
7965 if (desc_state_.connect_op == &conn_)
704 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
705
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 7915 times.
7965 if (desc_state_.read_op == &rd_)
706 50 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
707
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7965 times.
7965 if (desc_state_.write_op == &wr_)
708 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
709 7965 }
710
711
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7965 times.
7965 if (conn_claimed)
712 {
713 conn_.impl_ptr = self;
714 svc_.post(&conn_);
715 svc_.work_finished();
716 }
717
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 7915 times.
7965 if (rd_claimed)
718 {
719 50 rd_.impl_ptr = self;
720 50 svc_.post(&rd_);
721 50 svc_.work_finished();
722 }
723
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7965 times.
7965 if (wr_claimed)
724 {
725 wr_.impl_ptr = self;
726 svc_.post(&wr_);
727 svc_.work_finished();
728 }
729 7965 }
730
731 void
732 98 epoll_socket_impl::
733 cancel_single_op(epoll_op& op) noexcept
734 {
735 98 op.request_cancel();
736
737 98 epoll_op** desc_op_ptr = nullptr;
738
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
98 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
739
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
740 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
741
742
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (desc_op_ptr)
743 {
744 98 epoll_op* claimed = nullptr;
745 {
746 98 std::lock_guard lock(desc_state_.mutex);
747
2/2
✓ Branch 0 taken 66 times.
✓ Branch 1 taken 32 times.
98 if (*desc_op_ptr == &op)
748 66 claimed = std::exchange(*desc_op_ptr, nullptr);
749 98 }
750
2/2
✓ Branch 0 taken 66 times.
✓ Branch 1 taken 32 times.
98 if (claimed)
751 {
752 try {
753
1/1
✓ Branch 1 taken 66 times.
66 op.impl_ptr = shared_from_this();
754 } catch (const std::bad_weak_ptr&) {}
755 66 svc_.post(&op);
756 66 svc_.work_finished();
757 }
758 }
759 98 }
760
761 void
762 7871 epoll_socket_impl::
763 close_socket() noexcept
764 {
765 7871 cancel();
766
767 // Keep impl alive if descriptor_state is queued in the scheduler.
768 // Without this, destroy_impl() drops the last shared_ptr while
769 // the queued descriptor_state node would become dangling.
770
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 7867 times.
7871 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
771 {
772 try {
773
1/1
✓ Branch 1 taken 4 times.
4 desc_state_.impl_ref_ = shared_from_this();
774 } catch (std::bad_weak_ptr const&) {}
775 }
776
777
2/2
✓ Branch 0 taken 5243 times.
✓ Branch 1 taken 2628 times.
7871 if (fd_ >= 0)
778 {
779
1/2
✓ Branch 0 taken 5243 times.
✗ Branch 1 not taken.
5243 if (desc_state_.registered_events != 0)
780 5243 svc_.scheduler().deregister_descriptor(fd_);
781 5243 ::close(fd_);
782 5243 fd_ = -1;
783 }
784
785 7871 desc_state_.fd = -1;
786 {
787 7871 std::lock_guard lock(desc_state_.mutex);
788 7871 desc_state_.read_op = nullptr;
789 7871 desc_state_.write_op = nullptr;
790 7871 desc_state_.connect_op = nullptr;
791 7871 desc_state_.read_ready = false;
792 7871 desc_state_.write_ready = false;
793 7871 }
794 7871 desc_state_.registered_events = 0;
795
796 7871 local_endpoint_ = endpoint{};
797 7871 remote_endpoint_ = endpoint{};
798 7871 }
799
800 189 epoll_socket_service::
801 189 epoll_socket_service(capy::execution_context& ctx)
802
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
803 {
804 189 }
805
806 378 epoll_socket_service::
807 189 ~epoll_socket_service()
808 {
809 378 }
810
811 void
812 189 epoll_socket_service::
813 shutdown()
814 {
815
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
816
817
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->socket_list_.pop_front())
818 impl->close_socket();
819
820 189 state_->socket_ptrs_.clear();
821 189 }
822
823 tcp_socket::socket_impl&
824 5243 epoll_socket_service::
825 create_impl()
826 {
827
1/1
✓ Branch 1 taken 5243 times.
5243 auto impl = std::make_shared<epoll_socket_impl>(*this);
828 5243 auto* raw = impl.get();
829
830 {
831
1/1
✓ Branch 2 taken 5243 times.
5243 std::lock_guard lock(state_->mutex_);
832 5243 state_->socket_list_.push_back(raw);
833
1/1
✓ Branch 3 taken 5243 times.
5243 state_->socket_ptrs_.emplace(raw, std::move(impl));
834 5243 }
835
836 5243 return *raw;
837 5243 }
838
839 void
840 5243 epoll_socket_service::
841 destroy_impl(tcp_socket::socket_impl& impl)
842 {
843 5243 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
844
1/1
✓ Branch 2 taken 5243 times.
5243 std::lock_guard lock(state_->mutex_);
845 5243 state_->socket_list_.remove(epoll_impl);
846
1/1
✓ Branch 2 taken 5243 times.
5243 state_->socket_ptrs_.erase(epoll_impl);
847 5243 }
848
849 std::error_code
850 2628 epoll_socket_service::
851 open_socket(tcp_socket::socket_impl& impl)
852 {
853 2628 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
854 2628 epoll_impl->close_socket();
855
856 2628 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
857
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2628 times.
2628 if (fd < 0)
858 return make_err(errno);
859
860 2628 epoll_impl->fd_ = fd;
861
862 // Register fd with epoll (edge-triggered mode)
863 2628 epoll_impl->desc_state_.fd = fd;
864 {
865
1/1
✓ Branch 1 taken 2628 times.
2628 std::lock_guard lock(epoll_impl->desc_state_.mutex);
866 2628 epoll_impl->desc_state_.read_op = nullptr;
867 2628 epoll_impl->desc_state_.write_op = nullptr;
868 2628 epoll_impl->desc_state_.connect_op = nullptr;
869 2628 }
870 2628 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
871
872 2628 return {};
873 }
874
875 void
876 147869 epoll_socket_service::
877 post(epoll_op* op)
878 {
879 147869 state_->sched_.post(op);
880 147869 }
881
882 void
883 2784 epoll_socket_service::
884 work_started() noexcept
885 {
886 2784 state_->sched_.work_started();
887 2784 }
888
889 void
890 116 epoll_socket_service::
891 work_finished() noexcept
892 {
893 116 state_->sched_.work_finished();
894 116 }
895
896 } // namespace boost::corosio::detail
897
898 #endif // BOOST_COROSIO_HAS_EPOLL
899