libs/corosio/src/corosio/src/detail/select/op.hpp

74.2% Lines (95/128) 84.2% Functions (16/19) 65.7% Branches (23/35)
libs/corosio/src/corosio/src/detail/select/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_OP_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/scheduler_op.hpp"
27 #include "src/detail/endpoint_convert.hpp"
28
29 #include <unistd.h>
30 #include <errno.h>
31 #include <fcntl.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <optional>
37 #include <stop_token>
38
39 #include <netinet/in.h>
40 #include <sys/select.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 select Operation State
46 ======================
47
48 Each async I/O operation has a corresponding select_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 This mirrors the epoll_op design for consistency across backends.
54
55 Completion vs Cancellation Race
56 -------------------------------
57 The `registered` atomic uses a tri-state (unregistered, registering,
58 registered) to handle two races: (1) between register_fd() and the
59 reactor seeing an event, and (2) between reactor completion and cancel().
60
61 The registering state closes the window where an event could arrive
62 after register_fd() but before the boolean was set. The reactor and
63 cancel() both treat registering the same as registered when claiming.
64
65 Whoever atomically exchanges to unregistered "claims" the operation
66 and is responsible for completing it. The loser sees unregistered and
67 does nothing. The initiating thread uses compare_exchange to transition
68 from registering to registered; if this fails, the reactor or cancel
69 already claimed the op.
70
71 Impl Lifetime Management
72 ------------------------
73 When cancel() posts an op to the scheduler's ready queue, the socket impl
74 might be destroyed before the scheduler processes the op. The `impl_ptr`
75 member holds a shared_ptr to the impl, keeping it alive until the op
76 completes.
77
78 EOF Detection
79 -------------
80 For reads, 0 bytes with no error means EOF. But an empty user buffer also
81 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
82
83 SIGPIPE Prevention
84 ------------------
85 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
86 SIGPIPE when the peer has closed.
87 */
88
89 namespace boost::corosio::detail {
90
91 // Forward declarations for cancellation support
92 class select_socket_impl;
93 class select_acceptor_impl;
94
95 /** Registration state for async operations.
96
97 Tri-state enum to handle the race between register_fd() and
98 run_reactor() seeing an event. Setting REGISTERING before
99 calling register_fd() ensures events delivered during the
100 registration window are not dropped.
101 */
102 enum class select_registration_state : std::uint8_t
103 {
104 unregistered, ///< Not registered with reactor
105 registering, ///< register_fd() called, not yet confirmed
106 registered ///< Fully registered, ready for events
107 };
108
109 struct select_op : scheduler_op
110 {
111 struct canceller
112 {
113 select_op* op;
114 void operator()() const noexcept;
115 };
116
117 capy::coro h;
118 capy::executor_ref ex;
119 std::error_code* ec_out = nullptr;
120 std::size_t* bytes_out = nullptr;
121
122 int fd = -1;
123 int errn = 0;
124 std::size_t bytes_transferred = 0;
125
126 std::atomic<bool> cancelled{false};
127 std::atomic<select_registration_state> registered{select_registration_state::unregistered};
128 std::optional<std::stop_callback<canceller>> stop_cb;
129
130 // Prevents use-after-free when socket is closed with pending ops.
131 std::shared_ptr<void> impl_ptr;
132
133 // For stop_token cancellation - pointer to owning socket/acceptor impl.
134 select_socket_impl* socket_impl_ = nullptr;
135 select_acceptor_impl* acceptor_impl_ = nullptr;
136
137 12165 select_op() = default;
138
139 166381 void reset() noexcept
140 {
141 166381 fd = -1;
142 166381 errn = 0;
143 166381 bytes_transferred = 0;
144 166381 cancelled.store(false, std::memory_order_relaxed);
145 166381 registered.store(select_registration_state::unregistered, std::memory_order_relaxed);
146 166381 impl_ptr.reset();
147 166381 socket_impl_ = nullptr;
148 166381 acceptor_impl_ = nullptr;
149 166381 }
150
151 162348 void operator()() override
152 {
153 162348 stop_cb.reset();
154
155
1/2
✓ Branch 0 taken 162348 times.
✗ Branch 1 not taken.
162348 if (ec_out)
156 {
157
2/2
✓ Branch 1 taken 198 times.
✓ Branch 2 taken 162150 times.
162348 if (cancelled.load(std::memory_order_acquire))
158 198 *ec_out = capy::error::canceled;
159
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 162149 times.
162150 else if (errn != 0)
160 1 *ec_out = make_err(errn);
161
6/6
✓ Branch 1 taken 81038 times.
✓ Branch 2 taken 81111 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 81033 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 162144 times.
162149 else if (is_read_operation() && bytes_transferred == 0)
162 5 *ec_out = capy::error::eof;
163 else
164 162144 *ec_out = {};
165 }
166
167
1/2
✓ Branch 0 taken 162348 times.
✗ Branch 1 not taken.
162348 if (bytes_out)
168 162348 *bytes_out = bytes_transferred;
169
170 // Move to stack before destroying the frame
171 162348 capy::executor_ref saved_ex( std::move( ex ) );
172 162348 capy::coro saved_h( std::move( h ) );
173 162348 impl_ptr.reset();
174
1/1
✓ Branch 1 taken 162348 times.
162348 saved_ex.dispatch( saved_h );
175 162348 }
176
177 81110 virtual bool is_read_operation() const noexcept { return false; }
178 virtual void cancel() noexcept = 0;
179
180 void destroy() override
181 {
182 stop_cb.reset();
183 impl_ptr.reset();
184 }
185
186 18662 void request_cancel() noexcept
187 {
188 18662 cancelled.store(true, std::memory_order_release);
189 18662 }
190
191 void start(std::stop_token token)
192 {
193 cancelled.store(false, std::memory_order_release);
194 stop_cb.reset();
195 socket_impl_ = nullptr;
196 acceptor_impl_ = nullptr;
197
198 if (token.stop_possible())
199 stop_cb.emplace(token, canceller{this});
200 }
201
202 164364 void start(std::stop_token token, select_socket_impl* impl)
203 {
204 164364 cancelled.store(false, std::memory_order_release);
205 164364 stop_cb.reset();
206 164364 socket_impl_ = impl;
207 164364 acceptor_impl_ = nullptr;
208
209
2/2
✓ Branch 1 taken 98 times.
✓ Branch 2 taken 164266 times.
164364 if (token.stop_possible())
210 98 stop_cb.emplace(token, canceller{this});
211 164364 }
212
213 2017 void start(std::stop_token token, select_acceptor_impl* impl)
214 {
215 2017 cancelled.store(false, std::memory_order_release);
216 2017 stop_cb.reset();
217 2017 socket_impl_ = nullptr;
218 2017 acceptor_impl_ = impl;
219
220
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2017 times.
2017 if (token.stop_possible())
221 stop_cb.emplace(token, canceller{this});
222 2017 }
223
224 166264 void complete(int err, std::size_t bytes) noexcept
225 {
226 166264 errn = err;
227 166264 bytes_transferred = bytes;
228 166264 }
229
230 virtual void perform_io() noexcept {}
231 };
232
233
234 struct select_connect_op : select_op
235 {
236 endpoint target_endpoint;
237
238 2016 void reset() noexcept
239 {
240 2016 select_op::reset();
241 2016 target_endpoint = endpoint{};
242 2016 }
243
244 2016 void perform_io() noexcept override
245 {
246 // connect() completion status is retrieved via SO_ERROR, not return value
247 2016 int err = 0;
248 2016 socklen_t len = sizeof(err);
249
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2016 times.
2016 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
250 err = errno;
251 2016 complete(err, 0);
252 2016 }
253
254 // Defined in sockets.cpp where select_socket_impl is complete
255 void operator()() override;
256 void cancel() noexcept override;
257 };
258
259
260 struct select_read_op : select_op
261 {
262 static constexpr std::size_t max_buffers = 16;
263 iovec iovecs[max_buffers];
264 int iovec_count = 0;
265 bool empty_buffer_read = false;
266
267 81039 bool is_read_operation() const noexcept override
268 {
269 81039 return !empty_buffer_read;
270 }
271
272 81233 void reset() noexcept
273 {
274 81233 select_op::reset();
275 81233 iovec_count = 0;
276 81233 empty_buffer_read = false;
277 81233 }
278
279 79 void perform_io() noexcept override
280 {
281 79 ssize_t n = ::readv(fd, iovecs, iovec_count);
282
1/2
✓ Branch 0 taken 79 times.
✗ Branch 1 not taken.
79 if (n >= 0)
283 79 complete(0, static_cast<std::size_t>(n));
284 else
285 complete(errno, 0);
286 79 }
287
288 void cancel() noexcept override;
289 };
290
291
292 struct select_write_op : select_op
293 {
294 static constexpr std::size_t max_buffers = 16;
295 iovec iovecs[max_buffers];
296 int iovec_count = 0;
297
298 81115 void reset() noexcept
299 {
300 81115 select_op::reset();
301 81115 iovec_count = 0;
302 81115 }
303
304 void perform_io() noexcept override
305 {
306 msghdr msg{};
307 msg.msg_iov = iovecs;
308 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
309
310 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
311 if (n >= 0)
312 complete(0, static_cast<std::size_t>(n));
313 else
314 complete(errno, 0);
315 }
316
317 void cancel() noexcept override;
318 };
319
320
321 struct select_accept_op : select_op
322 {
323 int accepted_fd = -1;
324 io_object::io_object_impl* peer_impl = nullptr;
325 io_object::io_object_impl** impl_out = nullptr;
326
327 2017 void reset() noexcept
328 {
329 2017 select_op::reset();
330 2017 accepted_fd = -1;
331 2017 peer_impl = nullptr;
332 2017 impl_out = nullptr;
333 2017 }
334
335 2012 void perform_io() noexcept override
336 {
337 2012 sockaddr_in addr{};
338 2012 socklen_t addrlen = sizeof(addr);
339
340 // Note: select backend uses accept() + fcntl instead of accept4()
341 // for broader POSIX compatibility
342 2012 int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
343
344
1/2
✓ Branch 0 taken 2012 times.
✗ Branch 1 not taken.
2012 if (new_fd >= 0)
345 {
346 // Reject fds that exceed select()'s FD_SETSIZE limit.
347 // Better to fail now than during later async operations.
348
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2012 times.
2012 if (new_fd >= FD_SETSIZE)
349 {
350 ::close(new_fd);
351 complete(EINVAL, 0);
352 return;
353 }
354
355 // Set non-blocking and close-on-exec flags.
356 // A non-blocking socket is essential for the async reactor;
357 // if we can't configure it, fail rather than risk blocking.
358 2012 int flags = ::fcntl(new_fd, F_GETFL, 0);
359
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2012 times.
2012 if (flags == -1)
360 {
361 int err = errno;
362 ::close(new_fd);
363 complete(err, 0);
364 return;
365 }
366
367
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2012 times.
2012 if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
368 {
369 int err = errno;
370 ::close(new_fd);
371 complete(err, 0);
372 return;
373 }
374
375
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2012 times.
2012 if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
376 {
377 int err = errno;
378 ::close(new_fd);
379 complete(err, 0);
380 return;
381 }
382
383 2012 accepted_fd = new_fd;
384 2012 complete(0, 0);
385 }
386 else
387 {
388 complete(errno, 0);
389 }
390 }
391
392 // Defined in acceptors.cpp where select_acceptor_impl is complete
393 void operator()() override;
394 void cancel() noexcept override;
395 };
396
397 } // namespace boost::corosio::detail
398
399 #endif // BOOST_COROSIO_HAS_SELECT
400
401 #endif // BOOST_COROSIO_DETAIL_SELECT_OP_HPP
402