qb  2.0.0.0
C++17 Actor Framework
qb Issue Watch Star Fork Follow @isndev
Loading...
Searching...
No Matches
connector.h
Go to the documentation of this file.
1
24
25#ifndef QB_IO_ASYNC_TCP_CONNECTOR_H
26#define QB_IO_ASYNC_TCP_CONNECTOR_H
27
28#include <qb/io.h>
29#include <qb/io/system/sys__socket.h>
30#include "../../uri.h"
31#include "../event/io.h"
32#include "../listener.h"
33
34namespace qb::io::async::tcp {
35
48template <typename Socket_, typename Func_>
49class connector {
50 Func_ _func;
51 const double _timeout;
52 Socket_ _socket;
53 uri _remote;
54
55private:
62 void _establish_connection() {
63 LOG_DEBUG("Started async connect to " << _remote.source());
64 auto ret = _socket.n_connect(_remote);
65 if (!ret) {
66 LOG_DEBUG("Connected directly to " << _remote.source());
67 _func(std::move(_socket));
68 } else if (socket_no_error(qb::io::socket::get_last_errno())) {
70 .registerEvent<event::io>(*this, _socket.native_handle(), EV_WRITE)
71 .start();
72 return;
73 } else {
74 _socket.disconnect();
75 LOG_DEBUG("Failed to connect to "
76 << _remote.source() << " err=" << qb::io::socket::get_last_errno());
77 _func(Socket_{});
78 }
79 delete this;
80 }
81
82public:
95 connector(uri const &remote, Func_ &&func, double timeout = 0.)
96 : _func(std::forward<Func_>(func))
97 , _timeout(timeout > 0. ? ev_time() + timeout : 0.)
98 , _remote{remote} {
99 LOG_DEBUG("Connector: Initializing for " << remote.source());
100 // _socket is default-initialized here
101 _establish_connection();
102 }
103
118 connector(Socket_&& existing_socket, uri const &remote, Func_ &&func, double timeout = 0.)
119 : _func(std::forward<Func_>(func))
120 , _timeout(timeout > 0. ? ev_time() + timeout : 0.)
121 , _socket(std::move(existing_socket)) // Move the existing socket
122 , _remote{remote} {
123 LOG_DEBUG("Connector: Initializing with existing socket for " << remote.source());
124 _establish_connection();
125 }
126
137 void
138 on(event::io const &event) {
139 int err = 0;
140 if (!(event._revents & EV_WRITE) ||
141 _socket.template get_optval<int>(SOL_SOCKET, SO_ERROR, err)) {
142 _socket.disconnect();
143 err = 1;
144 } else if (err && (err != EISCONN) && (!_timeout || ev_time() < _timeout))
145 return;
146 listener::current.unregisterEvent(event._interface);
147 if (!err || err == EISCONN) {
148 LOG_DEBUG("Connected async to " << _remote.source());
149 _socket.connected();
150 _func(std::move(_socket));
151 } else {
152 LOG_DEBUG("Failed to connect to " << _remote.source() << " err="
153 << qb::io::socket::get_last_errno());
154 _func(Socket_{});
155 }
156 delete this;
157 }
158};
159
174template <typename Socket_, typename Func_>
175void
176connect(uri const &remote, Func_ &&func, double timeout = 0.) {
177 new connector<Socket_, Func_>(remote, std::forward<Func_>(func), timeout);
178}
179
196template <typename Socket_, typename Func_>
197void
198connect(Socket_&& existing_socket, uri const &remote, Func_ &&func, double timeout = 0.) {
199 new connector<Socket_, Func_>(std::move(existing_socket), remote, std::forward<Func_>(func), timeout);
200}
201
202} // namespace qb::io::async::tcp
203
204#endif // QB_IO_ASYNC_TCP_CONNECTOR_H
Core event loop manager for the asynchronous IO framework.
static thread_local listener current
Thread-local instance of the listener.
Definition listener.h:61
Handles asynchronous TCP connection establishment.
Definition connector.h:49
connector(uri const &remote, Func_ &&func, double timeout=0.)
Constructor.
Definition connector.h:95
connector(Socket_ &&existing_socket, uri const &remote, Func_ &&func, double timeout=0.)
Constructor with an existing socket.
Definition connector.h:118
void on(event::io const &event)
I/O event handler.
Definition connector.h:138
Class for parsing, manipulating, and representing URIs.
Definition uri.h:181
const auto & source() const
Returns the source string of this URI.
Definition uri.h:416
void connect(uri const &remote, Func_ &&func, double timeout=0.)
Initiates an asynchronous TCP connection.
Definition connector.h:176
Low-level I/O notification event for asynchronous operations.
Core I/O and logging utilities for the qb framework.
#define LOG_DEBUG(X)
Debug-level log macro (no-op if QB_STDOUT_LOG is not defined)
Definition io.h:215
URI parsing and manipulation utilities.