29#ifndef QB_IO_ASYNC_IO_H
30#define QB_IO_ASYNC_IO_H
40#define Derived static_cast<_Derived &>(*this)
42namespace qb::io::async {
57template <
typename _Derived,
typename _EV_EVENT>
95template <
typename _Derived>
98 ev_tstamp _last_activity;
108 , _last_activity(0) {
134 _last_activity = ev_time();
162 on(event::timer &event) noexcept {
163 const ev_tstamp after = _last_activity -
event.loop.now() + _timeout;
168 this->_async_event.set(after);
169 this->_async_event.start();
185template <
typename _Func>
199 , _func(std::forward<_Func>(func)) {
213 on(event::timer
const & )
const {
231template <
typename _Func>
237template <
typename _Func,
typename Rep,
typename Period>
238void callback(_Func&& func, std::chrono::duration<Rep, Period> timeout_duration) {
239 double seconds = std::chrono::duration_cast<std::chrono::duration<double>>(timeout_duration).count();
240 callback(std::forward<_Func>(func), seconds);
255template <
typename _Derived>
259 std::vector<IProtocol *> _protocol_list;
280 : _protocol(protocol) {}
293 for (
auto protocol_ptr : _protocol_list)
308 template <
typename _Protocol,
typename... _Args>
311 auto new_protocol =
new _Protocol(std::forward<_Args>(args)...);
312 if (new_protocol->ok()) {
313 _protocol = new_protocol;
314 _protocol_list.push_back(new_protocol);
329 start(std::string
const &fpath, ev_tstamp ts = 0.1) noexcept {
357 constexpr const auto invalid_ret =
static_cast<std::size_t
>(-1);
358 std::size_t ret = 0u;
360 ret =
static_cast<std::size_t
>(Derived.read());
369 if constexpr (has_method_on<_Derived, void, event::pending_read>::value ||
370 has_method_on<_Derived, void, event::eof>::value) {
371 const auto pendingRead = Derived.pendingRead();
373 if constexpr (has_method_on<_Derived, void,
374 event::pending_read>::value) {
375 Derived.on(event::pending_read{pendingRead});
378 if constexpr (has_method_on<_Derived, void, event::eof>::value) {
379 Derived.on(event::eof{});
403 on(event::file const &event) {
407 if constexpr (has_method_on<_Derived, void, event::file>::value) {
411 auto diff_read =
event.attr.st_size -
event.prev.st_size;
412 if (!_protocol->
ok() || !
event.attr.st_nlink ||
413 (diff_read < 0 && lseek(Derived.transport().native_handle(), 0, SEEK_SET)))
415 else if (diff_read) {
416 if constexpr (_Derived::do_read) {
422 this->_async_event.stop();
439template <
typename _Derived>
465 start(std::string
const &fpath, ev_tstamp ts = 0.1) noexcept {
490 on(event::file const &event) {
494 if constexpr (has_method_on<_Derived, void, event::file>::value) {
516template <
typename _Derived>
520 std::vector<IProtocol *> _protocol_list;
521 bool _on_message =
false;
522 bool _is_disposed =
false;
573 template <
typename _Protocol,
typename... _Args>
576 auto new_protocol =
new _Protocol(std::forward<_Args>(args)...);
577 if (new_protocol->ok()) {
578 _protocol = new_protocol;
579 _protocol_list.push_back(new_protocol);
593 for (
auto protocol : _protocol_list)
595 _protocol_list.clear();
617 Derived.transport().set_nonblocking(
true);
618 this->
_async_event.start(Derived.transport().native_handle(), EV_READ);
630 this->
_async_event.start(Derived.transport().native_handle(), EV_READ);
668 on(event::io const &event) {
669 constexpr const auto invalid_ret =
static_cast<std::size_t
>(-1);
670 std::size_t ret = 0u;
674 if (_reason || !_protocol->
ok())
677 if (
likely(event._revents & EV_READ)) {
678 ret =
static_cast<std::size_t
>(Derived.read());
683 auto protocol = this->_protocol;
685 if (protocol->should_flush())
690 if constexpr (has_method_on<_Derived, void, event::pending_read>::value ||
691 has_method_on<_Derived, void, event::eof>::value) {
692 const auto pendingRead = Derived.pendingRead();
694 if constexpr (has_method_on<_Derived, void,
695 event::pending_read>::value) {
696 Derived.on(event::pending_read{pendingRead});
699 if constexpr (has_method_on<_Derived, void, event::eof>::value) {
700 Derived.on(event::eof{});
708 if (socket::get_last_errno() == 10035)
730 if constexpr (has_method_on<_Derived, void, event::disconnected>::value) {
731 Derived.on(event::disconnected{_reason});
734 if constexpr (_Derived::has_server) {
735 Derived.server().disconnected(Derived.id());
736 }
else if constexpr (has_method_on<_Derived, void, event::dispose>::value) {
737 Derived.on(event::dispose{});
754template <
typename _Derived>
757 bool _is_disposed =
false;
789 Derived.transport().set_nonblocking(
true);
790 this->
_async_event.start(Derived.transport().native_handle(), EV_WRITE);
813 template <
typename... _Args>
817 if constexpr (
sizeof...(_Args))
818 (Derived.out() << ... << std::forward<_Args>(args));
819 return Derived.out();
829 template <
typename T>
832 return publish(std::forward<T>(data));
867 on(event::io const &event) {
873 if (
likely(event._revents & EV_WRITE)) {
874 ret = Derived.write();
877 if (!Derived.pendingWrite()) {
878 this->_async_event.set(EV_NONE);
879 if constexpr (has_method_on<_Derived, void, event::eos>::value) {
880 Derived.on(event::eos{});
882 }
else if constexpr (has_method_on<_Derived, void,
883 event::pending_write>::value) {
884 Derived.on(event::pending_write{Derived.pendingWrite()});
890 if (socket::get_last_errno() == 10035)
912 if constexpr (has_method_on<_Derived, void, event::disconnected>::value) {
913 Derived.on(event::disconnected{_reason});
916 if constexpr (_Derived::has_server) {
917 Derived.server().disconnected(Derived.id());
918 }
else if constexpr (has_method_on<_Derived, void, event::dispose>::value) {
919 Derived.on(event::dispose{});
937template <
typename _Derived>
938class io :
public base<io<_Derived>, event::io> {
941 std::vector<IProtocol *> _protocol_list;
942 bool _on_message =
false;
943 bool _is_disposed =
false;
986 template <
typename _Protocol,
typename... _Args>
989 auto new_protocol =
new _Protocol(std::forward<_Args>(args)...);
990 if (new_protocol->ok()) {
991 _protocol = new_protocol;
992 _protocol_list.push_back(new_protocol);
1005 for (
auto protocol : _protocol_list)
1007 _protocol_list.clear();
1008 _protocol =
nullptr;
1028 Derived.transport().set_nonblocking(
true);
1029 this->
_async_event.start(Derived.transport().native_handle(), EV_READ);
1062 _protocol->not_ok();
1071 template <
typename... _Args>
1075 if constexpr (
sizeof...(_Args))
1076 (Derived.out() << ... << std::forward<_Args>(args));
1077 return Derived.out();
1086 template <
typename T>
1089 return publish(std::forward<T>(data));
1130 on(event::io const &event) {
1131 constexpr const std::size_t invalid_ret =
static_cast<std::size_t
>(-1);
1132 std::size_t ret = 0u;
1139 if (event._revents & EV_READ && _protocol->
ok()) {
1140 ret =
static_cast<std::size_t
>(Derived.read());
1146 auto protocol = this->_protocol;
1148 if (protocol->should_flush())
1151 _on_message =
false;
1153 if constexpr (has_method_on<_Derived, void, event::pending_read>::value ||
1154 has_method_on<_Derived, void, event::eof>::value) {
1155 const auto pendingRead = Derived.pendingRead();
1157 if constexpr (has_method_on<_Derived, void,
1158 event::pending_read>::value) {
1159 Derived.on(event::pending_read{pendingRead});
1162 if constexpr (has_method_on<_Derived, void, event::eof>::value) {
1163 Derived.on(event::eof{});
1169 if (
event._revents & EV_WRITE) {
1170 ret =
static_cast<std::size_t
>(Derived.write());
1173 if (!Derived.pendingWrite()) {
1174 if (!_protocol->
ok())
1176 this->_async_event.set(EV_READ);
1177 if constexpr (has_method_on<_Derived, void, event::eos>::value) {
1178 Derived.on(event::eos{});
1180 }
else if constexpr (has_method_on<_Derived, void,
1181 event::pending_write>::value) {
1182 Derived.on(event::pending_write{Derived.pendingWrite()});
1190 if (socket::get_last_errno() == 10035)
1208 _is_disposed =
true;
1210 if constexpr (has_method_on<_Derived, void, event::disconnected>::value) {
1211 Derived.on(event::disconnected{_reason});
1214 if constexpr (_Derived::has_server) {
1215 Derived.server().disconnected(Derived.id());
1218 if constexpr (has_method_on<_Derived, void, event::dispose>::value)
1219 Derived.on(event::dispose{});
Aggregation of all event types for the asynchronous I/O system.
Core event loop manager for the asynchronous IO framework.
Base interface for all message processing protocols.
Definition protocol.h:43
virtual void onMessage(std::size_t size) noexcept=0
Processes a complete message that has been identified in the input buffer.
virtual std::size_t getMessageSize() noexcept=0
Determines the size of the next complete message in the input buffer.
bool ok() const noexcept
Checks if the protocol is in a valid operational state.
Definition protocol.h:97
Utility class to execute a function after a specified timeout using the event loop.
Definition io.h:186
void on(event::timer const &) const
Timer event handler called when the timeout expires.
Definition io.h:213
Timeout(_Func &&func, double timeout=0.)
Constructor that schedules a function to be called after a timeout.
Definition io.h:197
~base()
Destructor that unregisters the event watcher.
Definition io.h:76
base()
Constructor that registers the event watcher with the current listener.
Definition io.h:67
event::file & _async_event
Definition io.h:60
CRTP base class for watching a directory for attribute changes.
Definition io.h:440
~directory_watcher()=default
Destructor.
directory_watcher< _Derived > base_io_t
Base I/O type alias for CRTP.
Definition io.h:444
void disconnect() noexcept
Stops watching the directory.
Definition io.h:474
directory_watcher()=default
Default constructor.
void start(std::string const &fpath, ev_tstamp ts=0.1) noexcept
Starts watching a directory for attribute changes.
Definition io.h:465
static constexpr const bool do_read
Flag indicating this watcher type does not read directory content directly.
Definition io.h:445
CRTP base class for watching a single file for attribute changes and processing its contents.
Definition io.h:256
int read_all()
Reads all available data from the file and processes it using the current protocol.
Definition io.h:356
void start(std::string const &fpath, ev_tstamp ts=0.1) noexcept
Starts watching a file for attribute changes.
Definition io.h:329
file_watcher()=default
Default constructor.
file_watcher(IProtocol *protocol) noexcept
Constructor with an externally managed protocol.
Definition io.h:279
file_watcher< _Derived > base_io_t
Base I/O type alias for CRTP.
Definition io.h:262
void disconnect() noexcept
Stops watching the file.
Definition io.h:339
file_watcher(file_watcher const &)=delete
Deleted copy constructor to prevent unintended copying of watcher state and resources.
_Protocol * switch_protocol(_Args &&...args)
Switches to a new protocol for processing file contents, taking ownership.
Definition io.h:310
static constexpr const bool do_read
Flag indicating this watcher type reads file content.
Definition io.h:263
~file_watcher() noexcept
Destructor.
Definition io.h:292
Asynchronous file operations handler.
Definition file.h:46
CRTP base class for managing bidirectional asynchronous I/O operations with protocol processing.
Definition io.h:938
_Protocol * switch_protocol(_Args &&...args)
Switches to a new protocol for I/O processing, taking ownership.
Definition io.h:988
io()=default
Default constructor.
auto & operator<<(T &&data)
Stream operator for publishing data.
Definition io.h:1088
void start() noexcept
Starts bidirectional asynchronous I/O operations.
Definition io.h:1026
io(IProtocol *protocol) noexcept
Constructor with an initial protocol instance.
Definition io.h:961
io(io const &)=delete
Deleted copy constructor.
void ready_to_write() noexcept
Ensures the I/O watcher is listening for write events (EV_WRITE).
Definition io.h:1048
void clear_protocols()
Clears all owned protocol instances.
Definition io.h:1004
static constexpr const bool has_server
Indicates this component is not inherently a server.
Definition io.h:948
auto & publish(_Args &&...args) noexcept
Publishes data to the output buffer and ensures write readiness.
Definition io.h:1073
~io() noexcept
Destructor.
Definition io.h:975
void dispose()
Disposes of resources and finalizes disconnection for the I/O component.
Definition io.h:1205
void disconnect(int reason=1)
Initiates a graceful disconnection.
Definition io.h:1098
IProtocol * protocol()
Gets a pointer to the current active protocol instance.
Definition io.h:1016
io< _Derived > base_io_t
Base I/O type alias for CRTP.
Definition io.h:947
void close_after_deliver() const noexcept
Requests connection closure after all pending output data is delivered.
Definition io.h:1061
void ready_to_read() noexcept
Ensures the I/O watcher is listening for read events (EV_READ).
Definition io.h:1037
Template wrapper for concrete event handlers and their associated libev watchers.
Definition listener.h:76
Central event loop manager for asynchronous IO operations.
Definition listener.h:50
static thread_local listener current
Thread-local instance of the listener.
Definition listener.h:61
CRTP base class for managing asynchronous output operations.
Definition io.h:755
void ready_to_write() noexcept
Ensures the I/O watcher is listening for write events (EV_WRITE).
Definition io.h:800
output()=default
Default constructor.
auto & publish(_Args &&...args) noexcept
Publishes data to the output buffer and ensures write readiness.
Definition io.h:815
void disconnect(int reason=1)
Initiates a graceful disconnection of the output component.
Definition io.h:843
void start() noexcept
Starts asynchronous output operations.
Definition io.h:787
void dispose()
Disposes of resources and finalizes disconnection for the output component.
Definition io.h:907
~output()=default
Destructor.
output(output const &)=delete
Deleted copy constructor to prevent unintended copying of I/O state and resources.
output< _Derived > base_io_t
Base I/O type alias for CRTP.
Definition io.h:761
auto & operator<<(T &&data)
Stream operator for publishing data, equivalent to publish(std::forward<T>(data)).
Definition io.h:831
static constexpr const bool has_server
Indicates this component is not inherently a server.
Definition io.h:762
CRTP base class that adds timeout functionality to derived asynchronous components.
Definition io.h:96
void updateTimeout() noexcept
Updates the last activity timestamp to the current event loop time.
Definition io.h:120
with_timeout(ev_tstamp timeout=3)
Constructor that initializes the timeout.
Definition io.h:106
auto getTimeout() const noexcept
Gets the current configured timeout value.
Definition io.h:146
void setTimeout(ev_tstamp timeout) noexcept
Sets a new timeout value and restarts the timer.
Definition io.h:131
void callback(_Func &&func, double timeout=0.)
Utility function to schedule a callable for execution after a timeout.
Definition io.h:233
Event event
Alias for the base Event class.
Definition Event.h:385
bool unlikely(bool expr)
Hint for branch prediction when a condition is expected to be false.
Definition branch_hints.h:76
bool likely(bool expr)
Hint for branch prediction when a condition is expected to be true.
Definition branch_hints.h:49
Protocol interfaces for message processing in the asynchronous IO framework.
Advanced type traits and metaprogramming utilities for the QB Framework.
#define CREATE_MEMBER_CHECK(member)
Macro to create a type trait to check for any member with a given name.
Definition type_traits.h:669
#define GENERATE_HAS_METHOD(method)
Macro to generate a type trait to check for a method with a specific signature.
Definition type_traits.h:804