1717#include " aether/ae_actions/ping.h"
1818
1919#include < utility>
20+ #include < optional>
2021
2122#include " aether/api_protocol/packet_builder.h"
2223#include " aether/methods/work_server_api/authorized_api.h"
@@ -33,6 +34,7 @@ Ping::Ping(ActionContext action_context, Server::ptr const& server,
3334 server_stream_{&server_stream},
3435 ping_interval_{ping_interval},
3536 read_client_safe_api_gate_{protocol_context_, client_safe_api_},
37+ repeat_count_{},
3638 state_{State::kWaitLink },
3739 state_changed_sub_{state_.changed_event ().Subscribe (
3840 [this ](auto ) { Action::Trigger (); })},
@@ -56,8 +58,7 @@ TimePoint Ping::Update(TimePoint current_time) {
5658 case State::kSendPing :
5759 SendPing (current_time);
5860 break ;
59- case State::kWaitResponse : // TODO: response timeout and response
60- // statistics
61+ case State::kWaitResponse :
6162 case State::kWaitInterval :
6263 break ;
6364 case State::kError :
@@ -66,21 +67,20 @@ TimePoint Ping::Update(TimePoint current_time) {
6667 }
6768 }
6869
70+ if (state_.get () == State::kWaitResponse ) {
71+ return WaitResponse (current_time);
72+ }
6973 if (state_.get () == State::kWaitInterval ) {
70- if ((last_ping_time_ + ping_interval_) <= current_time) {
71- state_ = State::kSendPing ;
72- } else {
73- return last_ping_time_ + ping_interval_;
74- }
74+ return WaitInterval (current_time);
7575 }
7676
7777 return current_time;
7878}
7979
8080void Ping::SendPing (TimePoint current_time) {
8181 AE_TELE_DEBUG (kPingSend , " Send ping" );
82- last_ping_time_ = current_time;
8382 auto request_id = RequestId::GenRequestId ();
83+ ping_times_.push (std::make_pair (request_id, current_time));
8484 auto packet = PacketBuilder{
8585 protocol_context_,
8686 PackMessage{AuthorizedApi{},
@@ -100,18 +100,69 @@ void Ping::SendPing(TimePoint current_time) {
100100 });
101101
102102 // Wait for response
103- SendResult::OnResponse (protocol_context_, request_id,
104- [&](ApiParser&) { PingResponse (); });
103+ SendResult::OnResponse (
104+ protocol_context_, request_id,
105+ [&, req_id{request_id}](ApiParser&) { PingResponse (req_id); });
105106
106107 state_ = State::kWaitResponse ;
107108}
108109
109- void Ping::PingResponse () {
110+ TimePoint Ping::WaitInterval (TimePoint current_time) {
111+ auto const & ping_time = ping_times_.back ().second ;
112+ if ((ping_time + ping_interval_) > current_time) {
113+ return ping_time + ping_interval_;
114+ }
115+ state_ = State::kSendPing ;
116+ return current_time;
117+ }
118+
119+ TimePoint Ping::WaitResponse (TimePoint current_time) {
120+ auto channel_ptr = channel_.Lock ();
121+ assert (channel_ptr);
122+ auto timeout = channel_ptr->expected_ping_time ();
123+
124+ auto const & ping_time = ping_times_.back ().second ;
125+ if ((ping_time + timeout) > current_time) {
126+ return ping_time + timeout;
127+ }
128+ // timeout
129+ AE_TELE_INFO (kPingTimeout , " Timeout is {:%S}" , timeout);
130+ if (repeat_count_ >= kMaxRepeatPingCount ) {
131+ AE_TELE_ERROR (kPingTimeoutError , " Ping repeat count exceeded" );
132+ state_ = State::kError ;
133+ } else {
134+ repeat_count_++;
135+ state_ = State::kSendPing ;
136+ }
137+
138+ return current_time;
139+ }
140+
141+ void Ping::PingResponse (RequestId request_id) {
142+ auto request_time = [&]() -> std::optional<TimePoint> {
143+ for (auto const & [req_id, time] : ping_times_) {
144+ if (req_id == request_id) {
145+ return time;
146+ }
147+ }
148+ return std::nullopt ;
149+ }();
150+
151+ if (!request_time) {
152+ AE_TELED_DEBUG (" Got lost, or not our ping request" );
153+ return ;
154+ }
155+
156+ repeat_count_ = 0 ;
110157 auto current_time = Now ();
111158 auto ping_duration =
112- std::chrono::duration_cast<Duration>(current_time - last_ping_time_ );
159+ std::chrono::duration_cast<Duration>(current_time - *request_time );
113160
114161 AE_TELED_DEBUG (" Ping received by {:%S} s" , ping_duration);
162+ auto channel_ptr = channel_.Lock ();
163+ assert (channel_ptr);
164+ channel_ptr->AddPingTime (ping_duration);
165+
115166 state_ = State::kWaitInterval ;
116167}
117168
0 commit comments