qb  2.0.0.0
C++17 Actor Framework
qb Issue Watch Star Fork Follow @isndev
Loading...
Searching...
No Matches
stream.h
Go to the documentation of this file.
1
30
31#ifndef QB_IO_STREAM_H_
32#define QB_IO_STREAM_H_
35
36namespace qb::io {
37
48template <typename _IO_>
49class istream {
50public:
52 using transport_io_type = _IO_;
53
56
57protected:
58 _IO_ _in;
60
61public:
67 ~istream() noexcept {
68 close();
69 }
70
75 [[nodiscard]] _IO_ &
76 transport() noexcept {
77 return _in;
78 }
79
84 [[nodiscard]] const _IO_ &
85 transport() const noexcept {
86 return _in;
87 }
88
93 [[nodiscard]] input_buffer_type &
94 in() noexcept {
95 return _in_buffer;
96 }
97
102 [[nodiscard]] std::size_t
103 pendingRead() const noexcept {
104 return _in_buffer.size();
105 }
106
115 template <typename Available = void>
116 [[nodiscard]] int
117 read(std::enable_if_t<has_method_read<_IO_, int, char *, std::size_t>::value,
118 Available> * = nullptr) noexcept {
119 static constexpr const std::size_t bucket_read = 8192;
120 const auto ret = _in.read(_in_buffer.allocate_back(bucket_read), bucket_read);
121 if (ret >= 0)
122 _in_buffer.free_back(bucket_read - ret);
123 return ret;
124 }
125
133 void
134 flush(std::size_t size) noexcept {
135 _in_buffer.free_front(size);
136 }
137
143 void
144 eof() noexcept {
145 if (!_in_buffer.size())
146 _in_buffer.reset();
147 else
148 _in_buffer.reorder();
149 }
150
157 void
158 close() noexcept {
159 _in_buffer.reset();
160 if constexpr (has_member_func_disconnect<_IO_>::value)
161 _in.disconnect();
162 _in.close();
163 }
164};
165
176template <typename _IO_>
177class ostream {
178public:
180 using transport_io_type = _IO_;
181
184
185protected:
186 _IO_ _out;
188
189public:
195 ~ostream() noexcept {
196 close();
197 }
198
203 [[nodiscard]] _IO_ &
204 transport() noexcept {
205 return _out;
206 }
207
212 [[nodiscard]] const _IO_ &
213 transport() const noexcept {
214 return _out;
215 }
216
221 [[nodiscard]] output_buffer_type &
222 out() noexcept {
223 return _out_buffer;
224 }
225
230 [[nodiscard]] std::size_t
231 pendingWrite() const noexcept {
232 return _out_buffer.size();
233 }
234
243 template <typename Available = void>
244 [[nodiscard]] int
245 write(std::enable_if_t<has_method_write<_IO_, int, const char *, std::size_t>::value,
246 Available> * = nullptr) noexcept {
247 const auto ret = _out.write(_out_buffer.begin(), _out_buffer.size());
248
249 if (ret > 0) {
250 if (ret != _out_buffer.size()) {
251 _out_buffer.free_front(ret);
252 _out_buffer.reorder();
253 } else
254 _out_buffer.reset();
255 }
256
257 return ret;
258 }
259
269 char *
270 publish(char const *data, std::size_t size) noexcept {
271 return static_cast<char *>(
272 std::memcpy(_out_buffer.allocate_back(size), data, size));
273 }
274
281 void
282 close() noexcept {
283 _out_buffer.reset();
284 if constexpr (has_member_func_disconnect<_IO_>::value)
285 _out.disconnect();
286 _out.close();
287 }
288};
289
302template <typename _IO_>
303class stream : public istream<_IO_> {
304public:
306 using transport_io_type = _IO_;
307
310
317 constexpr static const bool has_reset_on_pending_read = false;
318
319protected:
321
322public:
327 [[nodiscard]] output_buffer_type &
328 out() noexcept {
329 return _out_buffer;
330 }
331
336 [[nodiscard]] std::size_t
337 pendingWrite() const noexcept {
338 return _out_buffer.size();
339 }
340
352 template <typename Available = void>
353 [[nodiscard]] int
354 write(std::enable_if_t<has_method_write<_IO_, int, const char *, std::size_t>::value,
355 Available> * = nullptr) noexcept {
356 const auto ret = this->_in.write(_out_buffer.begin(), _out_buffer.size());
357 if (ret > 0) {
358 if (static_cast<std::size_t>(ret) != _out_buffer.size()) {
359 _out_buffer.free_front(ret);
360 _out_buffer.reorder();
361 } else
362 _out_buffer.reset();
363 }
364 return ret;
365 }
366
376 char *
377 publish(char const *data, std::size_t size) noexcept {
378 return static_cast<char *>(
379 std::memcpy(_out_buffer.allocate_back(size), data, size));
380 }
381
387 void
388 close() noexcept {
389 _out_buffer.reset();
390 static_cast<istream<_IO_> &>(*this).close();
391 }
392};
393
394} // namespace qb::io
395
396#endif // QB_IO_STREAM_H_
Extensible buffer optimized for performance.
Definition pipe.h:552
Input stream template class.
Definition stream.h:49
input_buffer_type _in_buffer
Buffer for incoming data.
Definition stream.h:59
void flush(std::size_t size) noexcept
Remove data from the front of the input buffer.
Definition stream.h:134
std::size_t pendingRead() const noexcept
Get the number of bytes available for reading.
Definition stream.h:103
const _IO_ & transport() const noexcept
Get the underlying transport object (const version)
Definition stream.h:85
int read(std::enable_if_t< has_method_read< _IO_, int, char *, std::size_t >::value, Available > *=nullptr) noexcept
Read data from the transport into the input buffer.
Definition stream.h:117
input_buffer_type & in() noexcept
Get the input buffer.
Definition stream.h:94
void close() noexcept
Close the stream.
Definition stream.h:158
_IO_ transport_io_type
Type of the underlying transport IO.
Definition stream.h:52
_IO_ _in
The underlying IO object.
Definition stream.h:58
qb::allocator::pipe< char > input_buffer_type
Type of the input buffer.
Definition stream.h:55
void eof() noexcept
Handle end-of-file condition.
Definition stream.h:144
_IO_ & transport() noexcept
Get the underlying transport object.
Definition stream.h:76
~istream() noexcept
Destructor.
Definition stream.h:67
Output stream template class.
Definition stream.h:177
const _IO_ & transport() const noexcept
Get the underlying transport object (const version)
Definition stream.h:213
_IO_ _out
The underlying IO object.
Definition stream.h:186
std::size_t pendingWrite() const noexcept
Get the number of bytes pending for writing.
Definition stream.h:231
void close() noexcept
Close the stream.
Definition stream.h:282
_IO_ & transport() noexcept
Get the underlying transport object.
Definition stream.h:204
char * publish(char const *data, std::size_t size) noexcept
Add data to the output buffer for later writing.
Definition stream.h:270
int write(std::enable_if_t< has_method_write< _IO_, int, const char *, std::size_t >::value, Available > *=nullptr) noexcept
Write data from the output buffer to the transport.
Definition stream.h:245
output_buffer_type _out_buffer
Buffer for outgoing data.
Definition stream.h:187
qb::allocator::pipe< char > output_buffer_type
Type of the output buffer.
Definition stream.h:183
output_buffer_type & out() noexcept
Get the output buffer.
Definition stream.h:222
_IO_ transport_io_type
Type of the underlying transport IO.
Definition stream.h:180
~ostream() noexcept
Destructor.
Definition stream.h:195
Combined input/output stream template class.
Definition stream.h:303
void close() noexcept
Close the stream.
Definition stream.h:388
std::size_t pendingWrite() const noexcept
Get the number of bytes pending for writing.
Definition stream.h:337
qb::allocator::pipe< char > output_buffer_type
Type of the output buffer.
Definition stream.h:309
output_buffer_type & out() noexcept
Get the output buffer.
Definition stream.h:328
_IO_ transport_io_type
Type of the underlying transport IO.
Definition stream.h:306
int write(std::enable_if_t< has_method_write< _IO_, int, const char *, std::size_t >::value, Available > *=nullptr) noexcept
Write data from the output buffer to the transport.
Definition stream.h:354
char * publish(char const *data, std::size_t size) noexcept
Add data to the output buffer for later writing.
Definition stream.h:377
output_buffer_type _out_buffer
Definition stream.h:320
static constexpr const bool has_reset_on_pending_read
Definition stream.h:317
Implementation of a dynamic extensible buffer.
Advanced type traits and metaprogramming utilities for the QB Framework.