From 56975b2266fa0537b774a0c3ea4ad062c4b60543 Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Tue, 25 Mar 2025 16:32:53 +0500 Subject: [PATCH] make ping receive result and send duration to next ping --- aether/ae_actions/ping.cpp | 43 +++++++++++++++---- aether/ae_actions/ping.h | 5 ++- .../methods/work_server_api/authorized_api.h | 6 ++- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/aether/ae_actions/ping.cpp b/aether/ae_actions/ping.cpp index 66cbb0e0..84d6e8c7 100644 --- a/aether/ae_actions/ping.cpp +++ b/aether/ae_actions/ping.cpp @@ -33,9 +33,15 @@ Ping::Ping(ActionContext action_context, Server::ptr const& server, server_stream_{&server_stream}, ping_interval_{ping_interval}, read_client_safe_api_gate_{protocol_context_, client_safe_api_}, - state_{State::kSendPing}, - state_changed_subscription_{state_.changed_event().Subscribe( - [this](auto) { Action::Trigger(); })} { + state_{State::kWaitLink}, + state_changed_sub_{state_.changed_event().Subscribe( + [this](auto) { Action::Trigger(); })}, + stream_changed_sub_{ + server_stream_->in().gate_update_event().Subscribe([this]() { + if (read_client_safe_api_gate_.stream_info().is_linked) { + state_ = State::kSendPing; + } + })} { AE_TELE_INFO(kPing, "Ping action created, interval {:%S}", ping_interval); Tie(read_client_safe_api_gate_, *server_stream_); } @@ -45,6 +51,8 @@ Ping::~Ping() = default; TimePoint Ping::Update(TimePoint current_time) { if (state_.changed()) { switch (state_.Acquire()) { + case State::kWaitLink: + break; case State::kSendPing: SendPing(current_time); break; @@ -72,22 +80,39 @@ TimePoint Ping::Update(TimePoint current_time) { void Ping::SendPing(TimePoint current_time) { AE_TELE_DEBUG(kPingSend, "Send ping"); last_ping_time_ = current_time; + auto request_id = RequestId::GenRequestId(); auto packet = PacketBuilder{ - protocol_context_, PackMessage{AuthorizedApi{}, AuthorizedApi::Ping{}}}; + protocol_context_, + PackMessage{AuthorizedApi{}, + AuthorizedApi::Ping{ + {}, + request_id, + static_cast( + std::chrono::duration_cast( + ping_interval_) + .count())}}}; auto write_action = server_stream_->in().Write(std::move(packet), current_time); + write_subscription_ = write_action->SubscribeOnError([this](auto const&) { AE_TELE_ERROR(kPingWriteError, "Ping write error"); state_ = State::kError; }); - // TODO: add wait response - // state_ = State::kWaitResponse; - state_ = State::kWaitInterval; + + // Wait for response + SendResult::OnResponse(protocol_context_, request_id, + [&](ApiParser&) { PingResponse(); }); + + state_ = State::kWaitResponse; } void Ping::PingResponse() { - // TODO: handle ping response - // calculate server and channel statistics + auto current_time = Now(); + auto ping_duration = + std::chrono::duration_cast(current_time - last_ping_time_); + + AE_TELED_DEBUG("Ping received by {:%S} s", ping_duration); + state_ = State::kWaitInterval; } } // namespace ae diff --git a/aether/ae_actions/ping.h b/aether/ae_actions/ping.h index cc55e874..0e4c29ee 100644 --- a/aether/ae_actions/ping.h +++ b/aether/ae_actions/ping.h @@ -34,6 +34,7 @@ namespace ae { class Ping : public Action { enum class State : std::uint8_t { + kWaitLink, kSendPing, kWaitResponse, kWaitInterval, @@ -65,9 +66,9 @@ class Ping : public Action { TimePoint last_ping_time_; Subscription write_subscription_; - Subscription ping_response_subscription_; StateMachine state_; - Subscription state_changed_subscription_; + Subscription state_changed_sub_; + Subscription stream_changed_sub_; }; } // namespace ae diff --git a/aether/methods/work_server_api/authorized_api.h b/aether/methods/work_server_api/authorized_api.h index 067355e3..fd82b3e3 100644 --- a/aether/methods/work_server_api/authorized_api.h +++ b/aether/methods/work_server_api/authorized_api.h @@ -33,7 +33,11 @@ class AuthorizedApi : public ApiClass { // Just ping the server to finalize authorization and stream struct Ping : public Message { static constexpr auto kMessageCode = 6; - AE_REFLECT() + + AE_REFLECT_MEMBERS(request_id, next_ping_duration) + + RequestId request_id; + std::uint64_t next_ping_duration; }; struct OpenStreamToClient : public Message {