qb  2.0.0.0
C++17 Actor Framework
qb Issue Watch Star Fork Follow @isndev
Loading...
Searching...
No Matches
spsc.h
Go to the documentation of this file.
1
25
26#ifndef QB_LOCKFREE_SPSC_H
27#define QB_LOCKFREE_SPSC_H
28#include <algorithm>
29#include <array>
30#include <atomic>
31#include <cstdint>
32#include <cstring>
33#include <memory>
35#include <qb/utility/nocopy.h>
36#include <qb/utility/prefix.h>
37#include <thread>
38
39namespace qb::lockfree::spsc {
40namespace internal {
50template <typename T>
51class ringbuffer : public nocopy {
52 typedef std::size_t size_t;
53 constexpr static const int padding_size =
54 QB_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
55 std::atomic<size_t> write_index_;
56 char padding1[padding_size]{}; /* force read_index and write_index to different cache
57 lines */
58 std::atomic<size_t> read_index_;
59
60protected:
65 : write_index_(0)
66 , read_index_(0) {}
67
75 static size_t
76 next_index(size_t arg, size_t const max_size) {
77 size_t ret = arg + 1;
78 while (unlikely(ret >= max_size))
79 ret -= max_size;
80 return ret;
81 }
82
91 static size_t
92 read_available(size_t write_index, size_t read_index, size_t const max_size) {
93 if (write_index >= read_index)
94 return write_index - read_index;
95
96 const size_t ret = write_index + max_size - read_index;
97 return ret;
98 }
99
108 static size_t
109 write_available(size_t write_index, size_t read_index, size_t const max_size) {
110 size_t ret = read_index - write_index - 1;
111 if (write_index >= read_index)
112 ret += max_size;
113 return ret;
114 }
115
122 [[nodiscard]] size_t
123 read_available(size_t const max_size) const {
124 size_t write_index = write_index_.load(std::memory_order_acquire);
125 const size_t read_index = read_index_.load(std::memory_order_relaxed);
126 return read_available(write_index, read_index, max_size);
127 }
128
135 [[nodiscard]] size_t
136 write_available(size_t const max_size) const {
137 size_t write_index = write_index_.load(std::memory_order_relaxed);
138 const size_t read_index = read_index_.load(std::memory_order_acquire);
139 return write_available(write_index, read_index, max_size);
140 }
141
150 bool
151 enqueue(T const &t, T *buffer, size_t const max_size) {
152 const size_t write_index = write_index_.load(
153 std::memory_order_relaxed); // only written from enqueue thread
154 const size_t next = next_index(write_index, max_size);
155
156 if (next == read_index_.load(std::memory_order_acquire))
157 return false; /* ringbuffer is full */
158
159 new (buffer + write_index) T(t); // copy-construct
160
161 write_index_.store(next, std::memory_order_release);
162
163 return true;
164 }
165
176 template <bool _All>
177 size_t
178 enqueue(const T *input_buffer, size_t input_count, T *internal_buffer,
179 size_t const max_size) {
180 const size_t write_index = write_index_.load(
181 std::memory_order_relaxed); // only written from push thread
182 const size_t read_index = read_index_.load(std::memory_order_acquire);
183 const size_t avail = write_available(write_index, read_index, max_size);
184
185 if constexpr (_All) {
186 if (avail < input_count)
187 return 0;
188 } else {
189 if (!avail)
190 return 0;
191 input_count = (std::min) (input_count, avail);
192 }
193
194 size_t new_write_index = write_index + input_count;
195
196 if (write_index + input_count > max_size) {
197 /* copy data in two sections */
198 const size_t count0 = max_size - write_index;
199 const size_t count1 = input_count - count0;
200
201 // std::uninitialized_copy(input_buffer, input_buffer + count0,
202 // internal_buffer + write_index);
203 // std::uninitialized_copy(input_buffer + count0, input_buffer + input_count,
204 // internal_buffer);
205 std::memcpy(internal_buffer + write_index, input_buffer, count0 * sizeof(T));
206 std::memcpy(internal_buffer, input_buffer + count0, count1 * sizeof(T));
207 new_write_index -= max_size;
208 } else {
209 // std::uninitialized_copy(input_buffer, input_buffer + input_count,
210 // internal_buffer + write_index);
211 std::memcpy(internal_buffer + write_index, input_buffer,
212 input_count * sizeof(T));
213
214 if (new_write_index == max_size)
215 new_write_index = 0;
216 }
217
218 write_index_.store(new_write_index, std::memory_order_release);
219 return input_count;
220 }
221
231 size_t
232 dequeue(T *output_buffer, size_t output_count, T *internal_buffer,
233 size_t const max_size) {
234 const size_t write_index = write_index_.load(std::memory_order_acquire);
235 const size_t read_index =
236 read_index_.load(std::memory_order_relaxed); // only written from pop thread
237
238 const size_t avail = read_available(write_index, read_index, max_size);
239
240 if (avail == 0)
241 return 0;
242
243 output_count = (std::min) (output_count, avail);
244
245 size_t new_read_index = read_index + output_count;
246
247 if (read_index + output_count > max_size) {
248 /* copy data in two sections */
249 const size_t count0 = max_size - read_index;
250 const size_t count1 = output_count - count0;
251
252 // std::uninitialized_copy(internal_buffer + read_index, internal_buffer +
253 // read_index + count0, output_buffer);
254 // std::uninitialized_copy(internal_buffer, internal_buffer + count1,
255 // output_buffer + count0);
256 std::memcpy(output_buffer, internal_buffer + read_index, count0 * sizeof(T));
257 std::memcpy(output_buffer + count0, internal_buffer, count1 * sizeof(T));
258
259 new_read_index -= max_size;
260 } else {
261 // std::uninitialized_copy(internal_buffer + read_index, internal_buffer +
262 // read_index + output_count, output_buffer);
263 std::memcpy(output_buffer, internal_buffer + read_index,
264 output_count * sizeof(T));
265
266 if (new_read_index == max_size)
267 new_read_index = 0;
268 }
269
270 read_index_.store(new_read_index, std::memory_order_release);
271 return output_count;
272 }
273
283 template <typename _Func>
284 size_t
285 consume_all(_Func const &functor, T *internal_buffer, size_t max_size) {
286 const size_t write_index = write_index_.load(std::memory_order_acquire);
287 const size_t read_index =
288 read_index_.load(std::memory_order_relaxed); // only written from pop thread
289
290 const size_t avail = read_available(write_index, read_index, max_size);
291
292 if (avail == 0)
293 return 0;
294
295 const size_t output_count = avail;
296
297 size_t new_read_index = read_index + output_count;
298
299 if (read_index + output_count > max_size) {
300 /* copy data in two sections */
301 const size_t count0 = max_size - read_index;
302 const size_t count1 = output_count - count0;
303
304 functor(internal_buffer + read_index, count0);
305 functor(internal_buffer, count1);
306
307 new_read_index -= max_size;
308 } else {
309 functor(internal_buffer + read_index, output_count);
310
311 if (new_read_index == max_size)
312 new_read_index = 0;
313 }
314
315 read_index_.store(new_read_index, std::memory_order_release);
316 return output_count;
317 }
318
325 const T &
326 front(const T *internal_buffer) const {
327 const size_t read_index =
328 read_index_.load(std::memory_order_relaxed); // only written from pop thread
329 return *(internal_buffer + read_index);
330 }
331
338 T &
339 front(T *internal_buffer) {
340 const size_t read_index =
341 read_index_.load(std::memory_order_relaxed); // only written from pop thread
342 return *(internal_buffer + read_index);
343 }
344
345public:
351 bool
353 return empty(write_index_.load(std::memory_order_relaxed),
354 read_index_.load(std::memory_order_relaxed));
355 }
356
357private:
365 bool
366 empty(size_t write_index, size_t read_index) {
367 return write_index == read_index;
368 }
369};
370
371} // namespace internal
372
382template <typename T, size_t _MaxSize>
384 typedef std::size_t size_t;
385 constexpr static size_t max_size = _MaxSize + 1;
386 std::array<T, max_size> array_;
387
388public:
395 inline bool
396 enqueue(T const &t) noexcept {
397 return internal::ringbuffer<T>::enqueue(t, array_.data(), max_size);
398 }
399
406 inline bool
407 dequeue(T *ret) noexcept {
408 return internal::ringbuffer<T>::dequeue(ret, 1, array_.data(), max_size);
409 }
410
419 template <bool _All = true>
420 inline size_t
421 enqueue(T const *t, size_t size) noexcept {
422 return internal::ringbuffer<T>::template enqueue<_All>(t, size, array_.data(),
423 max_size);
424 }
425
433 inline size_t
434 dequeue(T *ret, size_t size) noexcept {
435 return internal::ringbuffer<T>::dequeue(ret, size, array_.data(), max_size);
436 }
437
447 template <typename Func>
448 inline size_t
449 dequeue(Func const &func, T *ret, size_t size) noexcept {
450 const size_t nb_consume =
451 internal::ringbuffer<T>::dequeue(ret, size, array_.data(), max_size);
452 if (nb_consume)
453 func(ret, nb_consume);
454 return nb_consume;
455 }
456
464 template <typename Func>
465 inline size_t
466 consume_all(Func const &func) noexcept {
467 return internal::ringbuffer<T>::consume_all(func, array_.data(), max_size);
468 }
469};
470
479template <typename T>
480class ringbuffer<T, 0> : public internal::ringbuffer<T> {
481 typedef std::size_t size_t;
482 const size_t max_size_;
483 std::unique_ptr<T> array_;
484
485public:
491 explicit ringbuffer(size_t const max_size)
492 : max_size_(max_size + 1)
493 , array_(new T[max_size + 1]) {}
494
501 inline bool
502 enqueue(T const &t) noexcept {
503 return internal::ringbuffer<T>::enqueue(t, array_.get(), max_size_);
504 }
505
512 inline bool
513 dequeue(T *ret) noexcept {
514 return internal::ringbuffer<T>::dequeue(ret, 1, array_.get(), max_size_);
515 }
516
525 template <bool _All = true>
526 inline size_t
527 enqueue(T const *t, size_t size) noexcept {
528 return internal::ringbuffer<T>::template enqueue<_All>(t, size, array_.get(),
529 max_size_);
530 }
531
539 inline size_t
540 dequeue(T *ret, size_t size) noexcept {
541 return internal::ringbuffer<T>::dequeue(ret, size, array_.get(), max_size_);
542 }
543
553 template <typename Func>
554 inline size_t
555 dequeue(Func const &func, T *ret, size_t size) noexcept {
556 const size_t nb_consume =
557 internal::ringbuffer<T>::dequeue(ret, size, array_.get(), max_size_);
558 if (nb_consume)
559 func(ret, nb_consume);
560 return nb_consume;
561 }
562
570 template <typename Func>
571 inline size_t
572 consume_all(Func const &func) noexcept {
573 return internal::ringbuffer<T>::consume_all(func, array_.get(), max_size_);
574 }
575};
576
577} // namespace qb::lockfree::spsc
578
579#endif /* QB_LOCKFREE_SPSC_H */
Branch prediction hint utilities for performance optimization.
Base implementation of the Single-Producer Single-Consumer ringbuffer.
Definition spsc.h:51
bool enqueue(T const &t, T *buffer, size_t const max_size)
Enqueue a single element into the buffer.
Definition spsc.h:151
const T & front(const T *internal_buffer) const
Get a reference to the element at the read index (const version)
Definition spsc.h:326
static size_t read_available(size_t write_index, size_t read_index, size_t const max_size)
Calculate how many elements are available for reading.
Definition spsc.h:92
size_t consume_all(_Func const &functor, T *internal_buffer, size_t max_size)
Process all available elements in the buffer using a functor.
Definition spsc.h:285
static size_t write_available(size_t write_index, size_t read_index, size_t const max_size)
Calculate how many elements can be written.
Definition spsc.h:109
static size_t next_index(size_t arg, size_t const max_size)
Calculate the next index in the buffer with wrap-around handling.
Definition spsc.h:76
ringbuffer()
Default constructor initializing indices.
Definition spsc.h:64
size_t write_available(size_t const max_size) const
Get the number of slots available for writing.
Definition spsc.h:136
size_t read_available(size_t const max_size) const
Get the number of elements available for reading.
Definition spsc.h:123
bool empty()
Check if the buffer is empty.
Definition spsc.h:352
T & front(T *internal_buffer)
Get a reference to the element at the read index.
Definition spsc.h:339
size_t enqueue(const T *input_buffer, size_t input_count, T *internal_buffer, size_t const max_size)
Enqueue multiple elements into the buffer.
Definition spsc.h:178
size_t dequeue(T *output_buffer, size_t output_count, T *internal_buffer, size_t const max_size)
Dequeue multiple elements from the buffer.
Definition spsc.h:232
size_t consume_all(Func const &func) noexcept
Process all available elements in the buffer using a functor.
Definition spsc.h:572
bool enqueue(T const &t) noexcept
Enqueue a single element into the buffer.
Definition spsc.h:502
bool dequeue(T *ret) noexcept
Dequeue a single element from the buffer.
Definition spsc.h:513
size_t enqueue(T const *t, size_t size) noexcept
Enqueue multiple elements into the buffer.
Definition spsc.h:527
size_t dequeue(T *ret, size_t size) noexcept
Dequeue multiple elements from the buffer.
Definition spsc.h:540
ringbuffer(size_t const max_size)
Constructs a ringbuffer with the specified maximum size.
Definition spsc.h:491
size_t dequeue(Func const &func, T *ret, size_t size) noexcept
Dequeue multiple elements and process them with a functor.
Definition spsc.h:555
size_t consume_all(Func const &func) noexcept
Process all available elements in the buffer using a functor.
Definition spsc.h:466
size_t dequeue(Func const &func, T *ret, size_t size) noexcept
Dequeue multiple elements and process them with a functor.
Definition spsc.h:449
size_t dequeue(T *ret, size_t size) noexcept
Dequeue multiple elements from the buffer.
Definition spsc.h:434
bool enqueue(T const &t) noexcept
Enqueue a single element into the buffer.
Definition spsc.h:396
bool dequeue(T *ret) noexcept
Dequeue a single element from the buffer.
Definition spsc.h:407
size_t enqueue(T const *t, size_t size) noexcept
Enqueue multiple elements into the buffer.
Definition spsc.h:421
bool unlikely(bool expr)
Hint for branch prediction when a condition is expected to be false.
Definition branch_hints.h:76
Defines a base class to make derived classes non-copyable.
Platform-specific alignment macros, cache-line definitions, and related utilities.
nocopy()=default
Default constructor.