Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 34 additions & 9 deletions aether/ae_actions/ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@ Ping::Ping(ActionContext action_context, Server::ptr const& server,
server_stream_{&server_stream},
ping_interval_{ping_interval},
read_client_safe_api_gate_{protocol_context_, client_safe_api_},
state_{State::kSendPing},
state_changed_subscription_{state_.changed_event().Subscribe(
[this](auto) { Action::Trigger(); })} {
state_{State::kWaitLink},
state_changed_sub_{state_.changed_event().Subscribe(
[this](auto) { Action::Trigger(); })},
stream_changed_sub_{
server_stream_->in().gate_update_event().Subscribe([this]() {
if (read_client_safe_api_gate_.stream_info().is_linked) {
state_ = State::kSendPing;
}
})} {
AE_TELE_INFO(kPing, "Ping action created, interval {:%S}", ping_interval);
Tie(read_client_safe_api_gate_, *server_stream_);
}
Expand All @@ -45,6 +51,8 @@ Ping::~Ping() = default;
TimePoint Ping::Update(TimePoint current_time) {
if (state_.changed()) {
switch (state_.Acquire()) {
case State::kWaitLink:
break;
case State::kSendPing:
SendPing(current_time);
break;
Expand Down Expand Up @@ -72,22 +80,39 @@ TimePoint Ping::Update(TimePoint current_time) {
void Ping::SendPing(TimePoint current_time) {
AE_TELE_DEBUG(kPingSend, "Send ping");
last_ping_time_ = current_time;
auto request_id = RequestId::GenRequestId();
auto packet = PacketBuilder{
protocol_context_, PackMessage{AuthorizedApi{}, AuthorizedApi::Ping{}}};
protocol_context_,
PackMessage{AuthorizedApi{},
AuthorizedApi::Ping{
{},
request_id,
static_cast<std::uint64_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
ping_interval_)
.count())}}};
auto write_action =
server_stream_->in().Write(std::move(packet), current_time);

write_subscription_ = write_action->SubscribeOnError([this](auto const&) {
AE_TELE_ERROR(kPingWriteError, "Ping write error");
state_ = State::kError;
});
// TODO: add wait response
// state_ = State::kWaitResponse;
state_ = State::kWaitInterval;

// Wait for response
SendResult::OnResponse(protocol_context_, request_id,
[&](ApiParser&) { PingResponse(); });

state_ = State::kWaitResponse;
}

void Ping::PingResponse() {
// TODO: handle ping response
// calculate server and channel statistics
auto current_time = Now();
auto ping_duration =
std::chrono::duration_cast<Duration>(current_time - last_ping_time_);

AE_TELED_DEBUG("Ping received by {:%S} s", ping_duration);
state_ = State::kWaitInterval;
}

} // namespace ae
5 changes: 3 additions & 2 deletions aether/ae_actions/ping.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
namespace ae {
class Ping : public Action<Ping> {
enum class State : std::uint8_t {
kWaitLink,
kSendPing,
kWaitResponse,
kWaitInterval,
Expand Down Expand Up @@ -65,9 +66,9 @@ class Ping : public Action<Ping> {

TimePoint last_ping_time_;
Subscription write_subscription_;
Subscription ping_response_subscription_;
StateMachine<State> state_;
Subscription state_changed_subscription_;
Subscription state_changed_sub_;
Subscription stream_changed_sub_;
};
} // namespace ae

Expand Down
6 changes: 5 additions & 1 deletion aether/methods/work_server_api/authorized_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ class AuthorizedApi : public ApiClass {
// Just ping the server to finalize authorization and stream
struct Ping : public Message<Ping> {
static constexpr auto kMessageCode = 6;
AE_REFLECT()

AE_REFLECT_MEMBERS(request_id, next_ping_duration)

RequestId request_id;
std::uint64_t next_ping_duration;
};

struct OpenStreamToClient : public Message<OpenStreamToClient> {
Expand Down
Loading