TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_READ_UNTIL_HPP
11 : #define BOOST_CAPY_READ_UNTIL_HPP
12 :
#include <boost/capy/detail/config.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/cond.hpp>
#include <boost/capy/error.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/capy/io_task.hpp>
#include <boost/capy/concept/dynamic_buffer.hpp>
#include <boost/capy/concept/match_condition.hpp>
#include <boost/capy/concept/read_stream.hpp>
#include <boost/capy/ex/io_env.hpp>

#include <algorithm>
#include <coroutine>
#include <cstddef>
#include <memory>
#include <optional>
#include <stop_token>
#include <string>
#include <string_view>
#include <type_traits>
#include <utility>
31 :
32 : namespace boost {
33 : namespace capy {
34 :
35 : namespace detail {
36 :
37 : // Linearize a buffer sequence into a string
38 : inline
39 : std::string
40 HIT 2 : linearize_buffers(ConstBufferSequence auto const& data)
41 : {
42 2 : std::string linear;
43 2 : linear.reserve(buffer_size(data));
44 2 : auto const end_ = end(data);
45 6 : for(auto it = begin(data); it != end_; ++it)
46 8 : linear.append(
47 4 : static_cast<char const*>(it->data()),
48 : it->size());
49 4 : return linear;
50 : } // LCOV_EXCL_LINE gcov brace artifact (linearize_buffers is exercised)
51 :
52 : // Search buffer using a MatchCondition, with single-buffer optimization
53 : template<MatchCondition M>
54 : std::size_t
55 263 : search_buffer_for_match(
56 : ConstBufferSequence auto const& data,
57 : M const& match,
58 : std::size_t* hint = nullptr)
59 : {
60 : // Fast path: single buffer - no linearization needed
61 263 : if(buffer_length(data) == 1)
62 : {
63 262 : auto const& buf = *begin(data);
64 786 : return match(std::string_view(
65 262 : static_cast<char const*>(buf.data()),
66 262 : buf.size()), hint);
67 : }
68 : // Multiple buffers - linearize
69 1 : return match(linearize_buffers(data), hint);
70 : }
71 :
72 : // Implementation coroutine for read_until with MatchCondition
73 : template<class Stream, class B, MatchCondition M>
74 : io_task<std::size_t>
75 136 : read_until_match_impl(
76 : Stream& stream,
77 : B& buffers,
78 : M match,
79 : std::size_t initial_amount)
80 : {
81 : std::size_t amount = initial_amount;
82 :
83 : for(;;)
84 : {
85 : // Check max_size before preparing
86 : if(buffers.size() >= buffers.max_size())
87 : co_return {error::not_found, 0};
88 :
89 : // Prepare space, respecting max_size
90 : std::size_t const available = buffers.max_size() - buffers.size();
91 : std::size_t const to_prepare = (std::min)(amount, available);
92 : if(to_prepare == 0)
93 : co_return {error::not_found, 0};
94 :
95 : auto mb = buffers.prepare(to_prepare);
96 : auto [ec, n] = co_await stream.read_some(mb);
97 : buffers.commit(n);
98 :
99 : if(!ec)
100 : {
101 : auto pos = search_buffer_for_match(buffers.data(), match);
102 : if(pos != std::string_view::npos)
103 : co_return {{}, pos};
104 : }
105 :
106 : if(ec == cond::eof)
107 : co_return {error::eof, buffers.size()};
108 : if(ec)
109 : co_return {ec, buffers.size()};
110 :
111 : // Grow buffer size for next iteration
112 : if(n == buffer_size(mb))
113 : amount = amount / 2 + amount;
114 : }
115 272 : }
116 :
117 : template<class Stream, class B, MatchCondition M, bool OwnsBuffer>
118 : struct read_until_awaitable
119 : {
120 : Stream* stream_;
121 : M match_;
122 : std::size_t initial_amount_;
123 : std::optional<io_result<std::size_t>> immediate_;
124 : std::optional<io_task<std::size_t>> inner_;
125 :
126 : using storage_type = std::conditional_t<OwnsBuffer, B, B*>;
127 : storage_type buffers_storage_;
128 :
129 136 : B& buffers() noexcept
130 : {
131 : if constexpr(OwnsBuffer)
132 126 : return buffers_storage_;
133 : else
134 10 : return *buffers_storage_;
135 : }
136 :
137 : // Constructor for lvalue (pointer storage)
138 14 : read_until_awaitable(
139 : Stream& stream,
140 : B* buffers,
141 : M match,
142 : std::size_t initial_amount)
143 : requires (!OwnsBuffer)
144 14 : : stream_(std::addressof(stream))
145 14 : , match_(std::move(match))
146 14 : , initial_amount_(initial_amount)
147 14 : , buffers_storage_(buffers)
148 : {
149 14 : auto pos = search_buffer_for_match(
150 14 : buffers_storage_->data(), match_);
151 14 : if(pos != std::string_view::npos)
152 4 : immediate_.emplace(io_result<std::size_t>{{}, pos});
153 14 : }
154 :
155 : // Constructor for rvalue adapter (owned storage)
156 132 : read_until_awaitable(
157 : Stream& stream,
158 : B&& buffers,
159 : M match,
160 : std::size_t initial_amount)
161 : requires OwnsBuffer
162 132 : : stream_(std::addressof(stream))
163 132 : , match_(std::move(match))
164 132 : , initial_amount_(initial_amount)
165 132 : , buffers_storage_(std::move(buffers))
166 : {
167 132 : auto pos = search_buffer_for_match(
168 132 : buffers_storage_.data(), match_);
169 132 : if(pos != std::string_view::npos)
170 6 : immediate_.emplace(io_result<std::size_t>{{}, pos});
171 132 : }
172 :
173 : bool
174 146 : await_ready() const noexcept
175 : {
176 146 : return immediate_.has_value();
177 : }
178 :
179 : std::coroutine_handle<>
180 136 : await_suspend(std::coroutine_handle<> h, io_env const* env)
181 : {
182 272 : inner_.emplace(read_until_match_impl(
183 136 : *stream_, buffers(), match_, initial_amount_));
184 136 : return inner_->await_suspend(h, env);
185 : }
186 :
187 : io_result<std::size_t>
188 146 : await_resume()
189 : {
190 146 : if(immediate_)
191 10 : return *immediate_;
192 136 : return inner_->await_resume();
193 : }
194 : };
195 :
196 : template<ReadStream Stream, class B, MatchCondition M>
197 : using read_until_return_t = read_until_awaitable<
198 : Stream,
199 : std::remove_reference_t<B>,
200 : M,
201 : !std::is_lvalue_reference_v<B&&>>;
202 :
203 : } // namespace detail
204 :
205 : /** Match condition that searches for a delimiter string.
206 :
207 : Satisfies @ref MatchCondition. Returns the position after the
208 : delimiter when found, or `npos` otherwise. Provides an overlap
209 : hint of `delim.size() - 1` to handle delimiters spanning reads.
210 :
211 : @see MatchCondition, read_until
212 : */
213 : struct match_delim
214 : {
215 : /** The delimiter string to search for.
216 :
217 : @note The referenced characters must remain valid
218 : for the lifetime of this object and any pending
219 : read operation.
220 : */
221 : std::string_view delim;
222 :
223 : /** Search for the delimiter in `data`.
224 :
225 : @param data The data to search.
226 : @param hint If non-null, receives the overlap hint
227 : on miss.
228 : @return `0` if `delim` is empty; otherwise the position
229 : just past the delimiter, or `npos` if not found.
230 : */
231 : std::size_t
232 226 : operator()(
233 : std::string_view data,
234 : std::size_t* hint) const noexcept
235 : {
236 226 : if(delim.empty())
237 2 : return 0;
238 224 : auto pos = data.find(delim);
239 224 : if(pos != std::string_view::npos)
240 27 : return pos + delim.size();
241 197 : if(hint)
242 1 : *hint = delim.size() > 1 ? delim.size() - 1 : 0;
243 197 : return std::string_view::npos;
244 : }
245 : };
246 :
247 : /** Asynchronously read until a match condition is satisfied.
248 :
249 : Reads data from `stream` and appends it to `dynbuf` via calling
250 : `stream.read_some` zero or more times and using the prepare/commit
251 : interface until:
252 :
253 : @li either @c match returns a valid position,
254 : @li or @c dynbuf.size() == @c dynbuf.max_size() ,
255 : @li or a contingency on @c stream.read_some occurs.
256 :
257 : If the match condition is satisfied by data in `dynbuf.data()` upon entry,
258 : no call to `stream.read_some` is performed.
259 :
260 :
261 : @par Await-returns
262 :
263 : An object of type `io_result<std::size_t>` destructuring as `[ec, n]`.
264 :
265 : If `bool(ec)`, `n` is the position returned by the match condition
266 : (bytes up to and including the matched delimiter).
267 :
268 :
269 : Contingencies:
270 :
271 : @li The first contingency, reported from awaiting @c stream.read_some .
272 : @li @c cond::not_found -- when @c dynbuf.size() == @c dynbuf.max_size()
273 : and the match condition is not satisfied by data in @c dynbuf.data() .
274 :
275 : @param stream The stream to read from. The caller retains ownership.
276 : @param dynbuf The dynamic buffer to append data to. Must remain
277 : valid until the operation completes.
278 : @param match The match condition callable. Copied into the awaitable.
279 : @param initial_amount Initial bytes to read per iteration (default
280 : 2048). Grows by 1.5x when filled.
281 :
282 :
283 :
284 :
285 : @par Await-throws
286 :
287 : Whatever operations on @c dunbuf throw.
288 :
289 : (Note: types modeling @c DynamicBufferParam provided by Capy throw
290 : @c std::bad_alloc from member function
291 : @c prepare .)
292 :
293 : @par Remarks
294 : Supports _IoAwaitable cancellation_.
295 :
296 : @par Example
297 :
298 : @code
299 : task<> read_http_header( ReadStream auto& stream )
300 : {
301 : std::string header;
302 : auto [ec, n] = co_await read_until(
303 : stream,
304 : string_dynamic_buffer( &header ),
305 : []( std::string_view data, std::size_t* hint ) {
306 : auto pos = data.find( "\r\n\r\n" );
307 : if( pos != std::string_view::npos )
308 : return pos + 4;
309 : if( hint )
310 : (*hint) = 3; // partial "\r\n\r" possible
311 : return std::string_view::npos;
312 : } );
313 : if( ec )
314 : detail::throw_system_error( ec );
315 : // header contains data through "\r\n\r\n"
316 : }
317 : @endcode
318 :
319 : @see read_some, MatchCondition, DynamicBufferParam
320 : */
321 : template<ReadStream Stream, class B, MatchCondition M>
322 : requires DynamicBufferParam<B&&>
323 : detail::read_until_return_t<Stream, B, M>
324 146 : read_until(
325 : Stream& stream,
326 : B&& dynbuf,
327 : M match,
328 : std::size_t initial_amount = 2048)
329 : {
330 146 : constexpr bool is_lvalue = std::is_lvalue_reference_v<B&&>;
331 : using BareB = std::remove_reference_t<B>;
332 :
333 : if constexpr(is_lvalue)
334 : return detail::read_until_awaitable<Stream, BareB, M, false>(
335 14 : stream, std::addressof(dynbuf), std::move(match), initial_amount);
336 : else
337 : return detail::read_until_awaitable<Stream, BareB, M, true>(
338 132 : stream, std::move(dynbuf), std::move(match), initial_amount);
339 : }
340 :
341 : /** Asynchronously read until a delimiter string is found.
342 :
343 : Reads data from the stream until the delimiter is found. This is
344 : a convenience overload equivalent to calling `read_until` with
345 : `match_delim{delim}`. If the delimiter already exists in the
346 : buffer, returns immediately without I/O.
347 :
348 : @li The operation completes when:
349 : @li The delimiter string is found
350 : @li End-of-stream is reached (`cond::eof`)
351 : @li The buffer's `max_size()` is reached (`cond::not_found`)
352 : @li An error occurs
353 : @li The operation is cancelled
354 :
355 : @par Cancellation
356 : Supports cancellation via `stop_token` propagated through the
357 : IoAwaitable protocol. When cancelled, returns with `cond::canceled`.
358 :
359 : @param stream The stream to read from. The caller retains ownership.
360 : @param buffers The dynamic buffer to append data to. Must remain
361 : valid until the operation completes.
362 : @param delim The delimiter string to search for.
363 : @param initial_amount Initial bytes to read per iteration (default
364 : 2048). Grows by 1.5x when filled.
365 :
366 : @return An awaitable that await-returns `(error_code, std::size_t)`.
367 : On success, `n` is bytes up to and including the delimiter.
368 : Compare error codes to conditions:
369 : @li `cond::eof` - EOF before delimiter; `n` is buffer size
370 : @li `cond::not_found` - `max_size()` reached before delimiter
371 : @li `cond::canceled` - Operation was cancelled
372 :
373 : @par Example
374 :
375 : @code
376 : task<std::string> read_line( ReadStream auto& stream )
377 : {
378 : std::string line;
379 : auto [ec, n] = co_await read_until(
380 : stream, string_dynamic_buffer( &line ), "\r\n" );
381 : if( ec == cond::eof )
382 : co_return line; // partial line at EOF
383 : if( ec )
384 : detail::throw_system_error( ec );
385 : line.resize( n - 2 ); // remove "\r\n"
386 : co_return line;
387 : }
388 : @endcode
389 :
390 : @see read_until, match_delim, DynamicBufferParam
391 : */
392 : template<ReadStream Stream, class B>
393 : requires DynamicBufferParam<B&&>
394 : detail::read_until_return_t<Stream, B, match_delim>
395 118 : read_until(
396 : Stream& stream,
397 : B&& buffers,
398 : std::string_view delim,
399 : std::size_t initial_amount = 2048)
400 : {
401 : return read_until(
402 : stream,
403 : std::forward<B>(buffers),
404 : match_delim{delim},
405 118 : initial_amount);
406 : }
407 :
408 : } // namespace capy
409 : } // namespace boost
410 :
411 : #endif
|