qb  2.0.0.0
C++17 Actor Framework
qb Issue Watch Star Fork Follow @isndev
Loading...
Searching...
No Matches
mpsc.h
Go to the documentation of this file.
1
26
27#ifndef QB_LOCKFREE_MPSC_H
28#define QB_LOCKFREE_MPSC_H
29#include <chrono>
30#include <mutex>
31#include "spinlock.h"
32#include "spsc.h"
33
34namespace qb::lockfree::mpsc {
35
39using Clock = std::chrono::high_resolution_clock;
40
44using Nanoseconds = std::chrono::nanoseconds;
45
57template <typename T, std::size_t max_size, size_t nb_producer = 0>
58class ringbuffer : public nocopy {
59 typedef std::size_t size_t;
60
67 struct Producer {
68 constexpr static const int padding_size =
69 QB_LOCKFREE_CACHELINE_BYTES - sizeof(SpinLock);
70 SpinLock lock;
71 char padding1[padding_size]{};
73 _ringbuffer;
74 };
75
76 std::array<Producer, nb_producer> _producers;
77
78public:
86 template <size_t _Index>
87 bool
88 enqueue(T const &t) {
89 return _producers[_Index]._ringbuffer.enqueue(t);
90 }
91
101 template <size_t _Index, bool _All = true>
102 size_t
103 enqueue(T const *t, size_t const size) {
104 return _producers[_Index]._ringbuffer.enqueue(t, size);
105 }
106
114 bool
115 enqueue(size_t const index, T const &t) {
116 return _producers[index]._ringbuffer.enqueue(t);
117 }
118
128 template <bool _All = true>
129 size_t
130 enqueue(size_t const index, T const *t, size_t const size) {
131 return _producers[index]._ringbuffer.template enqueue<_All>(t, size);
132 }
133
143 size_t
144 enqueue(T const &t) {
145 const size_t index = Clock::now().time_since_epoch().count() % nb_producer;
146 std::lock_guard<SpinLock> lock(_producers[index].lock);
147 return _producers[index]._ringbuffer.enqueue(t);
148 }
149
161 template <bool _All = true>
162 size_t
163 enqueue(T const *t, size_t const size) {
164 const size_t index = Clock::now().time_since_epoch().count() % nb_producer;
165 std::lock_guard<SpinLock> lock(_producers[index].lock);
166 return _producers[index]._ringbuffer.template enqueue<_All>(t, size);
167 }
168
179 size_t
180 dequeue(T *ret, size_t size) {
181 const size_t save_size = size;
182 for (auto &producer : _producers) {
183 size -= producer._ringbuffer.dequeue(ret, size);
184 if (!size)
185 break;
186 }
187 return save_size - size;
188 }
189
199 template <typename Func>
200 size_t
201 dequeue(Func const &func, T *ret, size_t const size) {
202 size_t nb_consume = 0;
203 for (auto &producer : _producers) {
204 nb_consume += producer._ringbuffer.dequeue(func, ret, size);
205 }
206 return nb_consume;
207 }
208
216 template <typename Func>
217 size_t
218 consume_all(Func const &func) {
219 size_t nb_consume = 0;
220 for (auto &producer : _producers) {
221 nb_consume += producer._ringbuffer.consume_all(func);
222 }
223 return nb_consume;
224 }
225
232 auto &
233 ringOf(size_t const index) {
234 return _producers[index]._ringbuffer;
235 }
236};
237
248template <typename T, std::size_t max_size>
249class ringbuffer<T, max_size, 0> : public nocopy {
250 typedef std::size_t size_t;
251
258 struct Producer {
259 constexpr static const int padding_size =
260 QB_LOCKFREE_CACHELINE_BYTES - sizeof(SpinLock);
261 SpinLock lock;
262 char padding1[padding_size]{};
264 _ringbuffer;
265 };
266
267 std::vector<Producer> _producers;
268 const std::size_t _nb_producer;
269
270public:
274 ringbuffer() = delete;
275
281 explicit ringbuffer(std::size_t const nb_producer)
282 : _producers(nb_producer)
283 , _nb_producer(nb_producer) {}
284
292 template <size_t _Index>
293 bool
294 enqueue(T const &t) {
295 return _producers[_Index]._ringbuffer.enqueue(t);
296 }
297
307 template <size_t _Index, bool _All = true>
308 size_t
309 enqueue(T const *t, size_t const size) {
310 return _producers[_Index]._ringbuffer.template enqueue<_All>(t, size);
311 }
312
320 bool
321 enqueue(size_t const index, T const &t) {
322 return _producers[index]._ringbuffer.enqueue(t);
323 }
324
334 template <bool _All = true>
335 size_t
336 enqueue(size_t const index, T const *t, size_t const size) {
337 return _producers[index]._ringbuffer.template enqueue<_All>(t, size);
338 }
339
349 size_t
350 enqueue(T const &t) {
351 const size_t index = Clock::now().time_since_epoch().count() % _nb_producer;
352 std::lock_guard<SpinLock> lock(_producers[index].lock);
353 return _producers[index]._ringbuffer.enqueue(t);
354 }
355
367 template <bool _All = true>
368 size_t
369 enqueue(T const *t, size_t const size) {
370 const size_t index = Clock::now().time_since_epoch().count() % _nb_producer;
371 std::lock_guard<SpinLock> lock(_producers[index].lock);
372 return _producers[index]._ringbuffer.template enqueue<_All>(t, size);
373 }
374
385 size_t
386 dequeue(T *ret, size_t size) {
387 const size_t save_size = size;
388 for (size_t i = 0; i < _nb_producer; ++i) {
389 size -= _producers[i]._ringbuffer.dequeue(ret, size);
390 if (!size)
391 break;
392 }
393 return save_size - size;
394 }
395
405 template <typename Func>
406 size_t
407 dequeue(Func const &func, T *ret, size_t const size) {
408 size_t nb_consume = 0;
409 for (size_t i = 0; i < _nb_producer; ++i) {
410 nb_consume += _producers[i]._ringbuffer.dequeue(func, ret, size);
411 }
412 return nb_consume;
413 }
414
422 template <typename Func>
423 size_t
424 consume_all(Func const &func) {
425 size_t nb_consume = 0;
426 for (size_t i = 0; i < _nb_producer; ++i) {
427 nb_consume += _producers[i]._ringbuffer.consume_all(func);
428 }
429 return nb_consume;
430 }
431
438 auto &
439 ringOf(size_t const index) {
440 return _producers[index]._ringbuffer;
441 }
442};
443
444} // namespace qb::lockfree::mpsc
445
446#endif // QB_LOCKFREE_MPSC_H
A spinlock implementation for lightweight thread synchronization.
Definition spinlock.h:41
bool enqueue(size_t const index, T const &t)
Enqueue an item using a runtime producer index.
Definition mpsc.h:321
size_t enqueue(size_t const index, T const *t, size_t const size)
Enqueue multiple items using a runtime producer index.
Definition mpsc.h:336
ringbuffer()=delete
Default constructor is deleted - must specify number of producers.
size_t dequeue(Func const &func, T *ret, size_t const size)
Dequeue multiple items with a function to process each item.
Definition mpsc.h:407
ringbuffer(std::size_t const nb_producer)
Constructor with runtime number of producers.
Definition mpsc.h:281
auto & ringOf(size_t const index)
Get direct access to a specific producer's ring buffer.
Definition mpsc.h:439
size_t enqueue(T const &t)
Enqueue an item using a random producer index.
Definition mpsc.h:350
size_t consume_all(Func const &func)
Process all available items from all producers.
Definition mpsc.h:424
size_t dequeue(T *ret, size_t size)
Dequeue multiple items from all producers.
Definition mpsc.h:386
size_t enqueue(T const *t, size_t const size)
Enqueue multiple items using a random producer index.
Definition mpsc.h:369
bool enqueue(T const &t)
Enqueue an item using a compile-time producer index.
Definition mpsc.h:294
size_t enqueue(T const *t, size_t const size)
Enqueue multiple items using a compile-time producer index.
Definition mpsc.h:309
Multi-Producer Single-Consumer ring buffer with fixed number of producers.
Definition mpsc.h:58
size_t consume_all(Func const &func)
Process all available items from all producers.
Definition mpsc.h:218
size_t enqueue(T const &t)
Enqueue an item using a random producer index.
Definition mpsc.h:144
size_t dequeue(Func const &func, T *ret, size_t const size)
Dequeue multiple items with a function to process each item.
Definition mpsc.h:201
size_t enqueue(T const *t, size_t const size)
Enqueue multiple items using a compile-time producer index.
Definition mpsc.h:103
size_t dequeue(T *ret, size_t size)
Dequeue multiple items from all producers.
Definition mpsc.h:180
bool enqueue(size_t const index, T const &t)
Enqueue an item using a runtime producer index.
Definition mpsc.h:115
size_t enqueue(T const *t, size_t const size)
Enqueue multiple items using a random producer index.
Definition mpsc.h:163
bool enqueue(T const &t)
Enqueue an item using a compile-time producer index.
Definition mpsc.h:88
size_t enqueue(size_t const index, T const *t, size_t const size)
Enqueue multiple items using a runtime producer index.
Definition mpsc.h:130
auto & ringOf(size_t const index)
Get direct access to a specific producer's ring buffer.
Definition mpsc.h:233
Fixed-size implementation of the SPSC ringbuffer.
Definition spsc.h:383
std::chrono::high_resolution_clock Clock
High-resolution clock type used for distribution of producer indexes.
Definition mpsc.h:39
std::chrono::nanoseconds Nanoseconds
Nanosecond duration type used for time measurements.
Definition mpsc.h:44
Spinlock synchronization primitives.
Single-Producer Single-Consumer lockfree data structures.
nocopy()=default
Default constructor.