From 4a064cc1f8937a1d87d8308d37bcb04e303a7537 Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Wed, 2 Apr 2025 18:10:09 +0500 Subject: [PATCH 1/2] add missed timer action stop implementation --- aether/actions/timer_action.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aether/actions/timer_action.cpp b/aether/actions/timer_action.cpp index 37107419..983a8960 100644 --- a/aether/actions/timer_action.cpp +++ b/aether/actions/timer_action.cpp @@ -63,6 +63,6 @@ TimePoint TimerAction::Update(TimePoint current_time) { return current_time; } -void Stop(); +void TimerAction::Stop() { state_ = State::kStopped; } } // namespace ae From 691670bf0da9a24bc3470e2e948778e8c9d0024b Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Wed, 2 Apr 2025 18:11:01 +0500 Subject: [PATCH 2/2] add calculation of connection time --- .../server/server_channel_stream.cpp | 33 ++++++++++++++++--- .../transport/server/server_channel_stream.h | 12 +++++-- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/aether/transport/server/server_channel_stream.cpp b/aether/transport/server/server_channel_stream.cpp index 7739b6b2..8cd21b5e 100644 --- a/aether/transport/server/server_channel_stream.cpp +++ b/aether/transport/server/server_channel_stream.cpp @@ -48,11 +48,11 @@ static constexpr auto kBufferGateCapacity = std::size_t{100}; ServerChannelStream::ServerChannelStream(ObjPtr const& aether, Adapter::ptr const& adapter, - Server::ptr server, - Channel::ptr channel) + Server::ptr const& server, + Channel::ptr const& channel) : action_context_{*aether->action_processor}, - server_{std::move(server)}, - channel_{std::move(channel)}, + server_{server}, + channel_{channel}, buffer_gate_{action_context_, kBufferGateCapacity}, connection_action_{std::visit( [&](auto const& address) { @@ -60,16 +60,37 @@ ServerChannelStream::ServerChannelStream(ObjPtr const& aether, ActionContext{*aether->action_processor}, aether, adapter, address); }, - channel_->address)} { + channel->address)}, + connection_start_time_{Now()}, + connection_timer_{std::in_place, ActionContext{*aether->action_processor}, + channel->expected_connection_time()} { connection_success_ = connection_action_->SubscribeOnResult( [this](auto& action) { OnConnected(action); }), connection_failed_ = connection_action_->SubscribeOnError( [this](auto const&) { OnConnectedFailed(); }), connection_finished_ = connection_action_->FinishedEvent().Subscribe( [this]() { connection_action_.reset(); }); + + connection_timeout_ = connection_timer_->SubscribeOnResult( + [this](auto const&) { OnConnectedFailed(); }); + connection_timer_finished_ = connection_timer_->FinishedEvent().Subscribe( + [this]() { connection_timer_.reset(); }); } void ServerChannelStream::OnConnected(ChannelConnectionAction& connection) { + auto channel_ptr = channel_.Lock(); + assert(channel_ptr); + + auto connection_time = + std::chrono::duration_cast(Now() - connection_start_time_); + channel_ptr->AddConnectionTime(std::move(connection_time)); + + if (!connection_timer_) { + // probably timeout + return; + } + connection_timer_->Stop(); + transport_ = connection.transport(); connection_error_ = transport_->ConnectionError() @@ -82,6 +103,8 @@ void ServerChannelStream::OnConnected(ChannelConnectionAction& connection) { void ServerChannelStream::OnConnectedFailed() { AE_TELED_ERROR("ServerChannelStream:OnConnectedFailed"); + connection_timeout_.Reset(); + connection_failed_.Reset(); buffer_gate_.Unlink(); } diff --git a/aether/transport/server/server_channel_stream.h b/aether/transport/server/server_channel_stream.h index 307aa99f..2b177d3e 100644 --- a/aether/transport/server/server_channel_stream.h +++ b/aether/transport/server/server_channel_stream.h @@ -25,6 +25,7 @@ #include "aether/ptr/ptr.h" #include "aether/obj/obj_ptr.h" +#include "aether/actions/timer_action.h" #include "aether/actions/action_context.h" #include "aether/events/multi_subscription.h" @@ -43,7 +44,7 @@ class ChannelConnectionAction; class ServerChannelStream final : public ByteStream { public: ServerChannelStream(ObjPtr const& aether, Adapter::ptr const& adapter, - Server::ptr server, Channel::ptr channel); + Server::ptr const& server, Channel::ptr const& channel); AE_CLASS_NO_COPY_MOVE(ServerChannelStream) @@ -56,18 +57,23 @@ class ServerChannelStream final : public ByteStream { void OnConnectedFailed(); ActionContext action_context_; - Server::ptr server_; - Channel::ptr channel_; + PtrView server_; + PtrView channel_; BufferGate buffer_gate_; std::unique_ptr transport_; std::optional transport_write_gate_; std::unique_ptr connection_action_; + TimePoint connection_start_time_; + std::optional connection_timer_; + Subscription connection_success_; Subscription connection_failed_; Subscription connection_finished_; Subscription connection_error_; + Subscription connection_timeout_; + Subscription connection_timer_finished_; }; } // namespace ae