100.00% Lines (28/28) 100.00% Functions (13/13)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_EX_STRAND_HPP 10   #ifndef BOOST_CAPY_EX_STRAND_HPP
11   #define BOOST_CAPY_EX_STRAND_HPP 11   #define BOOST_CAPY_EX_STRAND_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/continuation.hpp> 14   #include <boost/capy/continuation.hpp>
15   #include <coroutine> 15   #include <coroutine>
16   #include <boost/capy/ex/detail/strand_service.hpp> 16   #include <boost/capy/ex/detail/strand_service.hpp>
17   17  
18   #include <type_traits> 18   #include <type_traits>
19   19  
20   namespace boost { 20   namespace boost {
21   namespace capy { 21   namespace capy {
22   22  
23   /** Provides serialized coroutine execution for any executor type. 23   /** Provides serialized coroutine execution for any executor type.
24   24  
25   A strand wraps an inner executor and ensures that coroutines 25   A strand wraps an inner executor and ensures that coroutines
26   dispatched through it never run concurrently. At most one 26   dispatched through it never run concurrently. At most one
27   coroutine executes at a time within a strand, even when the 27   coroutine executes at a time within a strand, even when the
28   underlying executor runs on multiple threads. 28   underlying executor runs on multiple threads.
29   29  
30   Strands are lightweight handles that can be copied freely. 30   Strands are lightweight handles that can be copied freely.
31   Copies share the same internal serialization state, so 31   Copies share the same internal serialization state, so
32   coroutines dispatched through any copy are serialized with 32   coroutines dispatched through any copy are serialized with
33   respect to all other copies. 33   respect to all other copies.
34   34  
35   @par Invariant 35   @par Invariant
36   Coroutines resumed through a strand shall not run concurrently. 36   Coroutines resumed through a strand shall not run concurrently.
37   37  
38   @par Implementation 38   @par Implementation
39   Each strand allocates a private serialization state. Strands 39   Each strand allocates a private serialization state. Strands
40   constructed from the same execution context share a small pool 40   constructed from the same execution context share a small pool
41   of mutexes (193 entries) selected by hash; mutex sharing causes 41   of mutexes (193 entries) selected by hash; mutex sharing causes
42   only brief contention on the push/pop critical section, never 42   only brief contention on the push/pop critical section, never
43   cross-strand state sharing. Construction cost: one 43   cross-strand state sharing. Construction cost: one
44   `std::make_shared` per strand. 44   `std::make_shared` per strand.
45   45  
46   @par Executor Concept 46   @par Executor Concept
47   This class satisfies the `Executor` concept, providing: 47   This class satisfies the `Executor` concept, providing:
48   - `context()` - Returns the underlying execution context 48   - `context()` - Returns the underlying execution context
49   - `on_work_started()` / `on_work_finished()` - Work tracking 49   - `on_work_started()` / `on_work_finished()` - Work tracking
50   - `dispatch(continuation&)` - May run immediately if strand is idle 50   - `dispatch(continuation&)` - May run immediately if strand is idle
51   - `post(continuation&)` - Always queues for later execution 51   - `post(continuation&)` - Always queues for later execution
52   52  
53   @par Thread Safety 53   @par Thread Safety
54   Distinct objects: Safe. 54   Distinct objects: Safe.
55   Shared objects: Safe. 55   Shared objects: Safe.
56   56  
57   @par Example 57   @par Example
58   @code 58   @code
59   thread_pool pool(4); 59   thread_pool pool(4);
60   auto strand = make_strand(pool.get_executor()); 60   auto strand = make_strand(pool.get_executor());
61   61  
62   // Continuations are linked intrusively into the strand's queue, 62   // Continuations are linked intrusively into the strand's queue,
63   // so each one must outlive its time there. Storage is typically 63   // so each one must outlive its time there. Storage is typically
64   // owned by the awaitable or operation state that posted it. 64   // owned by the awaitable or operation state that posted it.
65   continuation c1{h1}, c2{h2}, c3{h3}; 65   continuation c1{h1}, c2{h2}, c3{h3};
66   strand.post(c1); 66   strand.post(c1);
67   strand.post(c2); 67   strand.post(c2);
68   strand.post(c3); 68   strand.post(c3);
69   @endcode 69   @endcode
70   70  
71   @tparam E The type of the underlying executor. Must 71   @tparam E The type of the underlying executor. Must
72   satisfy the `Executor` concept. 72   satisfy the `Executor` concept.
73   73  
74   @see make_strand, Executor 74   @see make_strand, Executor
75   */ 75   */
76   template<typename Ex> 76   template<typename Ex>
77   class strand 77   class strand
78   { 78   {
79   std::shared_ptr<detail::strand_impl> impl_; 79   std::shared_ptr<detail::strand_impl> impl_;
80   Ex ex_; 80   Ex ex_;
81   81  
82   friend struct strand_test; 82   friend struct strand_test;
83   83  
84   public: 84   public:
85   /** The type of the underlying executor. 85   /** The type of the underlying executor.
86   */ 86   */
87   using inner_executor_type = Ex; 87   using inner_executor_type = Ex;
88   88  
89   /** Construct a strand for the specified executor. 89   /** Construct a strand for the specified executor.
90   90  
91   Allocates a fresh strand implementation from the service 91   Allocates a fresh strand implementation from the service
92   associated with the executor's context. 92   associated with the executor's context.
93   93  
94   @param ex The inner executor to wrap. Coroutines will 94   @param ex The inner executor to wrap. Coroutines will
95   ultimately be dispatched through this executor. 95   ultimately be dispatched through this executor.
96   96  
97   @note This constructor is disabled if the argument is a 97   @note This constructor is disabled if the argument is a
98   strand type, to prevent strand-of-strand wrapping. 98   strand type, to prevent strand-of-strand wrapping.
99   */ 99   */
100   template<typename Ex1, 100   template<typename Ex1,
101   typename = std::enable_if_t< 101   typename = std::enable_if_t<
102   !std::is_same_v<std::decay_t<Ex1>, strand> && 102   !std::is_same_v<std::decay_t<Ex1>, strand> &&
103   !detail::is_strand<std::decay_t<Ex1>>::value && 103   !detail::is_strand<std::decay_t<Ex1>>::value &&
104   std::is_convertible_v<Ex1, Ex>>> 104   std::is_convertible_v<Ex1, Ex>>>
105   explicit 105   explicit
HITCBC 106   11442 strand(Ex1&& ex) 106   11442 strand(Ex1&& ex)
HITCBC 107   11442 : impl_(detail::get_strand_service(ex.context()) 107   11442 : impl_(detail::get_strand_service(ex.context())
HITCBC 108   11442 .create_implementation()) 108   11442 .create_implementation())
HITCBC 109   11442 , ex_(std::forward<Ex1>(ex)) 109   11442 , ex_(std::forward<Ex1>(ex))
110   { 110   {
HITCBC 111   11442 } 111   11442 }
112   112  
113   /** Construct a copy. 113   /** Construct a copy.
114   114  
115   Creates a strand that shares serialization state with 115   Creates a strand that shares serialization state with
116   the original. Coroutines dispatched through either strand 116   the original. Coroutines dispatched through either strand
117   will be serialized with respect to each other. 117   will be serialized with respect to each other.
118   */ 118   */
HITCBC 119   9 strand(strand const&) = default; 119   9 strand(strand const&) = default;
120   120  
121   /** Construct by moving. 121   /** Construct by moving.
122   122  
123   @note A moved-from strand is only safe to destroy 123   @note A moved-from strand is only safe to destroy
124   or reassign. 124   or reassign.
125   */ 125   */
HITCBC 126   11443 strand(strand&&) = default; 126   11443 strand(strand&&) = default;
127   127  
128   /** Assign by copying. 128   /** Assign by copying.
129   */ 129   */
HITCBC 130   1 strand& operator=(strand const&) = default; 130   1 strand& operator=(strand const&) = default;
131   131  
132   /** Assign by moving. 132   /** Assign by moving.
133   133  
134   @note A moved-from strand is only safe to destroy 134   @note A moved-from strand is only safe to destroy
135   or reassign. 135   or reassign.
136   */ 136   */
HITCBC 137   1 strand& operator=(strand&&) = default; 137   1 strand& operator=(strand&&) = default;
138   138  
139   /** Return the underlying executor. 139   /** Return the underlying executor.
140   140  
141   @return A const reference to the inner executor. 141   @return A const reference to the inner executor.
142   */ 142   */
143   Ex const& 143   Ex const&
HITCBC 144   1 get_inner_executor() const noexcept 144   1 get_inner_executor() const noexcept
145   { 145   {
HITCBC 146   1 return ex_; 146   1 return ex_;
147   } 147   }
148   148  
149   /** Return the underlying execution context. 149   /** Return the underlying execution context.
150   150  
151   @return A reference to the execution context associated 151   @return A reference to the execution context associated
152   with the inner executor. 152   with the inner executor.
153   */ 153   */
154   auto& 154   auto&
HITCBC 155   5 context() const noexcept 155   5 context() const noexcept
156   { 156   {
HITCBC 157   5 return ex_.context(); 157   5 return ex_.context();
158   } 158   }
159   159  
160   /** Notify that work has started. 160   /** Notify that work has started.
161   161  
162   Delegates to the inner executor's `on_work_started()`. 162   Delegates to the inner executor's `on_work_started()`.
163   This is a no-op for most executor types. 163   This is a no-op for most executor types.
164   */ 164   */
165   void 165   void
HITCBC 166   6 on_work_started() const noexcept 166   6 on_work_started() const noexcept
167   { 167   {
HITCBC 168   6 ex_.on_work_started(); 168   6 ex_.on_work_started();
HITCBC 169   6 } 169   6 }
170   170  
171   /** Notify that work has finished. 171   /** Notify that work has finished.
172   172  
173   Delegates to the inner executor's `on_work_finished()`. 173   Delegates to the inner executor's `on_work_finished()`.
174   This is a no-op for most executor types. 174   This is a no-op for most executor types.
175   */ 175   */
176   void 176   void
HITCBC 177   6 on_work_finished() const noexcept 177   6 on_work_finished() const noexcept
178   { 178   {
HITCBC 179   6 ex_.on_work_finished(); 179   6 ex_.on_work_finished();
HITCBC 180   6 } 180   6 }
181   181  
182   /** Determine whether the strand is running in the current thread. 182   /** Determine whether the strand is running in the current thread.
183   183  
184   @return true if the current thread is executing a coroutine 184   @return true if the current thread is executing a coroutine
185   within this strand's dispatch loop. 185   within this strand's dispatch loop.
186   */ 186   */
187   bool 187   bool
HITCBC 188   4 running_in_this_thread() const noexcept 188   4 running_in_this_thread() const noexcept
189   { 189   {
HITCBC 190   4 return detail::strand_service::running_in_this_thread(*impl_); 190   4 return detail::strand_service::running_in_this_thread(*impl_);
191   } 191   }
192   192  
193   /** Compare two strands for equality. 193   /** Compare two strands for equality.
194   194  
195   Two strands are equal if they share the same internal 195   Two strands are equal if they share the same internal
196   serialization state. Equal strands serialize coroutines 196   serialization state. Equal strands serialize coroutines
197   with respect to each other. 197   with respect to each other.
198   198  
199   @param other The strand to compare against. 199   @param other The strand to compare against.
200   @return true if both strands share the same implementation. 200   @return true if both strands share the same implementation.
201   */ 201   */
202   bool 202   bool
HITCBC 203   499505 operator==(strand const& other) const noexcept 203   499505 operator==(strand const& other) const noexcept
204   { 204   {
HITCBC 205   499505 return impl_.get() == other.impl_.get(); 205   499505 return impl_.get() == other.impl_.get();
206   } 206   }
207   207  
208   /** Post a continuation to the strand. 208   /** Post a continuation to the strand.
209   209  
210   The continuation is always queued for execution, never resumed 210   The continuation is always queued for execution, never resumed
211   immediately. When the strand becomes available, queued 211   immediately. When the strand becomes available, queued
212   work executes in FIFO order on the underlying executor. 212   work executes in FIFO order on the underlying executor.
213   213  
214   @par Ordering 214   @par Ordering
215   Guarantees strict FIFO ordering relative to other post() calls. 215   Guarantees strict FIFO ordering relative to other post() calls.
216   Use this instead of dispatch() when ordering matters. 216   Use this instead of dispatch() when ordering matters.
217   217  
218   @param c The continuation to post. The caller retains 218   @param c The continuation to post. The caller retains
219   ownership; the continuation must remain valid until 219   ownership; the continuation must remain valid until
220   it is dequeued and resumed. 220   it is dequeued and resumed.
221   */ 221   */
222   void 222   void
HITCBC 223   30335 post(continuation& c) const 223   30335 post(continuation& c) const
224   { 224   {
HITCBC 225   30335 detail::strand_service::post(impl_, executor_ref(ex_), c); 225   30335 detail::strand_service::post(impl_, executor_ref(ex_), c);
HITCBC 226   30335 } 226   30335 }
227   227  
228   /** Dispatch a continuation through the strand. 228   /** Dispatch a continuation through the strand.
229   229  
230   Returns a handle for symmetric transfer. If the calling 230   Returns a handle for symmetric transfer. If the calling
231   thread is already executing within this strand, returns `c.h`. 231   thread is already executing within this strand, returns `c.h`.
232   Otherwise, the continuation is queued and 232   Otherwise, the continuation is queued and
233   `std::noop_coroutine()` is returned. 233   `std::noop_coroutine()` is returned.
234   234  
235   @par Ordering 235   @par Ordering
236   Callers requiring strict FIFO ordering should use post() 236   Callers requiring strict FIFO ordering should use post()
237   instead, which always queues the continuation. 237   instead, which always queues the continuation.
238   238  
239   @param c The continuation to dispatch. The caller retains 239   @param c The continuation to dispatch. The caller retains
240   ownership; the continuation must remain valid until 240   ownership; the continuation must remain valid until
241   it is dequeued and resumed. 241   it is dequeued and resumed.
242   242  
243   @return A handle for symmetric transfer or `std::noop_coroutine()`. 243   @return A handle for symmetric transfer or `std::noop_coroutine()`.
244   */ 244   */
245   std::coroutine_handle<> 245   std::coroutine_handle<>
HITCBC 246   8 dispatch(continuation& c) const 246   8 dispatch(continuation& c) const
247   { 247   {
HITCBC 248   8 return detail::strand_service::dispatch(impl_, executor_ref(ex_), c); 248   8 return detail::strand_service::dispatch(impl_, executor_ref(ex_), c);
249   } 249   }
250   }; 250   };
251   251  
252   // Deduction guide 252   // Deduction guide
253   template<typename Ex> 253   template<typename Ex>
254   strand(Ex) -> strand<Ex>; 254   strand(Ex) -> strand<Ex>;
255   255  
256   } // namespace capy 256   } // namespace capy
257   } // namespace boost 257   } // namespace boost
258   258  
259   #endif 259   #endif