Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 72 additions & 13 deletions aether/stream_api/safe_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <cstddef>
#include <utility>

#include "aether/api_protocol/packet_builder.h"
#include "aether/stream_api/safe_stream/safe_stream_api.h"
#include "aether/stream_api/safe_stream/sending_data_action.h"

namespace ae {
Expand Down Expand Up @@ -70,7 +72,8 @@ ActionView<StreamWriteAction> SafeStream::SafeStreamInGate::Write(
return action;
}

void SafeStream::SafeStreamInGate::WriteOut(DataBuffer&& buffer) {
void SafeStream::SafeStreamInGate::WriteOut(DataBuffer&& buffer,
TimePoint /* current_time */) {
out_data_event_.Emit(buffer);
}

Expand Down Expand Up @@ -122,18 +125,24 @@ void SafeStream::SafeStreamOutGate::LinkOut(OutGate& gate) {

SafeStream::SafeStream(ActionContext action_context, SafeStreamConfig config)
: action_context_{action_context},
safe_stream_sending_{action_context_, protocol_context_, config},
safe_stream_receiving_{action_context_, protocol_context_, config},
safe_stream_sending_{action_context_, config},
safe_stream_receiving_{action_context_, config},
in_{action_context_, safe_stream_sending_, config.max_data_size},
out_{protocol_context_} {
subscriptions_.Push(safe_stream_sending_.write_data_event().Subscribe(
*this, MethodPtr<&SafeStream::OnDataWrite>{}),
subscriptions_.Push(safe_stream_sending_.send_event().Subscribe(
*this, MethodPtr<&SafeStream::OnSendEvent>{}),
safe_stream_sending_.repeat_event().Subscribe(
*this, MethodPtr<&SafeStream::OnRepeatEvent>{}),
safe_stream_receiving_.receive_event().Subscribe(
in_, MethodPtr<&SafeStreamInGate::WriteOut>{}));

subscriptions_.Push(
safe_stream_receiving_.send_data_event().Subscribe(
*this, MethodPtr<&SafeStream::OnDataReaderSend>{}),
safe_stream_receiving_.confirm_event().Subscribe(
*this, MethodPtr<&SafeStream::OnConfirmEvent>{}),
safe_stream_receiving_.request_repeat_event().Subscribe(
*this, MethodPtr<&SafeStream::OnRequestRepeatEvent>{}));

subscriptions_.Push(
protocol_context_.MessageEvent<SafeStreamApi::Confirm>().Subscribe(
*this, MethodPtr<&SafeStream::Confirm>{}),
protocol_context_.MessageEvent<SafeStreamApi::RequestRepeat>().Subscribe(
Expand All @@ -150,9 +159,16 @@ ByteGate::Base& SafeStream::in() { return in_; }

void SafeStream::LinkOut(OutGate& gate) { out_.LinkOut(gate); }

void SafeStream::OnDataWrite(SafeStreamRingIndex offset, DataBuffer&& data,
void SafeStream::OnSendEvent(SafeStreamRingIndex offset, DataBuffer&& data,
TimePoint current_time) {
auto write_action = out_.Write(std::move(data), current_time);
auto packet = PacketBuilder{
protocol_context_,
PackMessage{
SafeStreamApi{},
SafeStreamApi::Send{
{}, static_cast<std::uint16_t>(offset), std::move(data)}}};

auto write_action = out_.Write(std::move(packet), current_time);

subscriptions_.Push(
write_action->SubscribeOnResult([this, offset](auto const& /* action */) {
Expand All @@ -168,8 +184,51 @@ void SafeStream::OnDataWrite(SafeStreamRingIndex offset, DataBuffer&& data,
}));
}

void SafeStream::OnDataReaderSend(DataBuffer&& data, TimePoint current_time) {
out_.Write(std::move(data), current_time);
void SafeStream::OnRepeatEvent(SafeStreamRingIndex offset,
std::uint16_t repeat_count, DataBuffer&& data,
TimePoint current_time) {
auto packet = PacketBuilder{
protocol_context_,
PackMessage{SafeStreamApi{},
SafeStreamApi::Repeat{{},
repeat_count,
static_cast<std::uint16_t>(offset),
std::move(data)}}};

auto write_action = out_.Write(std::move(packet), current_time);

subscriptions_.Push(
write_action->SubscribeOnResult([this, offset](auto const& /* action */) {
safe_stream_sending_.ReportWriteSuccess(offset);
}));
subscriptions_.Push(
write_action->SubscribeOnError([this, offset](auto const& /* action */) {
safe_stream_sending_.ReportWriteError(offset);
}));
subscriptions_.Push(
write_action->SubscribeOnStop([this, offset](auto const& /* action */) {
safe_stream_sending_.ReportWriteStopped(offset);
}));
}

void SafeStream::OnConfirmEvent(SafeStreamRingIndex offset,
TimePoint current_time) {
auto packet = PacketBuilder{
protocol_context_,
PackMessage{
SafeStreamApi{},
SafeStreamApi::Confirm{{}, static_cast<std::uint16_t>(offset)}}};
out_.Write(std::move(packet), current_time);
}

void SafeStream::OnRequestRepeatEvent(SafeStreamRingIndex offset,
TimePoint current_time) {
auto packet =
PacketBuilder{protocol_context_,
PackMessage{SafeStreamApi{},
SafeStreamApi::RequestRepeat{
{}, static_cast<std::uint16_t>(offset)}}};
out_.Write(std::move(packet), current_time);
}

void SafeStream::Confirm(MessageEventData<SafeStreamApi::Confirm> const& msg) {
Expand All @@ -185,14 +244,14 @@ void SafeStream::RequestRepeatSend(
void SafeStream::ReceiveSend(MessageEventData<SafeStreamApi::Send> const& msg) {
safe_stream_receiving_.ReceiveSend(
SafeStreamRingIndex{msg.message().offset},
std::move(const_cast<SafeStreamApi::Send&>(msg.message()).data));
std::move(const_cast<SafeStreamApi::Send&>(msg.message()).data), Now());
}

void SafeStream::ReceiveRepeat(
MessageEventData<SafeStreamApi::Repeat> const& msg) {
safe_stream_receiving_.ReceiveRepeat(
SafeStreamRingIndex{msg.message().offset}, msg.message().repeat_count,
std::move(const_cast<SafeStreamApi::Repeat&>(msg.message()).data));
std::move(const_cast<SafeStreamApi::Repeat&>(msg.message()).data), Now());
}

} // namespace ae
9 changes: 6 additions & 3 deletions aether/stream_api/safe_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class SafeStream final : public ByteStream {
ActionView<StreamWriteAction> Write(DataBuffer &&buffer,
TimePoint current_time) override;

void WriteOut(DataBuffer &&buffer);
void WriteOut(DataBuffer &&buffer, TimePoint current_time);

void LinkOut(OutGate &gate) override;

Expand Down Expand Up @@ -92,10 +92,13 @@ class SafeStream final : public ByteStream {
void LinkOut(OutGate &gate) override;

private:
void OnDataWrite(SafeStreamRingIndex offset, DataBuffer &&data,
void OnSendEvent(SafeStreamRingIndex offset, DataBuffer &&data,
TimePoint current_time);
void OnRepeatEvent(SafeStreamRingIndex offset, std::uint16_t repeat_count,
DataBuffer &&data, TimePoint current_time);

void OnDataReaderSend(DataBuffer &&data, TimePoint current_time);
void OnConfirmEvent(SafeStreamRingIndex offset, TimePoint current_time);
void OnRequestRepeatEvent(SafeStreamRingIndex offset, TimePoint current_time);

void Confirm(MessageEventData<SafeStreamApi::Confirm> const &msg);
void RequestRepeatSend(
Expand Down
Loading
Loading