qb
2.0.0.0
C++17 Actor Framework
|
Learn how qb-io uses protocols to define message boundaries and parse data from byte streams.
Network transports like TCP deliver a continuous stream of bytes. To make sense of this stream, applications need a way to identify where one logical message ends and the next begins—a process called message framing. In qb-io, this is the job of protocols.
Protocols define the rules for interpreting raw byte streams, enabling qb-io components to extract meaningful application-level messages.
All custom and built-in protocols in qb-io are built upon the qb::io::async::AProtocol<IO_Type> abstract base class. This class uses the Curiously Recurring Template Pattern (CRTP), where IO_Type is the specific I/O component (e.g., your TCP session class) that will use the protocol.
Key Responsibilities & Methods to Implement in a Custom Protocol:
qb-io provides several pre-built protocols for common framing strategies, saving you from reinventing the wheel. These are found in qb/io/protocol/base.h, qb/io/protocol/text.h, and qb/io/protocol/json.h.
(From qb/io/protocol/base.h and qb/io/protocol/text.h)
(From qb/io/protocol/base.h and qb/io/protocol/text.h)
(From qb/io/protocol/json.h)
Protocols are typically integrated into classes that handle asynchronous I/O, often those derived using the qb::io::use<> helper template.
Declare the Protocol Type: Inside your I/O component class (e.g., MyTCPSession inheriting from qb::io::use<MyTCPSession>::tcp::client), define your chosen protocol as a nested type alias named Protocol. ```cpp #include <qb/io/async.h> // For qb::io::use<> #include <qb/io/protocol/text.h> // For text::command #include <iostream>
class MyTCPSession : public qb::io::use<MyTCPSession>::tcp::client</*optional ServerType*/> { public: // *** 1. Declare the protocol to be used by this session *** using Protocol = qb::protocol::text::command<MyTCPSession>;
explicit MyTCPSession(/* constructor args */) /* : client(args) */ { // The base qb::io::use<...>::tcp::client will often automatically instantiate // your declared Protocol if it has a constructor taking MyTCPSession&. // If not, or for more control, you might call: // this->switch_protocol<Protocol>(*this); }
// ... other methods ... }; ```
Implement the Message Handler: Your I/O component class must implement a public method void on(Protocol::message&& msg) (or void on(const Protocol::message& msg)) to receive and process fully parsed messages from the protocol. ```cpp // Continuing MyTCPSession from above public: // *** 2. Implement the handler for messages parsed by your Protocol *** void on(Protocol::message&& received_command) { // text::command::message is {size, data, text} std::cout << "Received command: " << received_command.text << std::endl; if (received_command.text == "QUIT") { this->disconnect(); } // Process the command... }
void on(qb::io::async::event::disconnected const& event) { std::cout << "Disconnected. Reason: " << event.reason << std::endl; } }; ```
Sending Data According to Protocol: When sending data, ensure it conforms to the chosen protocol's framing rules. ```cpp // Inside MyTCPSession or another class interacting with it void sendCommandToServer(MyTCPSession& session, const std::string& command_text) { // For text::command, messages are newline-terminated. // Protocol::end is typically defined by byte_terminated based protocols. session << command_text << MyTCPSession::Protocol::end; }
// For a binary protocol (e.g., text::binary16) // void sendBinaryData(MyBinarySession& session, const char* data, uint16_t len) { // auto header = MyBinarySession::Protocol::Header(len); // Get network-byte-order header // session.publish(reinterpret_cast<const char*>(&header), sizeof(header)); // session.publish(data, len); // } ```
(Reference examples: test-session-text.cpp, test-session-json.cpp, chat_tcp/shared/Protocol.h, message_broker/shared/Protocol.h**)
When the built-in protocols don't fit your application's specific message structure, qb-io makes it straightforward to define your own. Here's how:
Define Your Message Structure(s): First, determine what constitutes a "message" in your protocol. This might involve a header part and a payload part. Define C++ structs or classes for these.
```cpp // In a suitable header, e.g., my_custom_protocol_messages.h namespace my_app { // Example: A header structure for your messages struct MessageHeader { uint32_t magic_number; // To identify your protocol uint16_t message_type; // To differentiate kinds of messages uint32_t payload_length; // Add other fixed-size header fields as needed (checksum, sequence no, etc.) }; constexpr uint32_t MY_PROTOCOL_MAGIC = 0xABCD1234;
// Example: The structure your IO component will receive struct ParsedMessage { MessageHeader header; std::vector<char> payload; // Or std::string, or a shared_ptr to a buffer // Add other parsed fields if your payload has structure }; } // namespace my_app ```
Create Your Protocol Class: Inherit from qb::io::async::AProtocol<YourIOComponent> and implement the required methods. YourIOComponent is the class (e.g., your TCP session handler) that will use this protocol.
```cpp // In your protocol header, e.g., my_custom_protocol.h #include <qb/io/async/protocol.h> #include "my_custom_protocol_messages.h" // Your message structs #include <cstring> // For std::memcpy #include <vector> // For std::vector in ParsedMessage
// Forward declare your IO component if necessary, or include its header // class YourIOComponent;
template<typename YourIOComponent> class MyCustomProtocol : public qb::io::async::AProtocol<YourIOComponent> { private: // Internal state for parsing my_app::MessageHeader _current_header; bool _reading_header = true; static constexpr size_t HEADER_SIZE = sizeof(my_app::MessageHeader);
public: // *** This is crucial: Define what your IO_Type::on() will receive *** using message = my_app::ParsedMessage;
// Constructor: Takes a reference to the IO component that owns it explicit MyCustomProtocol(YourIOComponent& io_component) noexcept : qb::io::async::AProtocol<YourIOComponent>(io_component) {}
// — Implementation of AProtocol virtual methods —
std::size_t getMessageSize() noexcept override { auto& input_buffer = this->_io.in(); // Access the IO component's input buffer
if (_reading_header) { if (input_buffer.size() < HEADER_SIZE) { return 0; // Not enough data for the header yet } // Copy header data from buffer std::memcpy(&_current_header, input_buffer.cbegin(), HEADER_SIZE);
// Basic validation (e.g., magic number) if (_current_header.magic_number != my_app::MY_PROTOCOL_MAGIC) { //qb::io::cerr() << "Invalid magic number!" << std::endl; // Protocol error: How to handle? // Option 1: Mark protocol as bad, IO component should disconnect. this->not_ok(); // Mark protocol as not okay // Option 2: Try to find next magic number (resynchronize) - more complex. // For now, let's assume we disconnect on error. return 0; // Or indicate an error that leads to flushing this data } _reading_header = false; }
// Now we have the header, check if the full payload has arrived size_t total_message_size = HEADER_SIZE + _current_header.payload_length; if (input_buffer.size() < total_message_size) { return 0; // Not enough data for the full payload yet }
// A complete message (header + payload) is available return total_message_size; }
void onMessage(std::size_t total_message_size) noexcept override { auto& input_buffer = this->_io.in();
// Construct the message object to pass to the IO component's handler my_app::ParsedMessage received_msg; received_msg.header = _current_header; // Header was already read in getMessageSize
if (_current_header.payload_length > 0) { // Copy payload from the input buffer const char* payload_start = input_buffer.cbegin() + HEADER_SIZE; received_msg.payload.assign(payload_start, payload_start + _current_header.payload_length); }
// Dispatch the fully parsed message to the IO component // YourIOComponent must have a method: void on(my_app::ParsedMessage&& msg) this->_io.on(std::move(received_msg));
// Reset state for the next message reset(); }
void reset() noexcept override { _reading_header = true; // Clear any other partial parsing state _current_header = {}; } }; ```
Integrate with Your I/O Component: In your class that handles the I/O (e.g., MyTCPSession): ```cpp class MyTCPSession : public qb::io::use<MyTCPSession>::tcp::client</*...*/> { public: // Declare your custom protocol using Protocol = MyCustomProtocol<MyTCPSession>;
explicit MyTCPSession(/*...*/) { // Protocol is often instantiated by the use<> base if it has a constructor // taking YourIOComponent&. If not, you might need: this->switch_protocol<Protocol>(*this); }
// Implement the handler for your parsed message type void on(my_app::ParsedMessage&& msg) { // Process the structured message // qb::io::cout() << "Received message type: " << msg.header.message_type // << " with payload size: " << msg.header.payload_length << std::endl; } // ... other handlers (disconnected, etc.) }; ```
Manual Construction: ```cpp // Inside MyTCPSession or another class that sends void sendMyData(const std::vector<char>& payload_data, uint16_t type) { my_app::MessageHeader header; header.magic_number = my_app::MY_PROTOCOL_MAGIC; header.message_type = type; header.payload_length = static_cast<uint32_t>(payload_data.size()); // header.id = ...; // Set other header fields
// Send header this->publish(reinterpret_cast<const char*>(&header), sizeof(header)); // Send payload if (!payload_data.empty()) { this->publish(payload_data.data(), payload_data.size()); } } ``
Important Considerations for Custom Protocols:
(Reference Examples: The example/io/example5_custom_protocol.cpp provides a good starting point. For more advanced serialization and protocol design, examine the protocols in example/core_io/chat_tcp/shared/Protocol.h and example/core_io/message_broker/shared/Protocol.h.**)
By understanding and utilizing this protocol framework, you can build robust and flexible communication layers for your qb-io based applications.
(Next: QB-IO: Secure TCP (SSL/TLS) Transport or QB-IO: Utilities**)