Skip to content

Commit 9627b84

Browse files
Merge pull request #109 from aethernetio/108-safestream-handle-duplicated-packets-with-overlapped-offset
108 safestream handle duplicated packets with overlapped offset
2 parents 01f3b67 + 6458b3e commit 9627b84

10 files changed

Lines changed: 434 additions & 380 deletions

File tree

aether/stream_api/safe_stream.cpp

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include <cstddef>
2020
#include <utility>
2121

22+
#include "aether/api_protocol/packet_builder.h"
23+
#include "aether/stream_api/safe_stream/safe_stream_api.h"
2224
#include "aether/stream_api/safe_stream/sending_data_action.h"
2325

2426
namespace ae {
@@ -70,7 +72,8 @@ ActionView<StreamWriteAction> SafeStream::SafeStreamInGate::Write(
7072
return action;
7173
}
7274

73-
void SafeStream::SafeStreamInGate::WriteOut(DataBuffer&& buffer) {
75+
void SafeStream::SafeStreamInGate::WriteOut(DataBuffer&& buffer,
76+
TimePoint /* current_time */) {
7477
out_data_event_.Emit(buffer);
7578
}
7679

@@ -122,18 +125,24 @@ void SafeStream::SafeStreamOutGate::LinkOut(OutGate& gate) {
122125

123126
SafeStream::SafeStream(ActionContext action_context, SafeStreamConfig config)
124127
: action_context_{action_context},
125-
safe_stream_sending_{action_context_, protocol_context_, config},
126-
safe_stream_receiving_{action_context_, protocol_context_, config},
128+
safe_stream_sending_{action_context_, config},
129+
safe_stream_receiving_{action_context_, config},
127130
in_{action_context_, safe_stream_sending_, config.max_data_size},
128131
out_{protocol_context_} {
129-
subscriptions_.Push(safe_stream_sending_.write_data_event().Subscribe(
130-
*this, MethodPtr<&SafeStream::OnDataWrite>{}),
132+
subscriptions_.Push(safe_stream_sending_.send_event().Subscribe(
133+
*this, MethodPtr<&SafeStream::OnSendEvent>{}),
134+
safe_stream_sending_.repeat_event().Subscribe(
135+
*this, MethodPtr<&SafeStream::OnRepeatEvent>{}),
131136
safe_stream_receiving_.receive_event().Subscribe(
132137
in_, MethodPtr<&SafeStreamInGate::WriteOut>{}));
133138

134139
subscriptions_.Push(
135-
safe_stream_receiving_.send_data_event().Subscribe(
136-
*this, MethodPtr<&SafeStream::OnDataReaderSend>{}),
140+
safe_stream_receiving_.confirm_event().Subscribe(
141+
*this, MethodPtr<&SafeStream::OnConfirmEvent>{}),
142+
safe_stream_receiving_.request_repeat_event().Subscribe(
143+
*this, MethodPtr<&SafeStream::OnRequestRepeatEvent>{}));
144+
145+
subscriptions_.Push(
137146
protocol_context_.MessageEvent<SafeStreamApi::Confirm>().Subscribe(
138147
*this, MethodPtr<&SafeStream::Confirm>{}),
139148
protocol_context_.MessageEvent<SafeStreamApi::RequestRepeat>().Subscribe(
@@ -150,9 +159,16 @@ ByteGate::Base& SafeStream::in() { return in_; }
150159

151160
void SafeStream::LinkOut(OutGate& gate) { out_.LinkOut(gate); }
152161

153-
void SafeStream::OnDataWrite(SafeStreamRingIndex offset, DataBuffer&& data,
162+
void SafeStream::OnSendEvent(SafeStreamRingIndex offset, DataBuffer&& data,
154163
TimePoint current_time) {
155-
auto write_action = out_.Write(std::move(data), current_time);
164+
auto packet = PacketBuilder{
165+
protocol_context_,
166+
PackMessage{
167+
SafeStreamApi{},
168+
SafeStreamApi::Send{
169+
{}, static_cast<std::uint16_t>(offset), std::move(data)}}};
170+
171+
auto write_action = out_.Write(std::move(packet), current_time);
156172

157173
subscriptions_.Push(
158174
write_action->SubscribeOnResult([this, offset](auto const& /* action */) {
@@ -168,8 +184,51 @@ void SafeStream::OnDataWrite(SafeStreamRingIndex offset, DataBuffer&& data,
168184
}));
169185
}
170186

171-
void SafeStream::OnDataReaderSend(DataBuffer&& data, TimePoint current_time) {
172-
out_.Write(std::move(data), current_time);
187+
void SafeStream::OnRepeatEvent(SafeStreamRingIndex offset,
188+
std::uint16_t repeat_count, DataBuffer&& data,
189+
TimePoint current_time) {
190+
auto packet = PacketBuilder{
191+
protocol_context_,
192+
PackMessage{SafeStreamApi{},
193+
SafeStreamApi::Repeat{{},
194+
repeat_count,
195+
static_cast<std::uint16_t>(offset),
196+
std::move(data)}}};
197+
198+
auto write_action = out_.Write(std::move(packet), current_time);
199+
200+
subscriptions_.Push(
201+
write_action->SubscribeOnResult([this, offset](auto const& /* action */) {
202+
safe_stream_sending_.ReportWriteSuccess(offset);
203+
}));
204+
subscriptions_.Push(
205+
write_action->SubscribeOnError([this, offset](auto const& /* action */) {
206+
safe_stream_sending_.ReportWriteError(offset);
207+
}));
208+
subscriptions_.Push(
209+
write_action->SubscribeOnStop([this, offset](auto const& /* action */) {
210+
safe_stream_sending_.ReportWriteStopped(offset);
211+
}));
212+
}
213+
214+
void SafeStream::OnConfirmEvent(SafeStreamRingIndex offset,
215+
TimePoint current_time) {
216+
auto packet = PacketBuilder{
217+
protocol_context_,
218+
PackMessage{
219+
SafeStreamApi{},
220+
SafeStreamApi::Confirm{{}, static_cast<std::uint16_t>(offset)}}};
221+
out_.Write(std::move(packet), current_time);
222+
}
223+
224+
void SafeStream::OnRequestRepeatEvent(SafeStreamRingIndex offset,
225+
TimePoint current_time) {
226+
auto packet =
227+
PacketBuilder{protocol_context_,
228+
PackMessage{SafeStreamApi{},
229+
SafeStreamApi::RequestRepeat{
230+
{}, static_cast<std::uint16_t>(offset)}}};
231+
out_.Write(std::move(packet), current_time);
173232
}
174233

175234
void SafeStream::Confirm(MessageEventData<SafeStreamApi::Confirm> const& msg) {
@@ -185,14 +244,14 @@ void SafeStream::RequestRepeatSend(
185244
void SafeStream::ReceiveSend(MessageEventData<SafeStreamApi::Send> const& msg) {
186245
safe_stream_receiving_.ReceiveSend(
187246
SafeStreamRingIndex{msg.message().offset},
188-
std::move(const_cast<SafeStreamApi::Send&>(msg.message()).data));
247+
std::move(const_cast<SafeStreamApi::Send&>(msg.message()).data), Now());
189248
}
190249

191250
void SafeStream::ReceiveRepeat(
192251
MessageEventData<SafeStreamApi::Repeat> const& msg) {
193252
safe_stream_receiving_.ReceiveRepeat(
194253
SafeStreamRingIndex{msg.message().offset}, msg.message().repeat_count,
195-
std::move(const_cast<SafeStreamApi::Repeat&>(msg.message()).data));
254+
std::move(const_cast<SafeStreamApi::Repeat&>(msg.message()).data), Now());
196255
}
197256

198257
} // namespace ae

aether/stream_api/safe_stream.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class SafeStream final : public ByteStream {
5858
ActionView<StreamWriteAction> Write(DataBuffer &&buffer,
5959
TimePoint current_time) override;
6060

61-
void WriteOut(DataBuffer &&buffer);
61+
void WriteOut(DataBuffer &&buffer, TimePoint current_time);
6262

6363
void LinkOut(OutGate &gate) override;
6464

@@ -92,10 +92,13 @@ class SafeStream final : public ByteStream {
9292
void LinkOut(OutGate &gate) override;
9393

9494
private:
95-
void OnDataWrite(SafeStreamRingIndex offset, DataBuffer &&data,
95+
void OnSendEvent(SafeStreamRingIndex offset, DataBuffer &&data,
9696
TimePoint current_time);
97+
void OnRepeatEvent(SafeStreamRingIndex offset, std::uint16_t repeat_count,
98+
DataBuffer &&data, TimePoint current_time);
9799

98-
void OnDataReaderSend(DataBuffer &&data, TimePoint current_time);
100+
void OnConfirmEvent(SafeStreamRingIndex offset, TimePoint current_time);
101+
void OnRequestRepeatEvent(SafeStreamRingIndex offset, TimePoint current_time);
99102

100103
void Confirm(MessageEventData<SafeStreamApi::Confirm> const &msg);
101104
void RequestRepeatSend(

0 commit comments

Comments
 (0)