27#ifndef QB_LOCKFREE_MPSC_H
28#define QB_LOCKFREE_MPSC_H
34namespace qb::lockfree::mpsc {
39using Clock = std::chrono::high_resolution_clock;
57template <
typename T, std::
size_t max_size,
size_t nb_producer = 0>
59 typedef std::size_t size_t;
68 constexpr static const int padding_size =
69 QB_LOCKFREE_CACHELINE_BYTES -
sizeof(
SpinLock);
71 char padding1[padding_size]{};
76 std::array<Producer, nb_producer> _producers;
86 template <
size_t _Index>
89 return _producers[_Index]._ringbuffer.enqueue(t);
// Bulk enqueue into the producer slot selected at compile time by _Index.
// _All is forwarded to the underlying ring's enqueue<_All>, exactly as the
// runtime-index overload does. It used to be declared here but dropped at
// the call, so the ring's own default applied regardless of the caller's
// choice.
101 template <
size_t _Index,
bool _All = true>
104 return _producers[_Index]._ringbuffer.template
enqueue<_All>(t, size);
116 return _producers[index]._ringbuffer.enqueue(t);
128 template <
bool _All = true>
130 enqueue(
size_t const index, T
const *t,
size_t const size) {
131 return _producers[index]._ringbuffer.template
enqueue<_All>(t, size);
145 const size_t index = Clock::now().time_since_epoch().count() % nb_producer;
146 std::lock_guard<SpinLock> lock(_producers[index].lock);
147 return _producers[index]._ringbuffer.enqueue(t);
161 template <
bool _All = true>
164 const size_t index = Clock::now().time_since_epoch().count() % nb_producer;
165 std::lock_guard<SpinLock> lock(_producers[index].lock);
166 return _producers[index]._ringbuffer.template
enqueue<_All>(t, size);
181 const size_t save_size = size;
182 for (
auto &producer : _producers) {
183 size -= producer._ringbuffer.dequeue(ret, size);
187 return save_size - size;
199 template <
typename Func>
201 dequeue(Func
const &func, T *ret,
size_t const size) {
202 size_t nb_consume = 0;
203 for (
auto &producer : _producers) {
204 nb_consume += producer._ringbuffer.dequeue(func, ret, size);
216 template <
typename Func>
219 size_t nb_consume = 0;
220 for (
auto &producer : _producers) {
221 nb_consume += producer._ringbuffer.consume_all(func);
234 return _producers[index]._ringbuffer;
248template <
typename T, std::
size_t max_size>
250 typedef std::size_t size_t;
259 constexpr static const int padding_size =
260 QB_LOCKFREE_CACHELINE_BYTES -
sizeof(
SpinLock);
262 char padding1[padding_size]{};
267 std::vector<Producer> _producers;
268 const std::size_t _nb_producer;
282 : _producers(nb_producer)
283 , _nb_producer(nb_producer) {}
292 template <
size_t _Index>
295 return _producers[_Index]._ringbuffer.enqueue(t);
307 template <
size_t _Index,
bool _All = true>
310 return _producers[_Index]._ringbuffer.template
enqueue<_All>(t, size);
322 return _producers[index]._ringbuffer.enqueue(t);
334 template <
bool _All = true>
336 enqueue(
size_t const index, T
const *t,
size_t const size) {
337 return _producers[index]._ringbuffer.template
enqueue<_All>(t, size);
351 const size_t index = Clock::now().time_since_epoch().count() % _nb_producer;
352 std::lock_guard<SpinLock> lock(_producers[index].lock);
353 return _producers[index]._ringbuffer.enqueue(t);
367 template <
bool _All = true>
370 const size_t index = Clock::now().time_since_epoch().count() % _nb_producer;
371 std::lock_guard<SpinLock> lock(_producers[index].lock);
372 return _producers[index]._ringbuffer.template
enqueue<_All>(t, size);
387 const size_t save_size = size;
388 for (
size_t i = 0; i < _nb_producer; ++i) {
389 size -= _producers[i]._ringbuffer.dequeue(ret, size);
393 return save_size - size;
405 template <
typename Func>
407 dequeue(Func
const &func, T *ret,
size_t const size) {
408 size_t nb_consume = 0;
409 for (
size_t i = 0; i < _nb_producer; ++i) {
410 nb_consume += _producers[i]._ringbuffer.dequeue(func, ret, size);
422 template <
typename Func>
425 size_t nb_consume = 0;
426 for (
size_t i = 0; i < _nb_producer; ++i) {
427 nb_consume += _producers[i]._ringbuffer.consume_all(func);
440 return _producers[index]._ringbuffer;
A spinlock implementation for lightweight thread synchronization.
Definition spinlock.h:41
bool enqueue(size_t const index, T const &t)
Enqueue an item using a runtime producer index.
Definition mpsc.h:321
size_t enqueue(size_t const index, T const *t, size_t const size)
Enqueue multiple items using a runtime producer index.
Definition mpsc.h:336
ringbuffer()=delete
Default constructor is deleted; the number of producers must be specified.
size_t dequeue(Func const &func, T *ret, size_t const size)
Dequeue multiple items with a function to process each item.
Definition mpsc.h:407
ringbuffer(std::size_t const nb_producer)
Constructor with runtime number of producers.
Definition mpsc.h:281
auto & ringOf(size_t const index)
Get direct access to a specific producer's ring buffer.
Definition mpsc.h:439
size_t enqueue(T const &t)
Enqueue an item using a random producer index.
Definition mpsc.h:350
size_t consume_all(Func const &func)
Process all available items from all producers.
Definition mpsc.h:424
size_t dequeue(T *ret, size_t size)
Dequeue multiple items from all producers.
Definition mpsc.h:386
size_t enqueue(T const *t, size_t const size)
Enqueue multiple items using a random producer index.
Definition mpsc.h:369
bool enqueue(T const &t)
Enqueue an item using a compile-time producer index.
Definition mpsc.h:294
size_t enqueue(T const *t, size_t const size)
Enqueue multiple items using a compile-time producer index.
Definition mpsc.h:309
Multi-Producer Single-Consumer ring buffer with fixed number of producers.
Definition mpsc.h:58
size_t consume_all(Func const &func)
Process all available items from all producers.
Definition mpsc.h:218
size_t enqueue(T const &t)
Enqueue an item using a random producer index.
Definition mpsc.h:144
size_t dequeue(Func const &func, T *ret, size_t const size)
Dequeue multiple items with a function to process each item.
Definition mpsc.h:201
size_t enqueue(T const *t, size_t const size)
Enqueue multiple items using a compile-time producer index.
Definition mpsc.h:103
size_t dequeue(T *ret, size_t size)
Dequeue multiple items from all producers.
Definition mpsc.h:180
bool enqueue(size_t const index, T const &t)
Enqueue an item using a runtime producer index.
Definition mpsc.h:115
size_t enqueue(T const *t, size_t const size)
Enqueue multiple items using a random producer index.
Definition mpsc.h:163
bool enqueue(T const &t)
Enqueue an item using a compile-time producer index.
Definition mpsc.h:88
size_t enqueue(size_t const index, T const *t, size_t const size)
Enqueue multiple items using a runtime producer index.
Definition mpsc.h:130
auto & ringOf(size_t const index)
Get direct access to a specific producer's ring buffer.
Definition mpsc.h:233
Fixed-size implementation of the SPSC ringbuffer.
Definition spsc.h:383
std::chrono::high_resolution_clock Clock
High-resolution clock type whose current tick count, taken modulo the number of producers, selects the producer index for un-indexed enqueues.
Definition mpsc.h:39
std::chrono::nanoseconds Nanoseconds
Nanosecond duration type used for time measurements.
Definition mpsc.h:44
Spinlock synchronization primitives.
Single-Producer Single-Consumer lockfree data structures.
nocopy()=default
Default constructor.