Guide to using qb-io asynchronous features like callbacks and timers directly within QB actors for non-blocking behavior.
Integrating Core & IO: Asynchronous Operations within Actors
Actors in the QB Framework can directly and seamlessly leverage the asynchronous capabilities of qb-io. This allows actors to perform time-based operations, schedule deferred tasks, or manage I/O-bound activities without blocking their VirtualCore and impeding the progress of other actors on the same core.
(qb/io/async/io.h)
This is the primary and most flexible way for an actor to schedule a piece of code (typically a lambda) to be executed later by its own VirtualCore's event loop.
- How to Use: Call qb::io::async::callback(your_lambda, delay_in_seconds); from within an actor's onInit(), an on(Event&) handler, or even another onCallback() (if the actor inherits qb::ICallback).
- A delay_in_seconds of 0.0 (or omitting it) schedules the lambda for the next available iteration of the event loop.
- Execution Context: The provided lambda will execute on the same VirtualCore (and thus the same thread) as the actor that scheduled it. This means the lambda can safely interact with the actor's state if and only if the actor is still alive and proper checks are made.
- Capturing Actor Context (this or id()):
- Crucial Safety Check: When a lambda captures this (or auto self_id = id();), it is essential to check this->is_alive() (or if (auto actor_ptr = VirtualCore::_handler->getActor(self_id)) { ... } if only ID was captured and direct actor pointer needed, though this is less common for self-interaction) at the very beginning of the lambda's body. This is because the actor might be terminated and destroyed before the delayed callback gets a chance to execute.
- Sending Events Back to Self: Instead of complex direct state manipulation from the lambda, a robust pattern is for the lambda to push a new, specific event back to the scheduling actor itself. The actor then handles this event in a dedicated on() handler.
- Common Use Cases for Actors:
- Implementing Timeouts: For an operation initiated by the actor, schedule a callback. If the operation doesn't complete (e.g., by an incoming response event clearing a pending flag) before the callback runs, the callback can trigger timeout logic.
- Periodic Actions: A callback can reschedule itself to create periodic behavior (an alternative to using qb::ICallback).
- Breaking Down Complex Non-Blocking Tasks: If an actor needs to perform a series of non-blocking steps, it can execute one step and then schedule the next step via async::callback with a zero delay, effectively yielding control back to the event loop between steps.
- Retry Mechanisms: After a failed attempt (e.g., network connection), schedule a retry attempt using a delayed callback.
- Simulating Work/Delays: Useful in examples or tests to introduce artificial processing times without actual blocking.
struct ProcessTaskCompleteEvent :
qb::Event {
int task_id;
bool timed_out;
explicit ProcessTaskCompleteEvent(int id, bool timeout_status)
: task_id(id), timed_out(timeout_status) {}
};
private:
std::map<int, bool> _pending_tasks;
int _next_task_id = 1;
public:
startLongOperation(5.0);
return true;
}
void startLongOperation(double timeout_seconds) {
int current_task_id = _next_task_id++;
_pending_tasks[current_task_id] = true;
qb::io::cout() <<
"Actor [" <<
id() <<
"]: Starting operation for task " << current_task_id
<< ", timeout in " << timeout_seconds << "s.\n";
return;
}
if (_pending_tasks.count(current_task_id) && _pending_tasks[current_task_id]) {
qb::io::cout() <<
"Actor [" <<
id() <<
"]: Task " << current_task_id <<
" timed out!\n";
_pending_tasks.erase(current_task_id);
}
}, timeout_seconds);
}
void onOperationActuallyCompleted(int task_id_completed) {
if (_pending_tasks.count(task_id_completed) && _pending_tasks[task_id_completed]) {
qb::io::cout() <<
"Actor [" <<
id() <<
"]: Task " << task_id_completed <<
" completed successfully (before timeout).\n";
_pending_tasks.erase(task_id_completed);
}
}
void on(
const ProcessTaskCompleteEvent& event) {
qb::io::cout() <<
"Actor [" <<
id() <<
"]: Handling ProcessTaskCompleteEvent for task " <<
event.task_id
<< (event.timed_out ? " (TIMED OUT)" : " (SUCCESS)") << ".\n";
}
void on(
const qb::KillEvent& ) {
kill(); }
};
Convenience header for the core QB Actor components.
Main include file for the QB asynchronous I/O library.
Base class for all actors in the qb framework.
Definition Actor.h:106
void registerEvent(_Actor &actor) const noexcept
Subscribe this actor to listen for a specific event type.
void on(KillEvent const &event) noexcept
Handler for KillEvent.
_Event & push(ActorId const &dest, _Args &&...args) const noexcept
Send a new event in an ordered fashion to a destination actor, returning a reference to it.
ActorId id() const noexcept
Get ActorId.
Definition Actor.h:368
void kill() const noexcept
Terminate this actor and mark it for removal from the system.
bool is_alive() const noexcept
Check if Actor is alive and processing events.
virtual bool onInit()
Initialization callback, called once after construction and ID assignment.
Definition Actor.h:198
Base class for all events in the actor system.
Definition Event.h:85
void callback(_Func &&func, double timeout=0.)
Utility function to schedule a callable for execution after a timeout.
Definition io.h:233
Core I/O and logging utilities for the qb framework.
(Reference: test-actor-delayed-events.cpp, example10_distributed_computing.cpp (extensive use for simulation and scheduling), file_processor/file_worker.h (wrapping blocking calls).**)
While async::callback is versatile, if an actor needs a simple inactivity timeout (i.e., it should terminate or take action if no relevant messages arrive for a period), inheriting from qb::io::async::with_timeout<DerivedActor> can be an option.
- How it Works:
- Inheritance: class MyActor : public qb::Actor, public qb::io::async::with_timeout<MyActor> { ... };
- Constructor Initialization: Call the base with_timeout(timeout_duration_seconds); in your actor's constructor.
- Implement Timeout Handler: void on(qb::io::async::event::timer const& event) will be called when the timer expires.
- Reset on Activity: In your relevant on(ApplicationEvent&) handlers (those that signify activity), call this->updateTimeout(); to reset the inactivity countdown.
- Use Case: Terminating idle sessions, cleaning up actors that haven't received heartbeats or work for a while.
class SessionActorWithInactivity :
public qb::Actor,
public:
SessionActorWithInactivity() : with_timeout(30.0) {
qb::io::cout() <<
"SessionActor [" <<
id() <<
"]: Created with 30s inactivity timeout.\n";
}
return true;
}
void on(
const ClientActivityPing& ) {
qb::io::cout() <<
"SessionActor [" <<
id() <<
"]: Ping received, activity confirmed. Resetting timeout.\n";
}
void on(qb::io::async::event::timer
const& ) {
qb::io::cout() <<
"SessionActor [" <<
id() <<
"]: Inactivity timeout reached! Terminating session.\n";
}
void on(
const qb::KillEvent& ) {
}
~SessionActorWithInactivity() override {
qb::io::cout() <<
"SessionActor [" <<
id() <<
"] destroyed.\n";
}
};
CRTP base class that adds timeout functionality to derived asynchronous components.
Definition io.h:96
void updateTimeout() noexcept
Updates the last activity timestamp to the current event loop time.
Definition io.h:120
void setTimeout(ev_tstamp timeout) noexcept
Sets a new timeout value and restarts the timer.
Definition io.h:131
Thread-safe console output class.
Definition io.h:100
(Reference: chat_tcp/server/ChatSession.h uses this pattern, though it's a non-actor class in that example, the principle is the same. test-async-io.cpp::TimerHandler also demonstrates with_timeout.)**
Asynchronous File Operations within Actors
Directly performing blocking file I/O (like std::fstream::read or qb::io::sys::file::write) within an actor's event handler or onCallback is highly discouraged as it will block the VirtualCore.
To handle file operations asynchronously from an actor:
- Wrap Blocking Calls in qb::io::async::callback (for simpler cases):
- The actor receives a request to read/write a file.
- It captures necessary parameters (file path, data, original requester ID) into a lambda.
- It schedules this lambda using qb::io::async::callback.
- The lambda executes on the actor's VirtualCore but in a way that doesn't block the primary event processing flow for other events while the blocking call itself is in progress (though the callback itself will block its turn in the event loop queue).
- After the blocking I/O in the lambda completes, the lambda should push a result event (containing data or error status) back to the original requester or to the scheduling actor itself.
- Caution: While this makes the actor appear non-blocking to other actors, the VirtualCore processing this specific callback will be blocked during the actual file I/O. This is suitable for infrequent or less performance-critical file operations.
- Dedicated File Worker Actors (Recommended for complex or frequent I/O):
- Create specialized worker actors whose sole purpose is to perform file I/O.
- The main actor delegates file operation requests (as events) to these worker actors.
- Worker actors can use the async::callback pattern internally to perform the blocking I/O.
- This isolates blocking operations to specific actors/cores, potentially configured with higher latency if they are expected to block often.
- (Reference: The example/core_io/file_processor/ provides a good example of this pattern.)**
- Asynchronous File Watching (qb::io::async::file_watcher or directory_watcher):
- For scenarios where an actor needs to react to changes in the file system (file creation, modification, deletion) without polling.
- The actor (or a helper class it owns) can inherit from qb::io::async::file_watcher<MyActor> or directory_watcher<MyActor>.
- It then implements void on(qb::io::async::event::file const& event) to handle notifications.
- The watcher is started with this->start(path_to_watch, interval);.
- (Reference: The example/core_io/file_monitor/ demonstrates this pattern with a DirectoryWatcher actor.)**
By using these asynchronous patterns, actors can effectively manage time-based logic and interact with potentially blocking resources like the file system without compromising the overall responsiveness and concurrency of the QB application.
(Next: Integrating Core & IO: Network Actors to see how actors handle network I/O.**)