From cbd2cbbf73a10e21fa391750f1387a754fbc6c0b Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Tue, 25 Mar 2025 09:58:56 +0500 Subject: [PATCH 1/3] update safe stream config for send messsage delays --- examples/benches/send_message_delays/main.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/benches/send_message_delays/main.cpp b/examples/benches/send_message_delays/main.cpp index 273fda51..ab73739f 100644 --- a/examples/benches/send_message_delays/main.cpp +++ b/examples/benches/send_message_delays/main.cpp @@ -120,8 +120,8 @@ class TestSendMessageDelaysAction : public Action { std::numeric_limits::max(), (std::numeric_limits::max() / 2) - 1, (std::numeric_limits::max() / 2) - 1, - 5, - std::chrono::milliseconds{300}, + 10, + std::chrono::milliseconds{400}, std::chrono::milliseconds{0}, std::chrono::milliseconds{200}, }; From 71d3e5bbb1d2b48650ce568cf88dce64a9c0497d Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Tue, 25 Mar 2025 09:57:57 +0500 Subject: [PATCH 2/3] fix safe stream actions to receive overlapped chunks and propper wait for confirmation --- aether/stream_api/safe_stream.cpp | 85 ++++++- aether/stream_api/safe_stream.h | 9 +- .../safe_stream/safe_stream_receiving.cpp | 233 +++++++++--------- .../safe_stream/safe_stream_receiving.h | 47 ++-- .../safe_stream/safe_stream_sending.cpp | 64 ++--- .../safe_stream/safe_stream_sending.h | 18 +- 6 files changed, 239 insertions(+), 217 deletions(-) diff --git a/aether/stream_api/safe_stream.cpp b/aether/stream_api/safe_stream.cpp index d323f215..00885015 100644 --- a/aether/stream_api/safe_stream.cpp +++ b/aether/stream_api/safe_stream.cpp @@ -19,6 +19,8 @@ #include #include +#include "aether/api_protocol/packet_builder.h" +#include "aether/stream_api/safe_stream/safe_stream_api.h" #include "aether/stream_api/safe_stream/sending_data_action.h" namespace ae { @@ -70,7 +72,8 @@ ActionView SafeStream::SafeStreamInGate::Write( return action; } -void SafeStream::SafeStreamInGate::WriteOut(DataBuffer&& buffer) { +void SafeStream::SafeStreamInGate::WriteOut(DataBuffer&& buffer, + TimePoint /* current_time */) { out_data_event_.Emit(buffer); } @@ -122,18 +125,24 @@ void SafeStream::SafeStreamOutGate::LinkOut(OutGate& gate) { SafeStream::SafeStream(ActionContext action_context, SafeStreamConfig config) : action_context_{action_context}, - safe_stream_sending_{action_context_, protocol_context_, config}, - safe_stream_receiving_{action_context_, protocol_context_, config}, + safe_stream_sending_{action_context_, config}, + safe_stream_receiving_{action_context_, config}, in_{action_context_, safe_stream_sending_, config.max_data_size}, out_{protocol_context_} { - subscriptions_.Push(safe_stream_sending_.write_data_event().Subscribe( - *this, MethodPtr<&SafeStream::OnDataWrite>{}), + subscriptions_.Push(safe_stream_sending_.send_event().Subscribe( + *this, MethodPtr<&SafeStream::OnSendEvent>{}), + safe_stream_sending_.repeat_event().Subscribe( + *this, MethodPtr<&SafeStream::OnRepeatEvent>{}), safe_stream_receiving_.receive_event().Subscribe( in_, MethodPtr<&SafeStreamInGate::WriteOut>{})); subscriptions_.Push( - safe_stream_receiving_.send_data_event().Subscribe( - *this, MethodPtr<&SafeStream::OnDataReaderSend>{}), + safe_stream_receiving_.confirm_event().Subscribe( + *this, MethodPtr<&SafeStream::OnConfirmEvent>{}), + safe_stream_receiving_.request_repeat_event().Subscribe( + *this, MethodPtr<&SafeStream::OnRequestRepeatEvent>{})); + + subscriptions_.Push( protocol_context_.MessageEvent().Subscribe( *this, MethodPtr<&SafeStream::Confirm>{}), protocol_context_.MessageEvent().Subscribe( @@ -150,9 +159,16 @@ ByteGate::Base& SafeStream::in() { return in_; } void SafeStream::LinkOut(OutGate& gate) { out_.LinkOut(gate); } -void SafeStream::OnDataWrite(SafeStreamRingIndex offset, DataBuffer&& data, +void SafeStream::OnSendEvent(SafeStreamRingIndex offset, DataBuffer&& data, TimePoint current_time) { - auto write_action = out_.Write(std::move(data), current_time); + auto packet = PacketBuilder{ + protocol_context_, + PackMessage{ + SafeStreamApi{}, + SafeStreamApi::Send{ + {}, static_cast(offset), std::move(data)}}}; + + auto write_action = out_.Write(std::move(packet), current_time); subscriptions_.Push( write_action->SubscribeOnResult([this, offset](auto const& /* action */) { @@ -168,8 +184,51 @@ void SafeStream::OnDataWrite(SafeStreamRingIndex offset, DataBuffer&& data, })); } -void SafeStream::OnDataReaderSend(DataBuffer&& data, TimePoint current_time) { - out_.Write(std::move(data), current_time); +void SafeStream::OnRepeatEvent(SafeStreamRingIndex offset, + std::uint16_t repeat_count, DataBuffer&& data, + TimePoint current_time) { + auto packet = PacketBuilder{ + protocol_context_, + PackMessage{SafeStreamApi{}, + SafeStreamApi::Repeat{{}, + repeat_count, + static_cast(offset), + std::move(data)}}}; + + auto write_action = out_.Write(std::move(packet), current_time); + + subscriptions_.Push( + write_action->SubscribeOnResult([this, offset](auto const& /* action */) { + safe_stream_sending_.ReportWriteSuccess(offset); + })); + subscriptions_.Push( + write_action->SubscribeOnError([this, offset](auto const& /* action */) { + safe_stream_sending_.ReportWriteError(offset); + })); + subscriptions_.Push( + write_action->SubscribeOnStop([this, offset](auto const& /* action */) { + safe_stream_sending_.ReportWriteStopped(offset); + })); +} + +void SafeStream::OnConfirmEvent(SafeStreamRingIndex offset, + TimePoint current_time) { + auto packet = PacketBuilder{ + protocol_context_, + PackMessage{ + SafeStreamApi{}, + SafeStreamApi::Confirm{{}, static_cast(offset)}}}; + out_.Write(std::move(packet), current_time); +} + +void SafeStream::OnRequestRepeatEvent(SafeStreamRingIndex offset, + TimePoint current_time) { + auto packet = + PacketBuilder{protocol_context_, + PackMessage{SafeStreamApi{}, + SafeStreamApi::RequestRepeat{ + {}, static_cast(offset)}}}; + out_.Write(std::move(packet), current_time); } void SafeStream::Confirm(MessageEventData const& msg) { @@ -185,14 +244,14 @@ void SafeStream::RequestRepeatSend( void SafeStream::ReceiveSend(MessageEventData const& msg) { safe_stream_receiving_.ReceiveSend( SafeStreamRingIndex{msg.message().offset}, - std::move(const_cast(msg.message()).data)); + std::move(const_cast(msg.message()).data), Now()); } void SafeStream::ReceiveRepeat( MessageEventData const& msg) { safe_stream_receiving_.ReceiveRepeat( SafeStreamRingIndex{msg.message().offset}, msg.message().repeat_count, - std::move(const_cast(msg.message()).data)); + std::move(const_cast(msg.message()).data), Now()); } } // namespace ae diff --git a/aether/stream_api/safe_stream.h b/aether/stream_api/safe_stream.h index af49da23..22b7712e 100644 --- a/aether/stream_api/safe_stream.h +++ b/aether/stream_api/safe_stream.h @@ -58,7 +58,7 @@ class SafeStream final : public ByteStream { ActionView Write(DataBuffer &&buffer, TimePoint current_time) override; - void WriteOut(DataBuffer &&buffer); + void WriteOut(DataBuffer &&buffer, TimePoint current_time); void LinkOut(OutGate &gate) override; @@ -92,10 +92,13 @@ class SafeStream final : public ByteStream { void LinkOut(OutGate &gate) override; private: - void OnDataWrite(SafeStreamRingIndex offset, DataBuffer &&data, + void OnSendEvent(SafeStreamRingIndex offset, DataBuffer &&data, TimePoint current_time); + void OnRepeatEvent(SafeStreamRingIndex offset, std::uint16_t repeat_count, + DataBuffer &&data, TimePoint current_time); - void OnDataReaderSend(DataBuffer &&data, TimePoint current_time); + void OnConfirmEvent(SafeStreamRingIndex offset, TimePoint current_time); + void OnRequestRepeatEvent(SafeStreamRingIndex offset, TimePoint current_time); void Confirm(MessageEventData const &msg); void RequestRepeatSend( diff --git a/aether/stream_api/safe_stream/safe_stream_receiving.cpp b/aether/stream_api/safe_stream/safe_stream_receiving.cpp index d247a595..e054f747 100644 --- a/aether/stream_api/safe_stream/safe_stream_receiving.cpp +++ b/aether/stream_api/safe_stream/safe_stream_receiving.cpp @@ -16,19 +16,17 @@ #include "aether/stream_api/safe_stream/safe_stream_receiving.h" -#include #include +#include +#include -#include "aether/api_protocol/packet_builder.h" #include "aether/tele/tele.h" namespace ae { SafeStreamReceivingAction ::SafeStreamReceivingAction( - ActionContext action_context, ProtocolContext& protocol_context, - SafeStreamConfig const& config) + ActionContext action_context, SafeStreamConfig const& config) : Action(action_context), - protocol_context_{protocol_context}, max_window_size_{config.window_size}, max_repeat_count_{config.max_repeat_count}, send_confirm_timeout_{config.send_confirm_timeout}, @@ -39,13 +37,10 @@ TimePoint SafeStreamReceivingAction::Update(TimePoint current_time) { if (repeat_count_exceeded_) { repeat_count_exceeded_ = false; - repeat_queue_.clear(); Action::Error(*this); - return new_time; + return current_time; } - MakeResponse(current_time); - return new_time; } @@ -54,95 +49,115 @@ SafeStreamReceivingAction::receive_event() { return receive_event_; } -SafeStreamReceivingAction::SenDataEvent::Subscriber -SafeStreamReceivingAction::send_data_event() { - return send_data_event_; +SafeStreamReceivingAction::ConfirmEvent::Subscriber +SafeStreamReceivingAction::confirm_event() { + return send_confirm_event_; +} + +SafeStreamReceivingAction::RequestRepeatEvent::Subscriber +SafeStreamReceivingAction::request_repeat_event() { + return send_request_repeat_event_; } void SafeStreamReceivingAction::ReceiveSend(SafeStreamRingIndex offset, - DataBuffer data) { - AE_TELED_DEBUG("Data received offset {}", offset); - if (last_confirmed_offset_.Distance(offset) >= max_window_size_) { + DataBuffer data, + TimePoint /* current_time */) { + AE_TELED_DEBUG("Data received offset {}, size {}", offset, data.size()); + if (!AddDataChunk(ReceivingChunk{offset, std::move(data), 0})) { // confirmed offset - AE_TELED_WARNING("Confirmed offset is duplicated"); return; } - AddDataChunk(ReceivingChunk{offset, std::move(data)}); - - this->Trigger(); + Action::Trigger(); } void SafeStreamReceivingAction::ReceiveRepeat(SafeStreamRingIndex offset, std::uint16_t repeat, - DataBuffer data) { - AE_TELED_DEBUG("Repeat data received offset: {}, repeat {}", offset, repeat); - if (last_confirmed_offset_.Distance(offset) >= max_window_size_) { + DataBuffer data, + TimePoint current_time) { + auto data_size = data.size(); + AE_TELED_DEBUG("Repeat data received offset: {}, repeat {}, size {}", offset, + repeat, data.size()); + if (!AddDataChunk(ReceivingChunk{offset, std::move(data), repeat})) { // confirmed offset - AE_TELED_WARNING("Confirmed offset is duplicated"); - AddToConfirmationQueue( - offset + static_cast(data.size() - 1)); + MakeConfirm(offset + static_cast(data_size - 1), + current_time); return; } - AddDataChunk(ReceivingChunk{offset, std::move(data)}); - - this->Trigger(); + Action::Trigger(); } -void SafeStreamReceivingAction::AddDataChunk(ReceivingChunk chunk) { +bool SafeStreamReceivingAction::AddDataChunk(ReceivingChunk&& chunk) { + if (last_emitted_offset_.Distance( + chunk.offset + static_cast( + chunk.data.size() - 1)) > max_window_size_) { + AE_TELED_WARNING("Chunk is duplicated with already emitted"); + // the chunk is emitted, do not add + return false; + } + auto it = std::find_if(std::begin(received_data_chunks_), std::end(received_data_chunks_), [&](auto const& ch) { return chunk.offset.Distance(ch.offset) <= max_window_size_; }); - if (it == std::end(received_data_chunks_)) { - // new chunk has the biggest offset - received_data_chunks_.emplace_back(std::move(chunk)); - return; - } - // duplication found - if (chunk.offset == it->offset) { - AE_TELED_WARNING("Chunk duplication found"); - return; - } + if (it != std::end(received_data_chunks_)) { + // duplication found + if ((chunk.offset == it->offset) && + (chunk.data.size() == it->data.size())) { + AE_TELED_WARNING("Chunk is duplicated with received one"); + return false; + } - auto overlap_size = it->offset.Distance( - chunk.offset + static_cast(chunk.data.size())); - if ((overlap_size > 0) && (overlap_size <= max_window_size_)) { - // chunks overlap - AE_TELED_WARNING("Chunks overlap"); - received_data_chunks_.insert( - it, ReceivingChunk{ - chunk.offset, - {std::begin(chunk.data), - std::begin(chunk.data) + chunk.offset.Distance(it->offset)}}); + // overlap size with the next chunk + auto overlap_size = it->offset.Distance( + chunk.offset + + static_cast(chunk.data.size())); + if ((overlap_size > 0) && (overlap_size <= max_window_size_)) { + AE_TELED_DEBUG("Chunks overlap"); + // chunks overlap + received_data_chunks_.emplace( + it, ReceivingChunk{ + chunk.offset, + {std::begin(chunk.data), + std::begin(chunk.data) + + static_cast( + chunk.data.size() - overlap_size)}, + std::max(it->repeat_count, chunk.repeat_count), + }); + } else { + AE_TELED_DEBUG("Insert chunk"); + received_data_chunks_.emplace(it, std::move(chunk)); + } } else { - received_data_chunks_.insert(it, std::move(chunk)); - } - - // update expected chunks - auto ex_it = std::find_if( - std::begin(expected_chunks_), std::end(expected_chunks_), - [&](auto const& ex_ch) { return ex_ch.offset == chunk.offset; }); - - if (ex_it != std::end(expected_chunks_)) { - expected_chunks_.erase(ex_it); + auto overlap_size = chunk.offset.Distance(last_emitted_offset_); + if ((overlap_size > 0) && (overlap_size <= max_window_size_)) { + AE_TELED_DEBUG("Chunk overlapped with emitted"); + // new chunk is overlapped with last emitted + received_data_chunks_.emplace_back(ReceivingChunk{ + last_emitted_offset_, + {std::begin(chunk.data) + overlap_size, std::end(chunk.data)}, + chunk.repeat_count, + }); + } else { + AE_TELED_DEBUG("Emplace back chunk"); + // new chunk has the biggest offset + received_data_chunks_.emplace_back(std::move(chunk)); + } } + return true; } TimePoint SafeStreamReceivingAction::CheckChunkChains(TimePoint current_time) { - auto conf_time = CheckCompletedChains(current_time); + CheckCompletedChains(current_time); + + auto conf_time = CheckChunkConfirmation(current_time); auto rep_time = CheckMissedOffset(current_time); return std::min(conf_time, rep_time); } -TimePoint SafeStreamReceivingAction::CheckCompletedChains( - TimePoint current_time) { - if ((last_send_confirm_time_ + send_confirm_timeout_) > current_time) { - return (last_send_confirm_time_ + send_confirm_timeout_); - } - +void SafeStreamReceivingAction::CheckCompletedChains(TimePoint current_time) { auto next_chunk_offset = last_confirmed_offset_; auto it = std::begin(received_data_chunks_); for (; it != std::end(received_data_chunks_); it++) { @@ -154,20 +169,30 @@ TimePoint SafeStreamReceivingAction::CheckCompletedChains( break; } } + if (std::distance(std::begin(received_data_chunks_), it) == 0) { + return; + } auto data = JoinChunks(std::begin(received_data_chunks_), it); if (!data.empty()) { AE_TELED_DEBUG("Data chunk chain received length: {} to offset: {}", data.size(), next_chunk_offset); - receive_event_.Emit(std::move(data)); + receive_event_.Emit(std::move(data), current_time); + received_data_chunks_.erase(std::begin(received_data_chunks_), it); + last_emitted_offset_ = next_chunk_offset; } +} - received_data_chunks_.erase(std::begin(received_data_chunks_), it); +TimePoint SafeStreamReceivingAction::CheckChunkConfirmation( + TimePoint current_time) { + if ((last_send_confirm_time_ + send_confirm_timeout_) > current_time) { + return (last_send_confirm_time_ + send_confirm_timeout_); + } - if (next_chunk_offset != last_confirmed_offset_) { - // confirm range [last_confirmed_offset_, next_chunk_offset) - AddToConfirmationQueue(next_chunk_offset - 1); - last_confirmed_offset_ = next_chunk_offset; + if (last_emitted_offset_ != last_confirmed_offset_) { + // confirm range [last_confirmed_offset_, last_emitted_offset_) + MakeConfirm(last_emitted_offset_ - 1, current_time); + last_confirmed_offset_ = last_emitted_offset_; last_send_confirm_time_ = current_time; } return current_time; @@ -183,7 +208,13 @@ TimePoint SafeStreamReceivingAction::CheckMissedOffset(TimePoint current_time) { // if got not expected chunk if (chunk.offset != next_chunk_offset) { AE_TELED_DEBUG("Request to repeat offset: {}", next_chunk_offset); - AddExpectedChunk(next_chunk_offset); + ++chunk.repeat_count; + if (chunk.repeat_count > max_repeat_count_) { + // set failed state + repeat_count_exceeded_ = true; + break; + } + MakeRepeat(next_chunk_offset, current_time); } next_chunk_offset = chunk.offset + static_cast( chunk.data.size()); @@ -193,53 +224,16 @@ TimePoint SafeStreamReceivingAction::CheckMissedOffset(TimePoint current_time) { return current_time; } -void SafeStreamReceivingAction::MakeResponse(TimePoint current_time) { - if (confirmation_queue_.empty() && repeat_queue_.empty()) { - return; - } - - auto packet = PacketBuilder{protocol_context_}; - for (auto const& confirm : confirmation_queue_) { - AE_TELED_DEBUG("Send confirm to offset {}", confirm); - packet.Push(safe_stream_api_, SafeStreamApi::Confirm{ - {}, static_cast(confirm)}); - } - confirmation_queue_.clear(); - for (auto const& repeat : repeat_queue_) { - AE_TELED_DEBUG("Send repeat request to offset {}", repeat); - packet.Push(safe_stream_api_, SafeStreamApi::RequestRepeat{ - {}, static_cast(repeat)}); - } - repeat_queue_.clear(); - - send_data_event_.Emit(std::move(packet), current_time); -} - -void SafeStreamReceivingAction::AddExpectedChunk(SafeStreamRingIndex offset) { - auto ex_it = - std::find_if(std::begin(expected_chunks_), std::end(expected_chunks_), - [&](auto const& ex_ch) { return ex_ch.offset == offset; }); - if (ex_it != std::end(expected_chunks_)) { - ex_it->repeat_count++; - if (ex_it->repeat_count > max_repeat_count_) { - // set failed state - repeat_count_exceeded_ = true; - return; - } - } else { - expected_chunks_.emplace_back(ExpectedChunk{offset, 0}); - } - - AddToRepeatQueue(offset); -} - -void SafeStreamReceivingAction::AddToConfirmationQueue( - SafeStreamRingIndex offset) { - confirmation_queue_.push_back(offset); +void SafeStreamReceivingAction::MakeConfirm(SafeStreamRingIndex offset, + TimePoint current_time) { + AE_TELED_DEBUG("Send confirm to offset {}", offset); + send_confirm_event_.Emit(offset, current_time); } -void SafeStreamReceivingAction::AddToRepeatQueue(SafeStreamRingIndex offset) { - repeat_queue_.push_back(offset); +void SafeStreamReceivingAction::MakeRepeat(SafeStreamRingIndex offset, + TimePoint current_time) { + AE_TELED_DEBUG("Send repeat request to offset {}", offset); + send_request_repeat_event_.Emit(offset, current_time); } DataBuffer SafeStreamReceivingAction::JoinChunks( @@ -254,7 +248,8 @@ DataBuffer SafeStreamReceivingAction::JoinChunks( data.reserve(size); // then copy data for (auto it = begin; it != end; it++) { - data.insert(std::end(data), std::begin(it->data), std::end(it->data)); + std::copy(std::begin(it->data), std::end(it->data), + std::back_inserter(data)); } return data; } diff --git a/aether/stream_api/safe_stream/safe_stream_receiving.h b/aether/stream_api/safe_stream/safe_stream_receiving.h index 66065f90..87630f00 100644 --- a/aether/stream_api/safe_stream/safe_stream_receiving.h +++ b/aether/stream_api/safe_stream/safe_stream_receiving.h @@ -17,9 +17,8 @@ #ifndef AETHER_STREAM_API_SAFE_STREAM_SAFE_STREAM_RECEIVING_H_ #define AETHER_STREAM_API_SAFE_STREAM_SAFE_STREAM_RECEIVING_H_ -#include #include -#include +#include #include "aether/events/events.h" #include "aether/actions/action.h" @@ -36,48 +35,45 @@ namespace ae { struct ReceivingChunk { SafeStreamRingIndex offset; DataBuffer data; -}; - -struct ExpectedChunk { - SafeStreamRingIndex offset; std::uint16_t repeat_count; }; class SafeStreamReceivingAction : public Action { public: - using ReceiveEvent = Event; - using SenDataEvent = Event; + using ReceiveEvent = Event; + using RequestRepeatEvent = + Event; + using ConfirmEvent = + Event; SafeStreamReceivingAction(ActionContext action_context, - ProtocolContext& protocol_context, SafeStreamConfig const& config); TimePoint Update(TimePoint current_time) override; ReceiveEvent::Subscriber receive_event(); - SenDataEvent::Subscriber send_data_event(); + ConfirmEvent::Subscriber confirm_event(); + RequestRepeatEvent::Subscriber request_repeat_event(); - void ReceiveSend(SafeStreamRingIndex offset, DataBuffer data); + void ReceiveSend(SafeStreamRingIndex offset, DataBuffer data, + TimePoint current_time); void ReceiveRepeat(SafeStreamRingIndex offset, std::uint16_t repeat, - DataBuffer data); + DataBuffer data, TimePoint current_time); private: - void AddDataChunk(ReceivingChunk chunk); + bool AddDataChunk(ReceivingChunk&& chunk); TimePoint CheckChunkChains(TimePoint current_time); - TimePoint CheckCompletedChains(TimePoint current_time); + void CheckCompletedChains(TimePoint current_time); + TimePoint CheckChunkConfirmation(TimePoint current_time); TimePoint CheckMissedOffset(TimePoint current_time); - void MakeResponse(TimePoint current_time); - - void AddExpectedChunk(SafeStreamRingIndex offset); - void AddToConfirmationQueue(SafeStreamRingIndex offset); - void AddToRepeatQueue(SafeStreamRingIndex offset); + void MakeConfirm(SafeStreamRingIndex offset, TimePoint current_time); + void MakeRepeat(SafeStreamRingIndex offset, TimePoint current_time); - DataBuffer JoinChunks(std::vector::iterator begin, - std::vector::iterator end); + static DataBuffer JoinChunks(std::vector::iterator begin, + std::vector::iterator end); - ProtocolContext& protocol_context_; SafeStreamApi safe_stream_api_; std::uint16_t max_window_size_; @@ -88,14 +84,13 @@ class SafeStreamReceivingAction : public Action { TimePoint last_send_confirm_time_; TimePoint oldest_repeat_time_; SafeStreamRingIndex last_confirmed_offset_; + SafeStreamRingIndex last_emitted_offset_; ReceiveEvent receive_event_; - SenDataEvent send_data_event_; + ConfirmEvent send_confirm_event_; + RequestRepeatEvent send_request_repeat_event_; std::vector received_data_chunks_; - std::vector expected_chunks_; - std::deque repeat_queue_; - std::deque confirmation_queue_; bool repeat_count_exceeded_ = false; }; diff --git a/aether/stream_api/safe_stream/safe_stream_sending.cpp b/aether/stream_api/safe_stream/safe_stream_sending.cpp index 5a12465e..104ad3e6 100644 --- a/aether/stream_api/safe_stream/safe_stream_sending.cpp +++ b/aether/stream_api/safe_stream/safe_stream_sending.cpp @@ -19,16 +19,12 @@ #include #include -#include "aether/api_protocol/packet_builder.h" - #include "aether/tele/tele.h" namespace ae { -SafeStreamSendingAction::SafeStreamSendingAction( - ActionContext action_context, ProtocolContext& protocol_context, - SafeStreamConfig const& config) +SafeStreamSendingAction::SafeStreamSendingAction(ActionContext action_context, + SafeStreamConfig const& config) : Action{action_context}, - protocol_context_{protocol_context}, buffer_capacity_{config.buffer_capacity}, window_size_{config.window_size}, max_repeat_count_{config.max_repeat_count}, @@ -48,12 +44,17 @@ TimePoint SafeStreamSendingAction::Update(TimePoint current_time) { if (max_data_size_ != 0) { SendData(current_time); } - return new_time; + return std::max(new_time, current_time + wait_confirm_timeout_); +} + +SafeStreamSendingAction::SendEvent::Subscriber +SafeStreamSendingAction::send_event() { + return send_event_; } -SafeStreamSendingAction::WriteDataEvent::Subscriber -SafeStreamSendingAction::write_data_event() { - return write_data_event_; +SafeStreamSendingAction::RepeatEvent::Subscriber +SafeStreamSendingAction::repeat_event() { + return repeat_event_; } ActionView SafeStreamSendingAction::SendData( @@ -134,13 +135,14 @@ TimePoint SafeStreamSendingAction::HandleTimeouts(TimePoint current_time) { if ((selected_sch.send_time + wait_timeout) < current_time) { // timeout - AE_TELED_DEBUG("Wait confirm timeout, repeat"); + AE_TELED_DEBUG("Wait confirm timeout {:%S}, repeat offset {}", wait_timeout, + selected_sch.begin_offset); // move offset to repeat send last_sent_offset_ = selected_sch.begin_offset; return current_time; } - return selected_sch.send_time + wait_confirm_timeout_; + return selected_sch.send_time + wait_timeout; } void SafeStreamSendingAction::SendData(TimePoint current_time) { @@ -184,20 +186,7 @@ void SafeStreamSendingAction::SendData(TimePoint current_time) { void SafeStreamSendingAction::SendFirst(DataChunk&& chunk, TimePoint current_time) { AE_TELED_DEBUG("SendFirst chunk offset:{}", chunk.offset); - - auto packet = PacketBuilder{ - protocol_context_, - PackMessage{ - safe_stream_api_, - SafeStreamApi::Send{ - {}, - static_cast(chunk.offset), - std::move(chunk.data), - }, - }, - }; - - WriteDataBuffer(chunk.offset, std::move(packet), current_time); + send_event_.Emit(chunk.offset, std::move(chunk.data), current_time); } void SafeStreamSendingAction::SendRepeat(DataChunk&& chunk, @@ -205,21 +194,8 @@ void SafeStreamSendingAction::SendRepeat(DataChunk&& chunk, TimePoint current_time) { AE_TELED_DEBUG("SendRepeat chunk offset:{} count:{}", chunk.offset, repeat_count); - - auto packet = PacketBuilder{ - protocol_context_, - PackMessage{ - safe_stream_api_, - SafeStreamApi::Repeat{ - {}, - repeat_count, - static_cast(chunk.offset), - std::move(chunk.data), - }, - }, - }; - - WriteDataBuffer(chunk.offset, std::move(packet), current_time); + repeat_event_.Emit(chunk.offset, repeat_count, std::move(chunk.data), + current_time); } void SafeStreamSendingAction::ConfirmDataChunks(SafeStreamRingIndex offset) { @@ -227,12 +203,6 @@ void SafeStreamSendingAction::ConfirmDataChunks(SafeStreamRingIndex offset) { send_data_buffer_.Confirm(offset); } -void SafeStreamSendingAction::WriteDataBuffer(SafeStreamRingIndex offset, - DataBuffer&& packet, - TimePoint current_time) { - write_data_event_.Emit(offset, std::move(packet), current_time); -} - void SafeStreamSendingAction::StopSending(SafeStreamRingIndex offset) { AE_TELED_DEBUG("StopSending offset:{}", offset); sending_chunks_.RemoveUpTo(offset); diff --git a/aether/stream_api/safe_stream/safe_stream_sending.h b/aether/stream_api/safe_stream/safe_stream_sending.h index 47f1198b..af6560e3 100644 --- a/aether/stream_api/safe_stream/safe_stream_sending.h +++ b/aether/stream_api/safe_stream/safe_stream_sending.h @@ -37,18 +37,21 @@ namespace ae { class SafeStreamSendingAction : public Action { public: - using WriteDataEvent = Event; + using SendEvent = Event; + using RepeatEvent = + Event; SafeStreamSendingAction(ActionContext action_context, - ProtocolContext& protocol_context, SafeStreamConfig const& config); ~SafeStreamSendingAction() override; TimePoint Update(TimePoint current_time) override; - WriteDataEvent::Subscriber write_data_event(); + SendEvent::Subscriber send_event(); + RepeatEvent::Subscriber repeat_event(); /** * \brief Put new data to send @@ -73,12 +76,8 @@ class SafeStreamSendingAction : public Action { void ConfirmDataChunks(SafeStreamRingIndex offset); - void WriteDataBuffer(SafeStreamRingIndex offset, DataBuffer&& packet, - TimePoint current_time); - void StopSending(SafeStreamRingIndex offset); - ProtocolContext& protocol_context_; SafeStreamRingIndex::type buffer_capacity_; SafeStreamRingIndex::type window_size_; std::uint16_t max_repeat_count_; @@ -88,7 +87,8 @@ class SafeStreamSendingAction : public Action { SendDataBuffer send_data_buffer_; SendingChunkList sending_chunks_; - WriteDataEvent write_data_event_; + SendEvent send_event_; + RepeatEvent repeat_event_; SafeStreamRingIndex last_confirmed_; SafeStreamRingIndex next_to_add_; From 6458b3efc6b9a2dface20e987c87f1524c1338d7 Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Tue, 25 Mar 2025 09:58:29 +0500 Subject: [PATCH 3/3] add test with duplicated and overlapped offset --- .../test_safe_stream_receiving.cpp | 152 +++++++++------- .../safe-stream/test_safe_stream_sending.cpp | 163 +++++++----------- .../test-stream/safe-stream/to_data_buffer.h | 39 +++++ 3 files changed, 193 insertions(+), 161 deletions(-) create mode 100644 tests/test-stream/safe-stream/to_data_buffer.h diff --git a/tests/test-stream/safe-stream/test_safe_stream_receiving.cpp b/tests/test-stream/safe-stream/test_safe_stream_receiving.cpp index d0e70848..325fc402 100644 --- a/tests/test-stream/safe-stream/test_safe_stream_receiving.cpp +++ b/tests/test-stream/safe-stream/test_safe_stream_receiving.cpp @@ -18,11 +18,11 @@ #include "aether/port/tele_init.h" #include "aether/actions/action_context.h" -#include "aether/api_protocol/protocol_context.h" -#include "aether/stream_api/safe_stream/safe_stream_api.h" -#include "aether/stream_api/safe_stream/safe_stream_receiving.h" #include "aether/stream_api/safe_stream/safe_stream_types.h" +#include "aether/stream_api/safe_stream/safe_stream_receiving.h" + +#include "tests/test-stream/safe-stream/to_data_buffer.h" namespace ae::test_safe_stream_receiving { @@ -50,36 +50,26 @@ void test_SafeStreamReceiveAFewPackets() { auto ap = ActionProcessor{}; auto ac = ActionContext{ap}; - auto pc = ProtocolContext{}; auto received_packet = DataBuffer{}; auto confirmed_offset = std::uint16_t{}; - auto receiving = SafeStreamReceivingAction{ac, pc, config}; - - auto _0 = receiving.send_data_event().Subscribe([&](auto const& data, auto) { - auto api_parser = ae::ApiParser(pc, data); - auto mid = api_parser.Extract(); - switch (mid) { - case SafeStreamApi::Confirm::kMessageCode: { - auto confirm = api_parser.Extract(); - confirmed_offset = confirm.offset; - break; - } - default: - TEST_FAIL_MESSAGE("Unexpected message"); - break; - } - }); + auto receiving = SafeStreamReceivingAction{ac, config}; + + auto confirm_sub = + receiving.confirm_event().Subscribe([&](auto offset, auto) { + confirmed_offset = static_cast(offset); + }); + auto request_repeat_sub = receiving.request_repeat_event().Subscribe( + [&](auto, auto) { TEST_FAIL_MESSAGE("Unexpected request repeat"); }); auto _1 = receiving.receive_event().Subscribe( - [&](DataBuffer&& data) { received_packet = std::move(data); }); + [&](DataBuffer&& data, auto) { received_packet = std::move(data); }); ap.Update(epoch); - receiving.ReceiveSend( - SafeStreamRingIndex{0}, - {_100_bytes_data, _100_bytes_data + sizeof(_100_bytes_data)}); + receiving.ReceiveSend(SafeStreamRingIndex{0}, ToDataBuffer(_100_bytes_data), + Now()); ap.Update(epoch += std::chrono::milliseconds{1}); @@ -88,9 +78,17 @@ void test_SafeStreamReceiveAFewPackets() { received_packet.clear(); // duplicate send - receiving.ReceiveSend( - SafeStreamRingIndex{0}, - {_100_bytes_data, _100_bytes_data + sizeof(_100_bytes_data)}); + receiving.ReceiveSend(SafeStreamRingIndex{0}, ToDataBuffer(_100_bytes_data), + Now()); + + ap.Update(epoch += std::chrono::milliseconds{1}); + + TEST_ASSERT_EQUAL(0, received_packet.size()); + TEST_ASSERT_EQUAL(99, confirmed_offset); + + // duplicate send repeat + receiving.ReceiveRepeat(SafeStreamRingIndex{0}, 1, + ToDataBuffer(_100_bytes_data), Now()); ap.Update(epoch += std::chrono::milliseconds{1}); @@ -98,9 +96,8 @@ void test_SafeStreamReceiveAFewPackets() { TEST_ASSERT_EQUAL(99, confirmed_offset); // send in ordered - receiving.ReceiveSend( - SafeStreamRingIndex(confirmed_offset) + 1 + 100, - {_200_bytes_data + 100, _200_bytes_data + sizeof(_200_bytes_data)}); + receiving.ReceiveSend(SafeStreamRingIndex(confirmed_offset) + 1 + 100, + ToDataBuffer(_100_bytes_data), Now()); ap.Update(epoch += std::chrono::milliseconds{1}); @@ -111,7 +108,7 @@ void test_SafeStreamReceiveAFewPackets() { // add missed part receiving.ReceiveSend(SafeStreamRingIndex(confirmed_offset) + 1, - {_200_bytes_data, _200_bytes_data + 100}); + ToDataBuffer(_100_bytes_data), Now()); ap.Update(epoch += std::chrono::milliseconds{1}); @@ -121,9 +118,8 @@ void test_SafeStreamReceiveAFewPackets() { received_packet.clear(); // add late packet - receiving.ReceiveSend( - SafeStreamRingIndex(confirmed_offset) + 100, - {_200_bytes_data, _200_bytes_data + sizeof(_200_bytes_data)}); + receiving.ReceiveSend(SafeStreamRingIndex(confirmed_offset) + 100, + ToDataBuffer(_200_bytes_data), Now()); ap.Update(epoch += std::chrono::milliseconds{1}); @@ -136,49 +132,37 @@ void test_SafeStreamReceiveRequestRepeat() { auto ap = ActionProcessor{}; auto ac = ActionContext{ap}; - auto pc = ProtocolContext{}; auto received_packet = DataBuffer{}; auto confirmed_offset = std::uint16_t{12}; auto repeat_requested = std::vector{}; - auto receiving = SafeStreamReceivingAction{ac, pc, config}; + auto receiving = SafeStreamReceivingAction{ac, config}; - auto _0 = receiving.send_data_event().Subscribe([&](auto const& data, auto) { - auto api_parser = ae::ApiParser(pc, data); - auto api = SafeStreamApi{}; - api_parser.Parse(api); - }); - - auto _1 = - pc.MessageEvent().Subscribe([&](auto const& msg) { - auto& confirm = msg.message(); - confirmed_offset = confirm.offset; + auto confirm_sub = + receiving.confirm_event().Subscribe([&](auto offset, auto) { + confirmed_offset = static_cast(offset); }); - - auto _2 = pc.MessageEvent().Subscribe( - [&](auto const& msg) { - auto& request_repeat = msg.message(); - repeat_requested.push_back(request_repeat.offset); + auto request_repeat_sub = + receiving.request_repeat_event().Subscribe([&](auto offset, auto) { + repeat_requested.push_back(static_cast(offset)); }); auto _3 = receiving.receive_event().Subscribe( - [&](DataBuffer&& data) { received_packet = std::move(data); }); + [&](DataBuffer&& data, auto) { received_packet = std::move(data); }); ap.Update(epoch); - receiving.ReceiveSend( - SafeStreamRingIndex{200}, - {_100_bytes_data, _100_bytes_data + sizeof(_100_bytes_data)}); + receiving.ReceiveSend(SafeStreamRingIndex{200}, ToDataBuffer(_100_bytes_data), + Now()); ap.Update(epoch += config.send_repeat_timeout); TEST_ASSERT(!repeat_requested.empty()); TEST_ASSERT_EQUAL(0, repeat_requested[0]); repeat_requested.clear(); - receiving.ReceiveSend( - SafeStreamRingIndex{400}, - {_100_bytes_data, _100_bytes_data + sizeof(_100_bytes_data)}); + receiving.ReceiveSend(SafeStreamRingIndex{400}, ToDataBuffer(_100_bytes_data), + Now()); ap.Update(epoch += config.send_repeat_timeout); TEST_ASSERT_EQUAL(2, repeat_requested.size()); @@ -192,6 +176,57 @@ void test_SafeStreamReceiveRequestRepeat() { TEST_ASSERT_EQUAL(0, repeat_requested.size()); } +void test_SafeStreamReceiveDuplications() { + auto epoch = TimePoint::clock::now(); + + auto ap = ActionProcessor{}; + auto ac = ActionContext{ap}; + + auto received_packet = DataBuffer{}; + auto confirmed_offset = std::uint16_t{12}; + + auto receiving = SafeStreamReceivingAction{ac, config}; + + auto confirm_sub = + receiving.confirm_event().Subscribe([&](auto offset, auto) { + confirmed_offset = static_cast(offset); + }); + auto request_repeat_sub = receiving.request_repeat_event().Subscribe( + [&](auto, auto) { TEST_FAIL_MESSAGE("Unexpected repeat"); }); + + auto _3 = receiving.receive_event().Subscribe([&](DataBuffer&& data, auto) { + received_packet.insert(std::end(received_packet), std::begin(data), + std::end(data)); + }); + + ap.Update(epoch); + + receiving.ReceiveSend(SafeStreamRingIndex{0}, ToDataBuffer(_100_bytes_data), + Now()); + + ap.Update(epoch += std::chrono::milliseconds{1}); + + TEST_ASSERT_EQUAL(100, received_packet.size()); + TEST_ASSERT_EQUAL(99, confirmed_offset); + + // duplication + receiving.ReceiveSend(SafeStreamRingIndex{0}, ToDataBuffer(_100_bytes_data), + Now()); + + ap.Update(epoch += std::chrono::milliseconds{1}); + // nothing changed + TEST_ASSERT_EQUAL(100, received_packet.size()); + TEST_ASSERT_EQUAL(99, confirmed_offset); + + // duplication offset, but more data (overlap) + receiving.ReceiveSend(SafeStreamRingIndex{0}, ToDataBuffer(_200_bytes_data), + Now()); + + ap.Update(epoch += config.send_repeat_timeout); + // added more data + TEST_ASSERT_EQUAL(200, received_packet.size()); + TEST_ASSERT_EQUAL(199, confirmed_offset); +} } // namespace ae::test_safe_stream_receiving int test_safe_stream_receiving() { @@ -199,5 +234,6 @@ int test_safe_stream_receiving() { UNITY_BEGIN(); RUN_TEST(ae::test_safe_stream_receiving::test_SafeStreamReceiveAFewPackets); RUN_TEST(ae::test_safe_stream_receiving::test_SafeStreamReceiveRequestRepeat); + RUN_TEST(ae::test_safe_stream_receiving::test_SafeStreamReceiveDuplications); return UNITY_END(); } diff --git a/tests/test-stream/safe-stream/test_safe_stream_sending.cpp b/tests/test-stream/safe-stream/test_safe_stream_sending.cpp index 080b6a73..44c84312 100644 --- a/tests/test-stream/safe-stream/test_safe_stream_sending.cpp +++ b/tests/test-stream/safe-stream/test_safe_stream_sending.cpp @@ -32,7 +32,8 @@ #include "aether/stream_api/safe_stream/safe_stream_sending.h" #include "aether/stream_api/safe_stream/safe_stream_types.h" -#include "aether/stream_api/safe_stream/safe_stream_api.h" + +#include "tests/test-stream/safe-stream/to_data_buffer.h" namespace ae::test_safe_stream_sending { @@ -65,17 +66,6 @@ constexpr char _1183_bytes_data[] = "woven into the fabric of the universe, eternal and unbroken, calling to " "those who dare to dream beyond the horizon of the possible."; -template -static auto ToVector(T const (&arr)[size]) { - return std::vector(reinterpret_cast(arr), - reinterpret_cast(arr + size)); -} - -template -static auto ToDataBuffer(T const (&arr)[size]) { - return ToVector(arr); -} - constexpr auto config = SafeStreamConfig{ 20 * 1024, (10 * 1024) - 1, @@ -91,24 +81,24 @@ void test_SafeStreamSendingFewChunks() { auto ap = ActionProcessor{}; auto ac = ActionContext(ap); - auto pc = ProtocolContext{}; - auto received_packet = DataBuffer{}; auto received_data = DataBuffer{}; auto received_offset = std::uint16_t{}; bool received_100 = false; bool received_200 = false; - auto sending = SafeStreamSendingAction(ac, pc, config); + auto sending = SafeStreamSendingAction(ac, config); sending.set_max_data_size(100); - auto _0 = sending.write_data_event().Subscribe([&](auto, auto data, auto) { - received_packet = std::move(data); - auto api_parser = ae::ApiParser(pc, received_packet); - auto mid = api_parser.Extract(); - auto send = api_parser.Extract(); - received_data.insert(std::end(received_data), std::begin(send.data), - std::end(send.data)); - received_offset = send.offset; - }); + auto send_sub = + sending.send_event().Subscribe([&](auto offset, auto&& data, auto) { + received_data.insert(std::end(received_data), std::begin(data), + std::end(data)); + received_offset = static_cast(offset); + }); + + auto repeat_sub = + sending.repeat_event().Subscribe([&](auto, auto, auto&&, auto) { + TEST_FAIL_MESSAGE("Unexpected repeat"); + }); auto send_100 = sending.SendData(ToDataBuffer(_100_bytes_data)); ap.Update(epoch + std::chrono::milliseconds{1}); @@ -149,23 +139,23 @@ void test_SafeStreamSendingWaitConfirm() { auto ap = ActionProcessor{}; auto ac = ActionContext(ap); - auto pc = ProtocolContext{}; - auto received_packet = DataBuffer{}; auto received_data = DataBuffer{}; auto received_offset = std::uint16_t{}; - auto sending = SafeStreamSendingAction{ac, pc, config}; + auto sending = SafeStreamSendingAction{ac, config}; sending.set_max_data_size(100); - auto _0 = sending.write_data_event().Subscribe([&](auto, auto data, auto) { - received_packet = std::move(data); - auto api_parser = ae::ApiParser(pc, received_packet); - auto mid = api_parser.Extract(); - auto send = api_parser.Extract(); - received_data.insert(std::end(received_data), std::begin(send.data), - std::end(send.data)); - received_offset = send.offset; - }); + auto send_sub = + sending.send_event().Subscribe([&](auto offset, auto&& data, auto) { + received_data.insert(std::end(received_data), std::begin(data), + std::end(data)); + received_offset = static_cast(offset); + }); + + auto repeat_sub = + sending.repeat_event().Subscribe([&](auto, auto, auto&&, auto) { + TEST_FAIL_MESSAGE("Unexpected repeat"); + }); sending.SendData(ToDataBuffer(_100_bytes_data)); sending.SendData(ToDataBuffer(_200_bytes_data)); @@ -190,40 +180,27 @@ void test_SafeStreamSendingRepeat() { auto ap = ActionProcessor{}; auto ac = ActionContext(ap); - auto pc = ProtocolContext{}; - auto received_packet = DataBuffer{}; auto received_data = DataBuffer{}; auto received_offset = std::uint16_t{}; auto sending_error = bool{}; - auto sending = SafeStreamSendingAction{ac, pc, config}; + auto sending = SafeStreamSendingAction{ac, config}; sending.set_max_data_size(100); - auto _0 = sending.write_data_event().Subscribe([&](auto, auto data, auto) { - received_packet = std::move(data); - auto api_parser = ae::ApiParser(pc, received_packet); - auto mid = api_parser.Extract(); - switch (mid) { - case SafeStreamApi::Send::kMessageCode: { - auto send = api_parser.Extract(); - received_data.insert(std::end(received_data), std::begin(send.data), - std::end(send.data)); - received_offset = send.offset; - break; - } - case SafeStreamApi::Repeat::kMessageCode: { - auto repeat = api_parser.Extract(); - received_data.insert(std::end(received_data), std::begin(repeat.data), - std::end(repeat.data)); - received_offset = repeat.offset; - break; - } - default: - TEST_ASSERT(false); - break; - } - }); + auto send_sub = + sending.send_event().Subscribe([&](auto offset, auto&& data, auto) { + received_data.insert(std::end(received_data), std::begin(data), + std::end(data)); + received_offset = static_cast(offset); + }); + + auto repeat_sub = sending.repeat_event().Subscribe( + [&](auto offset, auto /* repeat_count*/, auto&& data, auto) { + received_data.insert(std::end(received_data), std::begin(data), + std::end(data)); + received_offset = static_cast(offset); + }); auto send_action = sending.SendData(ToDataBuffer(_100_bytes_data)); @@ -259,39 +236,26 @@ void test_SafeStreamSendingRepeatRequest() { auto ap = ActionProcessor{}; auto ac = ActionContext(ap); - auto pc = ProtocolContext{}; - auto received_packet = DataBuffer{}; auto received_data = DataBuffer{}; auto received_offset = std::uint16_t{}; auto sending_error = bool{}; - auto sending = SafeStreamSendingAction{ac, pc, config}; + auto sending = SafeStreamSendingAction{ac, config}; sending.set_max_data_size(100); - auto _ = sending.write_data_event().Subscribe([&](auto, auto data, auto) { - received_packet = std::move(data); - auto api_parser = ae::ApiParser(pc, received_packet); - auto mid = api_parser.Extract(); - switch (mid) { - case SafeStreamApi::Send::kMessageCode: { - auto send = api_parser.Extract(); - received_data.insert(std::end(received_data), std::begin(send.data), - std::end(send.data)); - received_offset = send.offset; - break; - } - case SafeStreamApi::Repeat::kMessageCode: { - auto repeat = api_parser.Extract(); - received_data.insert(std::end(received_data), std::begin(repeat.data), - std::end(repeat.data)); - received_offset = repeat.offset; - break; - } - default: - TEST_ASSERT(false); - break; - } - }); + auto send_sub = + sending.send_event().Subscribe([&](auto offset, auto&& data, auto) { + received_data.insert(std::end(received_data), std::begin(data), + std::end(data)); + received_offset = static_cast(offset); + }); + + auto repeat_sub = sending.repeat_event().Subscribe( + [&](auto offset, auto /* repeat_count*/, auto&& data, auto) { + received_data.insert(std::end(received_data), std::begin(data), + std::end(data)); + received_offset = static_cast(offset); + }); auto send_action = sending.SendData(ToDataBuffer(_100_bytes_data)); auto _1 = @@ -322,25 +286,18 @@ void test_SafeStreamSendingOverBufferCapacity() { auto ap = ActionProcessor{}; auto ac = ActionContext(ap); - auto pc = ProtocolContext{}; - auto received_packet = DataBuffer{}; auto received_data = DataBuffer{}; auto received_offset = SafeStreamRingIndex{}; auto sending_error = bool{}; - auto sending = SafeStreamSendingAction{ac, pc, config}; + auto sending = SafeStreamSendingAction{ac, config}; sending.set_max_data_size(500); - auto _ = - sending.write_data_event().Subscribe([&](auto offset, auto data, auto) { - auto api_parser = ae::ApiParser(pc, data); - auto mid = api_parser.Extract(); - TEST_ASSERT_EQUAL(SafeStreamApi::Send::kMessageCode, mid); - - auto send = api_parser.Extract(); - received_data.insert(std::end(received_data), std::begin(send.data), - std::end(send.data)); - received_offset = SafeStreamRingIndex{send.offset}; + auto send_sub = + sending.send_event().Subscribe([&](auto offset, auto&& data, auto) { + received_data.insert(std::end(received_data), std::begin(data), + std::end(data)); + received_offset = offset; }); for (auto i = 0; i < 15; i++) { diff --git a/tests/test-stream/safe-stream/to_data_buffer.h b/tests/test-stream/safe-stream/to_data_buffer.h new file mode 100644 index 00000000..ab1a7d49 --- /dev/null +++ b/tests/test-stream/safe-stream/to_data_buffer.h @@ -0,0 +1,39 @@ +/* + * Copyright 2025 Aethernet Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef TESTS_TEST_STREAM_SAFE_STREAM_TO_DATA_BUFFER_H_ +#define TESTS_TEST_STREAM_SAFE_STREAM_TO_DATA_BUFFER_H_ + +#include +#include +#include + +namespace ae { + +template +static auto ToVector(T const (&arr)[size]) { + return std::vector(reinterpret_cast(arr), + reinterpret_cast(arr + size)); +} + +template +static auto ToDataBuffer(T const (&arr)[size]) { + return ToVector(arr); +} + +} // namespace ae + +#endif // TESTS_TEST_STREAM_SAFE_STREAM_TO_DATA_BUFFER_H_