26#ifndef QB_LOCKFREE_SPSC_H
27#define QB_LOCKFREE_SPSC_H
39namespace qb::lockfree::spsc {
52 typedef std::size_t size_t;
53 constexpr static const int padding_size =
54 QB_LOCKFREE_CACHELINE_BYTES -
sizeof(size_t);
55 std::atomic<size_t> write_index_;
56 char padding1[padding_size]{};
58 std::atomic<size_t> read_index_;
92 read_available(
size_t write_index,
size_t read_index,
size_t const max_size) {
93 if (write_index >= read_index)
94 return write_index - read_index;
96 const size_t ret = write_index + max_size - read_index;
110 size_t ret = read_index - write_index - 1;
111 if (write_index >= read_index)
124 size_t write_index = write_index_.load(std::memory_order_acquire);
125 const size_t read_index = read_index_.load(std::memory_order_relaxed);
137 size_t write_index = write_index_.load(std::memory_order_relaxed);
138 const size_t read_index = read_index_.load(std::memory_order_acquire);
151 enqueue(T
const &t, T *buffer,
size_t const max_size) {
152 const size_t write_index = write_index_.load(
153 std::memory_order_relaxed);
154 const size_t next =
next_index(write_index, max_size);
156 if (next == read_index_.load(std::memory_order_acquire))
159 new (buffer + write_index) T(t);
161 write_index_.store(next, std::memory_order_release);
178 enqueue(
const T *input_buffer,
size_t input_count, T *internal_buffer,
179 size_t const max_size) {
180 const size_t write_index = write_index_.load(
181 std::memory_order_relaxed);
182 const size_t read_index = read_index_.load(std::memory_order_acquire);
183 const size_t avail =
write_available(write_index, read_index, max_size);
185 if constexpr (_All) {
186 if (avail < input_count)
191 input_count = (std::min) (input_count, avail);
194 size_t new_write_index = write_index + input_count;
196 if (write_index + input_count > max_size) {
198 const size_t count0 = max_size - write_index;
199 const size_t count1 = input_count - count0;
205 std::memcpy(internal_buffer + write_index, input_buffer, count0 *
sizeof(T));
206 std::memcpy(internal_buffer, input_buffer + count0, count1 *
sizeof(T));
207 new_write_index -= max_size;
211 std::memcpy(internal_buffer + write_index, input_buffer,
212 input_count *
sizeof(T));
214 if (new_write_index == max_size)
218 write_index_.store(new_write_index, std::memory_order_release);
232 dequeue(T *output_buffer,
size_t output_count, T *internal_buffer,
233 size_t const max_size) {
234 const size_t write_index = write_index_.load(std::memory_order_acquire);
235 const size_t read_index =
236 read_index_.load(std::memory_order_relaxed);
238 const size_t avail =
read_available(write_index, read_index, max_size);
243 output_count = (std::min) (output_count, avail);
245 size_t new_read_index = read_index + output_count;
247 if (read_index + output_count > max_size) {
249 const size_t count0 = max_size - read_index;
250 const size_t count1 = output_count - count0;
256 std::memcpy(output_buffer, internal_buffer + read_index, count0 *
sizeof(T));
257 std::memcpy(output_buffer + count0, internal_buffer, count1 *
sizeof(T));
259 new_read_index -= max_size;
263 std::memcpy(output_buffer, internal_buffer + read_index,
264 output_count *
sizeof(T));
266 if (new_read_index == max_size)
270 read_index_.store(new_read_index, std::memory_order_release);
283 template <
typename _Func>
285 consume_all(_Func
const &functor, T *internal_buffer,
size_t max_size) {
286 const size_t write_index = write_index_.load(std::memory_order_acquire);
287 const size_t read_index =
288 read_index_.load(std::memory_order_relaxed);
290 const size_t avail =
read_available(write_index, read_index, max_size);
295 const size_t output_count = avail;
297 size_t new_read_index = read_index + output_count;
299 if (read_index + output_count > max_size) {
301 const size_t count0 = max_size - read_index;
302 const size_t count1 = output_count - count0;
304 functor(internal_buffer + read_index, count0);
305 functor(internal_buffer, count1);
307 new_read_index -= max_size;
309 functor(internal_buffer + read_index, output_count);
311 if (new_read_index == max_size)
315 read_index_.store(new_read_index, std::memory_order_release);
326 front(
const T *internal_buffer)
const {
327 const size_t read_index =
328 read_index_.load(std::memory_order_relaxed);
329 return *(internal_buffer + read_index);
340 const size_t read_index =
341 read_index_.load(std::memory_order_relaxed);
342 return *(internal_buffer + read_index);
353 return empty(write_index_.load(std::memory_order_relaxed),
354 read_index_.load(std::memory_order_relaxed));
366 empty(
size_t write_index,
size_t read_index) {
367 return write_index == read_index;
382template <
typename T,
size_t _MaxSize>
384 typedef std::size_t size_t;
385 constexpr static size_t max_size = _MaxSize + 1;
386 std::array<T, max_size> array_;
419 template <
bool _All = true>
447 template <
typename Func>
449 dequeue(Func
const &func, T *ret,
size_t size)
noexcept {
450 const size_t nb_consume =
453 func(ret, nb_consume);
464 template <
typename Func>
481 typedef std::size_t size_t;
482 const size_t max_size_;
483 std::unique_ptr<T> array_;
492 : max_size_(max_size + 1)
493 , array_(new T[max_size + 1]) {}
525 template <
bool _All = true>
553 template <
typename Func>
555 dequeue(Func
const &func, T *ret,
size_t size)
noexcept {
556 const size_t nb_consume =
559 func(ret, nb_consume);
570 template <
typename Func>
Branch prediction hint utilities for performance optimization.
Base implementation of the Single-Producer Single-Consumer ringbuffer.
Definition spsc.h:51
bool enqueue(T const &t, T *buffer, size_t const max_size)
Enqueue a single element into the buffer.
Definition spsc.h:151
const T & front(const T *internal_buffer) const
Get a reference to the element at the read index (const version)
Definition spsc.h:326
static size_t read_available(size_t write_index, size_t read_index, size_t const max_size)
Calculate how many elements are available for reading.
Definition spsc.h:92
size_t consume_all(_Func const &functor, T *internal_buffer, size_t max_size)
Process all available elements in the buffer using a functor.
Definition spsc.h:285
static size_t write_available(size_t write_index, size_t read_index, size_t const max_size)
Calculate how many elements can be written.
Definition spsc.h:109
static size_t next_index(size_t arg, size_t const max_size)
Calculate the next index in the buffer with wrap-around handling.
Definition spsc.h:76
ringbuffer()
Default constructor initializing indices.
Definition spsc.h:64
size_t write_available(size_t const max_size) const
Get the number of slots available for writing.
Definition spsc.h:136
size_t read_available(size_t const max_size) const
Get the number of elements available for reading.
Definition spsc.h:123
bool empty()
Check if the buffer is empty.
Definition spsc.h:352
T & front(T *internal_buffer)
Get a reference to the element at the read index.
Definition spsc.h:339
size_t enqueue(const T *input_buffer, size_t input_count, T *internal_buffer, size_t const max_size)
Enqueue multiple elements into the buffer.
Definition spsc.h:178
size_t dequeue(T *output_buffer, size_t output_count, T *internal_buffer, size_t const max_size)
Dequeue multiple elements from the buffer.
Definition spsc.h:232
size_t consume_all(Func const &func) noexcept
Process all available elements in the buffer using a functor.
Definition spsc.h:572
bool enqueue(T const &t) noexcept
Enqueue a single element into the buffer.
Definition spsc.h:502
bool dequeue(T *ret) noexcept
Dequeue a single element from the buffer.
Definition spsc.h:513
size_t enqueue(T const *t, size_t size) noexcept
Enqueue multiple elements into the buffer.
Definition spsc.h:527
size_t dequeue(T *ret, size_t size) noexcept
Dequeue multiple elements from the buffer.
Definition spsc.h:540
ringbuffer(size_t const max_size)
Constructs a ringbuffer with the specified maximum size.
Definition spsc.h:491
size_t dequeue(Func const &func, T *ret, size_t size) noexcept
Dequeue multiple elements and process them with a functor.
Definition spsc.h:555
size_t consume_all(Func const &func) noexcept
Process all available elements in the buffer using a functor.
Definition spsc.h:466
size_t dequeue(Func const &func, T *ret, size_t size) noexcept
Dequeue multiple elements and process them with a functor.
Definition spsc.h:449
size_t dequeue(T *ret, size_t size) noexcept
Dequeue multiple elements from the buffer.
Definition spsc.h:434
bool enqueue(T const &t) noexcept
Enqueue a single element into the buffer.
Definition spsc.h:396
bool dequeue(T *ret) noexcept
Dequeue a single element from the buffer.
Definition spsc.h:407
size_t enqueue(T const *t, size_t size) noexcept
Enqueue multiple elements into the buffer.
Definition spsc.h:421
bool unlikely(bool expr)
Hint for branch prediction when a condition is expected to be false.
Definition branch_hints.h:76
Defines a base class to make derived classes non-copyable.
Platform-specific alignment macros, cache-line definitions, and related utilities.
nocopy()=default
Default constructor.