qb  2.0.0.0
C++17 Actor Framework
qb Issue Watch Star Fork Follow @isndev
Loading...
Searching...
No Matches
io.h
Go to the documentation of this file.
1
28
29#ifndef QB_IO_ASYNC_IO_H
30#define QB_IO_ASYNC_IO_H
31
33#include "event/all.h"
34#include "listener.h"
35#include "protocol.h"
36
37CREATE_MEMBER_CHECK(Protocol);
39
40#define Derived static_cast<_Derived &>(*this)
41
42namespace qb::io::async {
43
57template <typename _Derived, typename _EV_EVENT>
58class base {
59protected:
60 _EV_EVENT &_async_event;
61
68 : _async_event(listener::current.registerEvent<_EV_EVENT>(Derived)) {}
69
77 //std::cout << "handle=" << _async_event.fd
78 // << " disposed async.stop()" << std::endl;
79 _async_event.stop();
80 listener::current.unregisterEvent(_async_event._interface);
81 }
82};
83
95template <typename _Derived>
96class with_timeout : public base<with_timeout<_Derived>, event::timer> {
97 ev_tstamp _timeout;
98 ev_tstamp _last_activity;
99
100public:
106 explicit with_timeout(ev_tstamp timeout = 3)
107 : _timeout(timeout)
108 , _last_activity(0) {
109 if (timeout > 0.)
110 this->_async_event.start(_timeout);
111 }
112
119 void
120 updateTimeout() noexcept {
121 _last_activity = this->_async_event.loop.now();
122 }
123
130 void
131 setTimeout(ev_tstamp timeout) noexcept {
132 _timeout = timeout;
133 if (_timeout > 0.) { // Check against 0, not just if(_timeout)
134 _last_activity = ev_time(); // Consider using this->_async_event.loop.now() for consistency
135 this->_async_event.set(_timeout);
136 this->_async_event.start();
137 } else
138 this->_async_event.stop();
139 }
140
145 auto
146 getTimeout() const noexcept {
147 return _timeout;
148 }
149
150private:
151 friend class listener::RegisteredKernelEvent<event::timer, with_timeout>;
152
161 void
162 on(event::timer &event) noexcept {
163 const ev_tstamp after = _last_activity - event.loop.now() + _timeout;
164
165 if (after < 0.)
166 Derived.on(event);
167 else {
168 this->_async_event.set(after);
169 this->_async_event.start();
170 }
171 }
172};
173
185template <typename _Func>
186class Timeout : public with_timeout<Timeout<_Func>> {
187 _Func _func;
188
189public:
197 Timeout(_Func &&func, double timeout = 0.)
198 : with_timeout<Timeout<_Func>>(timeout)
199 , _func(std::forward<_Func>(func)) {
200 if (!timeout) { // More direct check for zero or less
201 _func();
202 delete this;
203 }
204 }
205
212 void
213 on(event::timer const & /*event*/) const { // Marked event as unused
214 _func();
215 delete this;
216 }
217};
218
231template <typename _Func>
232void
233callback(_Func &&func, double timeout = 0.) {
234 new Timeout<_Func>(std::forward<_Func>(func), timeout);
235}
236
/**
 * @brief Overload of `callback` taking the delay as a `std::chrono` duration.
 *
 * The duration is converted to a floating-point number of seconds and
 * forwarded to the `double` overload.
 *
 * @param func             Callable invoked with no arguments.
 * @param timeout_duration Delay before the call.
 */
template <typename _Func, typename Rep, typename Period>
void callback(_Func&& func, std::chrono::duration<Rep, Period> timeout_duration) {
    using fractional_seconds = std::chrono::duration<double>;
    callback(std::forward<_Func>(func),
             std::chrono::duration_cast<fractional_seconds>(timeout_duration).count());
}
242
255template <typename _Derived>
256class file_watcher : public base<file_watcher<_Derived>, event::file> {
257 using base_t = base<file_watcher<_Derived>, event::file>;
258 IProtocol *_protocol = nullptr;
259 std::vector<IProtocol *> _protocol_list;
260
261public:
263 constexpr static const bool do_read = true;
264
270 file_watcher() = default;
271
279 file_watcher(IProtocol *protocol) noexcept
280 : _protocol(protocol) {}
281
285 file_watcher(file_watcher const &) = delete;
286
292 ~file_watcher() noexcept {
293 for (auto protocol_ptr : _protocol_list) // Renamed to avoid conflict
294 delete protocol_ptr;
295 }
296
308 template <typename _Protocol, typename... _Args>
309 _Protocol *
310 switch_protocol(_Args &&...args) {
311 auto new_protocol = new _Protocol(std::forward<_Args>(args)...);
312 if (new_protocol->ok()) {
313 _protocol = new_protocol;
314 _protocol_list.push_back(new_protocol);
315 return new_protocol;
316 }
317 return nullptr;
318 }
319
328 void
329 start(std::string const &fpath, ev_tstamp ts = 0.1) noexcept {
330 this->_async_event.start(fpath.c_str(), ts);
331 }
332
338 void
339 disconnect() noexcept {
340 this->_async_event.stop();
341 }
342
355 int
357 constexpr const auto invalid_ret = static_cast<std::size_t>(-1);
358 std::size_t ret = 0u;
359 do {
360 ret = static_cast<std::size_t>(Derived.read());
361 if (unlikely(ret == invalid_ret))
362 return -1;
363 while ((ret = this->_protocol->getMessageSize()) > 0) {
364 // has a new message to read
365 this->_protocol->onMessage(ret);
366 Derived.flush(ret);
367 }
368 Derived.eof();
369 if constexpr (has_method_on<_Derived, void, event::pending_read>::value ||
370 has_method_on<_Derived, void, event::eof>::value) {
371 const auto pendingRead = Derived.pendingRead();
372 if (pendingRead) {
373 if constexpr (has_method_on<_Derived, void,
374 event::pending_read>::value) {
375 Derived.on(event::pending_read{pendingRead});
376 }
377 } else {
378 if constexpr (has_method_on<_Derived, void, event::eof>::value) {
379 Derived.on(event::eof{});
380 }
381 }
382 }
383 } while (ret);
384 return 0;
385 }
386
387private:
389
402 void
403 on(event::file const &event) {
404 int ret = 0u;
405
406 // forward event to Derived if desired
407 if constexpr (has_method_on<_Derived, void, event::file>::value) {
408 Derived.on(event);
409 }
410
411 auto diff_read = event.attr.st_size - event.prev.st_size;
412 if (!_protocol->ok() || !event.attr.st_nlink ||
413 (diff_read < 0 && lseek(Derived.transport().native_handle(), 0, SEEK_SET)))
414 ret = -1;
415 else if (diff_read) {
416 if constexpr (_Derived::do_read) {
417 ret = read_all();
418 }
419 }
420
421 if (ret < 0) {
422 this->_async_event.stop();
423 Derived.close();
424 }
425 }
426};
427
439template <typename _Derived>
440class directory_watcher : public base<directory_watcher<_Derived>, event::file> {
441 using base_t = base<directory_watcher<_Derived>, event::file>;
442
443public:
445 constexpr static const bool do_read = false;
446
450 directory_watcher() = default;
451
456
464 void
465 start(std::string const &fpath, ev_tstamp ts = 0.1) noexcept {
466 this->_async_event.start(fpath.c_str(), ts);
467 }
468
473 void
474 disconnect() noexcept {
475 this->_async_event.stop();
476 }
477
478private:
480
489 void
490 on(event::file const &event) {
491 // int ret = 0u;
492
493 // forward event to Derived if desired
494 if constexpr (has_method_on<_Derived, void, event::file>::value) {
495 Derived.on(event);
496 }
497
498 // if (ret < 0) {
499 // this->_async_event.stop();
500 // }
501 }
502};
503
516template <typename _Derived>
517class input : public base<input<_Derived>, event::io> {
518 using base_t = base<input<_Derived>, event::io>;
519 IProtocol *_protocol = nullptr;
520 std::vector<IProtocol *> _protocol_list;
521 bool _on_message = false;
522 bool _is_disposed = false;
523 int _reason = 0;
524
525public:
527 constexpr static const bool has_server = false;
528
534 input() = default;
535
545 : _protocol(protocol) {
546 _protocol_list.push_back(protocol); // Assumes ownership
547 }
548
552 input(input const &) = delete;
553
// Destructor.
// NOTE(review): original line 560 (the only statement of the body) is missing
// from this listing. Presumably it releases the owned protocols through
// clear_protocols() — confirm against the real header before relying on it.
559 ~input() noexcept {
561 }
562
573 template <typename _Protocol, typename... _Args>
574 _Protocol *
575 switch_protocol(_Args &&...args) {
576 auto new_protocol = new _Protocol(std::forward<_Args>(args)...);
577 if (new_protocol->ok()) {
578 _protocol = new_protocol;
579 _protocol_list.push_back(new_protocol);
580 return new_protocol;
581 } else
582 delete new_protocol;
583 return nullptr;
584 }
585
591 void
593 for (auto protocol : _protocol_list)
594 delete protocol;
595 _protocol_list.clear();
596 _protocol = nullptr;
597 };
598
603 IProtocol *
605 return _protocol;
606 }
607
614 void
615 start() noexcept {
616 _reason = 0;
617 Derived.transport().set_nonblocking(true);
618 this->_async_event.start(Derived.transport().native_handle(), EV_READ);
619 }
620
627 void
628 ready_to_read() noexcept {
629 if (!(this->_async_event.events & EV_READ))
630 this->_async_event.start(Derived.transport().native_handle(), EV_READ);
631 }
632
641 void
642 disconnect(int reason = 1) {
643 _reason = reason;
644 this->_async_event.feed_event(EV_UNDEF);
645 }
646
647private:
648 friend class listener::RegisteredKernelEvent<event::io, input>;
649
667 void
668 on(event::io const &event) {
669 constexpr const auto invalid_ret = static_cast<std::size_t>(-1);
670 std::size_t ret = 0u;
671
672 if (_on_message)
673 return;
674 if (_reason || !_protocol->ok())
675 goto error;
676
677 if (likely(event._revents & EV_READ)) {
678 ret = static_cast<std::size_t>(Derived.read());
679 if (unlikely(ret == invalid_ret))
680 goto error;
681 _on_message = true;
682 while ((ret = this->_protocol->getMessageSize()) > 0) {
683 auto protocol = this->_protocol;
684 protocol->onMessage(ret);
685 if (protocol->should_flush())
686 Derived.flush(ret);
687 }
688 _on_message = false;
689 Derived.eof();
690 if constexpr (has_method_on<_Derived, void, event::pending_read>::value ||
691 has_method_on<_Derived, void, event::eof>::value) {
692 const auto pendingRead = Derived.pendingRead();
693 if (pendingRead) {
694 if constexpr (has_method_on<_Derived, void,
695 event::pending_read>::value) {
696 Derived.on(event::pending_read{pendingRead});
697 }
698 } else {
699 if constexpr (has_method_on<_Derived, void, event::eof>::value) {
700 Derived.on(event::eof{});
701 }
702 }
703 }
704 return;
705 }
706 error:
707#ifdef _WIN32
708 if (socket::get_last_errno() == 10035)
709 return;
710#endif
711 dispose();
712 }
713
714protected:
724 void
726 if (_is_disposed)
727 return;
728 _is_disposed = true;
729
730 if constexpr (has_method_on<_Derived, void, event::disconnected>::value) {
731 Derived.on(event::disconnected{_reason});
732 }
733
734 if constexpr (_Derived::has_server) {
735 Derived.server().disconnected(Derived.id());
736 } else if constexpr (has_method_on<_Derived, void, event::dispose>::value) {
737 Derived.on(event::dispose{});
738 }
739 }
740};
741
754template <typename _Derived>
755class output : public base<output<_Derived>, event::io> {
756 using base_t = base<output<_Derived>, event::io>;
757 bool _is_disposed = false;
758 int _reason = 0;
759
760public:
762 constexpr static const bool has_server = false;
763
767 output() = default;
768
772 output(output const &) = delete;
773
778 ~output() = default;
779
786 void
787 start() noexcept {
788 _reason = 0;
789 Derived.transport().set_nonblocking(true);
790 this->_async_event.start(Derived.transport().native_handle(), EV_WRITE);
791 }
792
799 void
800 ready_to_write() noexcept {
801 if (!(this->_async_event.events & EV_WRITE))
802 this->_async_event.set(EV_WRITE);
803 }
804
813 template <typename... _Args>
814 inline auto &
815 publish(_Args &&...args) noexcept {
817 if constexpr (sizeof...(_Args))
818 (Derived.out() << ... << std::forward<_Args>(args));
819 return Derived.out();
820 }
821
829 template <typename T>
830 auto &
831 operator<<(T &&data) {
832 return publish(std::forward<T>(data));
833 }
834
842 void
843 disconnect(int reason = 1) {
844 _reason = reason;
845 this->_async_event.feed_event(EV_UNDEF);
846 }
847
848private:
849 friend class listener::RegisteredKernelEvent<event::io, output>;
850
866 void
867 on(event::io const &event) {
868 auto ret = 0;
869
870 if (_reason)
871 goto error;
872
873 if (likely(event._revents & EV_WRITE)) {
874 ret = Derived.write();
875 if (unlikely(ret < 0))
876 goto error;
877 if (!Derived.pendingWrite()) {
878 this->_async_event.set(EV_NONE);
879 if constexpr (has_method_on<_Derived, void, event::eos>::value) {
880 Derived.on(event::eos{});
881 }
882 } else if constexpr (has_method_on<_Derived, void,
883 event::pending_write>::value) {
884 Derived.on(event::pending_write{Derived.pendingWrite()});
885 }
886 return;
887 }
888 error:
889#ifdef _WIN32
890 if (socket::get_last_errno() == 10035)
891 return;
892#endif
893 dispose();
894 }
895
896protected:
906 void
908 if (_is_disposed)
909 return;
910 _is_disposed = true;
911
912 if constexpr (has_method_on<_Derived, void, event::disconnected>::value) {
913 Derived.on(event::disconnected{_reason});
914 }
915
916 if constexpr (_Derived::has_server) {
917 Derived.server().disconnected(Derived.id());
918 } else if constexpr (has_method_on<_Derived, void, event::dispose>::value) {
919 Derived.on(event::dispose{});
920 }
921 }
922};
923
937template <typename _Derived>
938class io : public base<io<_Derived>, event::io> {
939 using base_t = base<io<_Derived>, event::io>;
940 IProtocol *_protocol = nullptr;
941 std::vector<IProtocol *> _protocol_list;
942 bool _on_message = false;
943 bool _is_disposed = false;
944 int _reason = 0;
945
946public:
948 constexpr static const bool has_server = false;
949
954 io() = default;
955
962 : _protocol(protocol) {
963 _protocol_list.push_back(protocol);
964 }
965
969 io(io const &) = delete;
970
// Destructor.
// NOTE(review): original line 976 (the only statement of the body) is missing
// from this listing. Presumably it releases the owned protocols through
// clear_protocols() — confirm against the real header before relying on it.
975 ~io() noexcept {
977 }
978
986 template <typename _Protocol, typename... _Args>
987 _Protocol *
988 switch_protocol(_Args &&...args) {
989 auto new_protocol = new _Protocol(std::forward<_Args>(args)...);
990 if (new_protocol->ok()) {
991 _protocol = new_protocol;
992 _protocol_list.push_back(new_protocol);
993 return new_protocol;
994 } else
995 delete new_protocol;
996 return nullptr;
997 }
998
1003 void
1005 for (auto protocol : _protocol_list)
1006 delete protocol;
1007 _protocol_list.clear();
1008 _protocol = nullptr;
1009 };
1010
1015 IProtocol *
1017 return _protocol;
1018 }
1019
1025 void
1026 start() noexcept {
1027 _reason = 0;
1028 Derived.transport().set_nonblocking(true);
1029 this->_async_event.start(Derived.transport().native_handle(), EV_READ);
1030 }
1031
1036 void
1037 ready_to_read() noexcept {
1038 if (!(this->_async_event.events & EV_READ)) {
1039 this->_async_event.set(this->_async_event.events | EV_READ);
1040 }
1041 }
1042
1047 void
1048 ready_to_write() noexcept {
1049 if (!(this->_async_event.events & EV_WRITE)) {
1050 this->_async_event.set(this->_async_event.events | EV_WRITE);
1051 }
1052 }
1053
1060 void
1061 close_after_deliver() const noexcept {
1062 _protocol->not_ok();
1063 }
1064
1071 template <typename... _Args>
1072 inline auto &
1073 publish(_Args &&...args) noexcept {
1075 if constexpr (sizeof...(_Args))
1076 (Derived.out() << ... << std::forward<_Args>(args));
1077 return Derived.out();
1078 }
1079
1086 template <typename T>
1087 auto &
1088 operator<<(T &&data) {
1089 return publish(std::forward<T>(data));
1090 }
1091
1097 void
1098 disconnect(int reason = 1) {
1099 _reason = reason;
1100 this->_async_event.feed_event(EV_UNDEF);
1101 }
1102
1103private:
1104 friend class listener::RegisteredKernelEvent<event::io, io>;
1105
// I/O event handler for the bidirectional component.
//
// Runs a read phase (when the event carries EV_READ and the protocol is ok)
// and then a write phase (when the event carries EV_WRITE). The call returns
// normally if at least one phase completed; a requested disconnection, a
// failed read/write, or an event that triggered neither phase ends in
// dispose().
1129 void
1130 on(event::io const &event) {
// Value a read()/write() result compares equal to after the size_t cast when
// the call failed.
1131 constexpr const std::size_t invalid_ret = static_cast<std::size_t>(-1);
1132 std::size_t ret = 0u;
// Set once a phase has completed without error.
1133 bool ok = false;
1134
// Re-entrancy guard: ignore events while a message is being dispatched.
1135 if (_on_message)
1136 return;
// A non-zero reason means disconnect() was requested.
1137 if (_reason)
1138 goto error;
// ---- read phase ----
1139 if (event._revents & EV_READ && _protocol->ok()) {
1140 ret = static_cast<std::size_t>(Derived.read());
1141 if (unlikely(ret == invalid_ret))
1142 goto error;
1143
1144 _on_message = true;
// Hand every complete message to the protocol.
1145 while ((ret = this->_protocol->getMessageSize()) > 0) {
// Local copy: the same protocol object is used for onMessage() and
// should_flush() within one iteration.
1146 auto protocol = this->_protocol;
1147 protocol->onMessage(ret);
1148 if (protocol->should_flush())
1149 Derived.flush(ret);
1150 }
1151 _on_message = false;
1152 Derived.eof();
// Optional notifications: pending_read when data is still buffered,
// eof otherwise — each only if the derived class declares the handler.
1153 if constexpr (has_method_on<_Derived, void, event::pending_read>::value ||
1154 has_method_on<_Derived, void, event::eof>::value) {
1155 const auto pendingRead = Derived.pendingRead();
1156 if (pendingRead) {
1157 if constexpr (has_method_on<_Derived, void,
1158 event::pending_read>::value) {
1159 Derived.on(event::pending_read{pendingRead});
1160 }
1161 } else {
1162 if constexpr (has_method_on<_Derived, void, event::eof>::value) {
1163 Derived.on(event::eof{});
1164 }
1165 }
1166 }
1167 ok = true;
1168 }
// ---- write phase ----
1169 if (event._revents & EV_WRITE) {
1170 ret = static_cast<std::size_t>(Derived.write());
1171 if (unlikely(ret == invalid_ret))
1172 goto error;
1173 if (!Derived.pendingWrite()) {
// Everything was written: a protocol that is no longer ok (see
// close_after_deliver()) now leads to disposal.
1174 if (!_protocol->ok())
1175 goto error;
// Nothing left to send: watch read events only.
1176 this->_async_event.set(EV_READ);
1177 if constexpr (has_method_on<_Derived, void, event::eos>::value) {
1178 Derived.on(event::eos{});
1179 }
1180 } else if constexpr (has_method_on<_Derived, void,
1181 event::pending_write>::value) {
1182 Derived.on(event::pending_write{Derived.pendingWrite()});
1183 }
1184 ok = true;
1185 }
1186 if (ok)
1187 return;
1188 error:
1189#ifdef _WIN32
// NOTE(review): 10035 is presumably WSAEWOULDBLOCK (operation would block),
// in which case the event is ignored instead of disposing — confirm.
1190 if (socket::get_last_errno() == 10035)
1191 return;
1192#endif
1193 dispose();
1194 }
1195
1196protected:
1204 void
1206 if (_is_disposed)
1207 return;
1208 _is_disposed = true;
1209
1210 if constexpr (has_method_on<_Derived, void, event::disconnected>::value) {
1211 Derived.on(event::disconnected{_reason});
1212 }
1213
1214 if constexpr (_Derived::has_server) {
1215 Derived.server().disconnected(Derived.id());
1216 } else {
1217 this->_async_event.stop(); // Stop the watcher to prevent further events
1218 if constexpr (has_method_on<_Derived, void, event::dispose>::value)
1219 Derived.on(event::dispose{});
1220 }
1221 }
1222};
1223
1224} // namespace qb::io::async
1225
1226#undef Derived
1227#endif // QB_IO_ASYNC_IO_H
Aggregation of all event types for the asynchronous I/O system.
Core event loop manager for the asynchronous IO framework.
Base interface for all message processing protocols.
Definition protocol.h:43
virtual void onMessage(std::size_t size) noexcept=0
Processes a complete message that has been identified in the input buffer.
virtual std::size_t getMessageSize() noexcept=0
Determines the size of the next complete message in the input buffer.
bool ok() const noexcept
Checks if the protocol is in a valid operational state.
Definition protocol.h:97
Utility class to execute a function after a specified timeout using the event loop.
Definition io.h:186
void on(event::timer const &) const
Timer event handler called when the timeout expires.
Definition io.h:213
Timeout(_Func &&func, double timeout=0.)
Constructor that schedules a function to be called after a timeout.
Definition io.h:197
~base()
Destructor that unregisters the event watcher.
Definition io.h:76
base()
Constructor that registers the event watcher with the current listener.
Definition io.h:67
CRTP base class for watching a directory for attribute changes.
Definition io.h:440
~directory_watcher()=default
Destructor.
directory_watcher< _Derived > base_io_t
Base I/O type alias for CRTP.
Definition io.h:444
void disconnect() noexcept
Stops watching the directory.
Definition io.h:474
directory_watcher()=default
Default constructor.
void start(std::string const &fpath, ev_tstamp ts=0.1) noexcept
Starts watching a directory for attribute changes.
Definition io.h:465
static constexpr const bool do_read
Flag indicating this watcher type does not read directory content directly.
Definition io.h:445
CRTP base class for watching a single file for attribute changes and processing its contents.
Definition io.h:256
int read_all()
Reads all available data from the file and processes it using the current protocol.
Definition io.h:356
void start(std::string const &fpath, ev_tstamp ts=0.1) noexcept
Starts watching a file for attribute changes.
Definition io.h:329
file_watcher()=default
Default constructor.
file_watcher(IProtocol *protocol) noexcept
Constructor with an externally managed protocol.
Definition io.h:279
file_watcher< _Derived > base_io_t
Base I/O type alias for CRTP.
Definition io.h:262
void disconnect() noexcept
Stops watching the file.
Definition io.h:339
file_watcher(file_watcher const &)=delete
Deleted copy constructor to prevent unintended copying of watcher state and resources.
_Protocol * switch_protocol(_Args &&...args)
Switches to a new protocol for processing file contents, taking ownership.
Definition io.h:310
static constexpr const bool do_read
Flag indicating this watcher type reads file content.
Definition io.h:263
~file_watcher() noexcept
Destructor.
Definition io.h:292
Asynchronous file operations handler.
Definition file.h:46
CRTP base class for managing asynchronous input operations with protocol processing.
Definition io.h:517
void ready_to_read() noexcept
Ensures the I/O watcher is listening for read events (EV_READ).
Definition io.h:628
IProtocol * protocol()
Definition io.h:604
input()=default
Default constructor.
input< _Derived > base_io_t
Base I/O type alias for CRTP.
Definition io.h:526
void start() noexcept
Starts asynchronous input operations.
Definition io.h:615
input(input const &)=delete
Deleted copy constructor to prevent unintended copying of I/O state and resources.
_Protocol * switch_protocol(_Args &&...args)
Switches to a new protocol for processing input, taking ownership of the new protocol.
Definition io.h:575
void dispose()
Disposes of resources and finalizes disconnection for the input component.
Definition io.h:725
static constexpr const bool has_server
Definition io.h:527
~input() noexcept
Destructor.
Definition io.h:559
void clear_protocols()
Clears all owned protocol instances.
Definition io.h:592
void disconnect(int reason=1)
Initiates a graceful disconnection of the input component.
Definition io.h:642
input(IProtocol *protocol) noexcept
Constructor with an initial protocol instance.
Definition io.h:544
CRTP base class for managing bidirectional asynchronous I/O operations with protocol processing.
Definition io.h:938
_Protocol * switch_protocol(_Args &&...args)
Switches to a new protocol for I/O processing, taking ownership.
Definition io.h:988
io()=default
Default constructor.
auto & operator<<(T &&data)
Stream operator for publishing data.
Definition io.h:1088
void start() noexcept
Starts bidirectional asynchronous I/O operations.
Definition io.h:1026
io(IProtocol *protocol) noexcept
Constructor with an initial protocol instance.
Definition io.h:961
io(io const &)=delete
Deleted copy constructor.
void ready_to_write() noexcept
Ensures the I/O watcher is listening for write events (EV_WRITE).
Definition io.h:1048
void clear_protocols()
Clears all owned protocol instances.
Definition io.h:1004
static constexpr const bool has_server
Indicates this component is not inherently a server.
Definition io.h:948
auto & publish(_Args &&...args) noexcept
Publishes data to the output buffer and ensures write readiness.
Definition io.h:1073
~io() noexcept
Destructor.
Definition io.h:975
void dispose()
Disposes of resources and finalizes disconnection for the I/O component.
Definition io.h:1205
void disconnect(int reason=1)
Initiates a graceful disconnection.
Definition io.h:1098
IProtocol * protocol()
Gets a pointer to the current active protocol instance.
Definition io.h:1016
io< _Derived > base_io_t
Base I/O type alias for CRTP.
Definition io.h:947
void close_after_deliver() const noexcept
Requests connection closure after all pending output data is delivered.
Definition io.h:1061
void ready_to_read() noexcept
Ensures the I/O watcher is listening for read events (EV_READ).
Definition io.h:1037
Template wrapper for concrete event handlers and their associated libev watchers.
Definition listener.h:76
Central event loop manager for asynchronous IO operations.
Definition listener.h:50
static thread_local listener current
Thread-local instance of the listener.
Definition listener.h:61
CRTP base class for managing asynchronous output operations.
Definition io.h:755
void ready_to_write() noexcept
Ensures the I/O watcher is listening for write events (EV_WRITE).
Definition io.h:800
output()=default
Default constructor.
auto & publish(_Args &&...args) noexcept
Publishes data to the output buffer and ensures write readiness.
Definition io.h:815
void disconnect(int reason=1)
Initiates a graceful disconnection of the output component.
Definition io.h:843
void start() noexcept
Starts asynchronous output operations.
Definition io.h:787
void dispose()
Disposes of resources and finalizes disconnection for the output component.
Definition io.h:907
~output()=default
Destructor.
output(output const &)=delete
Deleted copy constructor to prevent unintended copying of I/O state and resources.
output< _Derived > base_io_t
Base I/O type alias for CRTP.
Definition io.h:761
auto & operator<<(T &&data)
Stream operator for publishing data, equivalent to publish(std::forward<T>(data)).
Definition io.h:831
static constexpr const bool has_server
Indicates this component is not inherently a server.
Definition io.h:762
CRTP base class that adds timeout functionality to derived asynchronous components.
Definition io.h:96
void updateTimeout() noexcept
Updates the last activity timestamp to the current event loop time.
Definition io.h:120
with_timeout(ev_tstamp timeout=3)
Constructor that initializes the timeout.
Definition io.h:106
auto getTimeout() const noexcept
Gets the current configured timeout value.
Definition io.h:146
void setTimeout(ev_tstamp timeout) noexcept
Sets a new timeout value and restarts the timer.
Definition io.h:131
void callback(_Func &&func, double timeout=0.)
Utility function to schedule a callable for execution after a timeout.
Definition io.h:233
Event event
Alias for the base Event class.
Definition Event.h:385
bool unlikely(bool expr)
Hint for branch prediction when a condition is expected to be false.
Definition branch_hints.h:76
bool likely(bool expr)
Hint for branch prediction when a condition is expected to be true.
Definition branch_hints.h:49
Protocol interfaces for message processing in the asynchronous IO framework.
Advanced type traits and metaprogramming utilities for the QB Framework.
#define CREATE_MEMBER_CHECK(member)
Macro to create a type trait to check for any member with a given name.
Definition type_traits.h:669
#define GENERATE_HAS_METHOD(method)
Macro to generate a type trait to check for a method with a specific signature.
Definition type_traits.h:804