From 14becad9f9029b8554521ab78961f841182a53a9 Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Fri, 21 Mar 2025 16:09:57 +0500 Subject: [PATCH 01/12] add notify action --- aether/actions/action.h | 65 ------------------------- aether/actions/notify_action.h | 88 ++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 65 deletions(-) create mode 100644 aether/actions/notify_action.h diff --git a/aether/actions/action.h b/aether/actions/action.h index b58e95f5..c2450669 100644 --- a/aether/actions/action.h +++ b/aether/actions/action.h @@ -161,71 +161,6 @@ class Action : public IAction { ActionTrigger* action_trigger_{}; ActionRegistry::IndexShare index_; }; - -template -class NotifyAction : public Action> { - public: - using Action>::Action; - using Action>::operator=; - - TimePoint Update(TimePoint current_time) override { - if (notify_success_) { - notify_success_ = false; - this->ResultRepeat(static_cast(*this)); - } - if (notify_failed_) { - notify_failed_ = false; - this->Error(static_cast(*this)); - } - return current_time; - } - - void Notify() { - notify_success_ = true; - this->Trigger(); - } - void Failed() { - notify_failed_ = true; - this->Trigger(); - } - - private: - bool notify_success_{}; - bool notify_failed_{}; -}; - -template <> -class NotifyAction : public Action> { - public: - using Action>::Action; - using Action>::operator=; - - TimePoint Update(TimePoint current_time) override { - if (notify_success_) { - notify_success_ = false; - this->ResultRepeat(*this); - } - if (notify_failed_) { - notify_failed_ = false; - this->Error(*this); - } - return current_time; - } - - void Notify() { - notify_success_ = true; - this->Trigger(); - } - void Failed() { - notify_failed_ = true; - this->Trigger(); - } - - private: - bool notify_success_{}; - bool notify_failed_{}; -}; - } // namespace ae #endif // AETHER_ACTIONS_ACTION_H_ diff --git a/aether/actions/notify_action.h b/aether/actions/notify_action.h new file mode 100644 index 00000000..42b779bb --- /dev/null +++ b/aether/actions/notify_action.h @@ -0,0 +1,88 @@ +/* + * Copyright 2025 Aethernet Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef AETHER_ACTIONS_NOTIFY_ACTION_H_ +#define AETHER_ACTIONS_NOTIFY_ACTION_H_ + +#include "aether/actions/action.h" + +namespace ae { +template +class NotifyAction : public Action> { + public: + using Action>::Action; + using Action>::operator=; + + TimePoint Update(TimePoint current_time) override { + if (notify_success_) { + notify_success_ = false; + this->ResultRepeat(static_cast(*this)); + } + if (notify_failed_) { + notify_failed_ = false; + this->Error(static_cast(*this)); + } + return current_time; + } + + void Notify() { + notify_success_ = true; + this->Trigger(); + } + void Failed() { + notify_failed_ = true; + this->Trigger(); + } + + private: + bool notify_success_{}; + bool notify_failed_{}; +}; + +template <> +class NotifyAction : public Action> { + public: + using Action>::Action; + using Action>::operator=; + + TimePoint Update(TimePoint current_time) override { + if (notify_success_) { + notify_success_ = false; + this->ResultRepeat(*this); + } + if (notify_failed_) { + notify_failed_ = false; + this->Error(*this); + } + return current_time; + } + + void Notify() { + notify_success_ = true; + this->Trigger(); + } + void Failed() { + notify_failed_ = true; + this->Trigger(); + } + + private: + bool notify_success_{}; + bool notify_failed_{}; +}; +} // namespace ae + +#endif // AETHER_ACTIONS_NOTIFY_ACTION_H_ From 492916bd9ddc6d999c06584c6bd187a5731cff33 Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Fri, 21 Mar 2025 16:10:32 +0500 Subject: [PATCH 02/12] add include notify action --- aether/client_connections/client_cloud_connection.h | 1 + aether/transport/low_level/tcp/lwip_tcp.h | 1 + aether/transport/low_level/tcp/unix_tcp.h | 1 + aether/transport/low_level/tcp/win_tcp.h | 1 + 4 files changed, 4 insertions(+) diff --git a/aether/client_connections/client_cloud_connection.h b/aether/client_connections/client_cloud_connection.h index 3a96e4c6..2ffdcb18 100644 --- a/aether/client_connections/client_cloud_connection.h +++ b/aether/client_connections/client_cloud_connection.h @@ -24,6 +24,7 @@ #include "aether/memory.h" #include "aether/async_for_loop.h" #include "aether/actions/action.h" +#include "aether/actions/notify_action.h" #include "aether/events/event_subscription.h" #include "aether/events/multi_subscription.h" diff --git a/aether/transport/low_level/tcp/lwip_tcp.h b/aether/transport/low_level/tcp/lwip_tcp.h index ea2b9edb..a35b189f 100644 --- a/aether/transport/low_level/tcp/lwip_tcp.h +++ b/aether/transport/low_level/tcp/lwip_tcp.h @@ -30,6 +30,7 @@ # include "aether/events/events.h" # include "aether/events/multi_subscription.h" +# include "aether/actions/notify_action.h" # include "aether/actions/action_context.h" # include "aether/transport/itransport.h" diff --git a/aether/transport/low_level/tcp/unix_tcp.h b/aether/transport/low_level/tcp/unix_tcp.h index 62563aed..49f3e3d1 100644 --- a/aether/transport/low_level/tcp/unix_tcp.h +++ b/aether/transport/low_level/tcp/unix_tcp.h @@ -28,6 +28,7 @@ # include "aether/common.h" # include "aether/poller/poller.h" # include "aether/actions/action.h" +# include "aether/actions/notify_action.h" # include "aether/actions/action_context.h" # include "aether/events/multi_subscription.h" diff --git a/aether/transport/low_level/tcp/win_tcp.h b/aether/transport/low_level/tcp/win_tcp.h index 5036cce2..973c51a5 100644 --- a/aether/transport/low_level/tcp/win_tcp.h +++ b/aether/transport/low_level/tcp/win_tcp.h @@ -30,6 +30,7 @@ # include "aether/events/multi_subscription.h" # include "aether/poller/poller.h" +# include "aether/actions/notify_action.h" # include "aether/actions/action_context.h" # include "aether/poller/win_poller.h" // for WinPollerOverlapped From 13a4c01ddfbb62ec94e9be4cefa5e677fdab96db Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Fri, 21 Mar 2025 16:10:41 +0500 Subject: [PATCH 03/12] add timer action --- aether/CMakeLists.txt | 4 ++- aether/actions/timer_action.cpp | 48 +++++++++++++++++++++++++ aether/actions/timer_action.h | 62 +++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 aether/actions/timer_action.cpp create mode 100644 aether/actions/timer_action.h diff --git a/aether/CMakeLists.txt b/aether/CMakeLists.txt index 97649f10..5ca5fc71 100644 --- a/aether/CMakeLists.txt +++ b/aether/CMakeLists.txt @@ -100,7 +100,9 @@ list(APPEND actions_srcs "actions/action_trigger.cpp" "actions/action_registry.cpp" "actions/action_context.cpp" - "actions/action_processor.cpp") + "actions/action_processor.cpp" + "actions/timer_action.cpp" + ) list(APPEND adapters_srcs "adapters/adapter_factory.cpp" diff --git a/aether/actions/timer_action.cpp b/aether/actions/timer_action.cpp new file mode 100644 index 00000000..a9abd347 --- /dev/null +++ b/aether/actions/timer_action.cpp @@ -0,0 +1,48 @@ +/* + * Copyright 2025 Aethernet Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "aether/actions/timer_action.h" + +namespace ae { +TimePoint TimerAction::Update(TimePoint current_time) { + if (state_.get() == State::kWait) { + if ((start_time_ + timer_duration_) > current_time) { + return start_time_ + timer_duration_; + } + state_ = State::kTriggered; + } + if (state_.changed()) { + switch (state_.Acquire()) { + case State::kStart: + start_time_ = current_time; + state_ = State::kWait; + break; + case State::kWait: + break; + case State::kTriggered: + Action::Result(*this); + return current_time; + case State::kStopped: + Action::Stop(*this); + return current_time; + } + } + return current_time; +} + +void Stop(); + +} // namespace ae diff --git a/aether/actions/timer_action.h b/aether/actions/timer_action.h new file mode 100644 index 00000000..2f70db85 --- /dev/null +++ b/aether/actions/timer_action.h @@ -0,0 +1,62 @@ +/* + * Copyright 2025 Aethernet Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef AETHER_ACTIONS_TIMER_ACTION_H_ +#define AETHER_ACTIONS_TIMER_ACTION_H_ + +#include + +#include "aether/common.h" +#include "aether/state_machine.h" +#include "aether/actions/action.h" +#include "aether/events/event_subscription.h" + +namespace ae { +class TimerAction : public Action { + enum class State : std::uint8_t { + kStart, + kWait, + kTriggered, + kStopped, + }; + + public: + TimerAction() = default; + + template + TimerAction(TActionContext&& action_context, Duration duration) + : Action{std::forward(action_context)}, + timer_duration_{duration}, + state_{State::kStart}, + state_changed_sub_{state_.changed_event().Subscribe( + [this](auto) { Action::Trigger(); })} {} + + using Action::Action; + using Action::operator=; + + TimePoint Update(TimePoint current_time) override; + + void Stop(); + + private: + Duration timer_duration_; + TimePoint start_time_; + StateMachine state_; + Subscription state_changed_sub_; +}; +} // namespace ae + +#endif // AETHER_ACTIONS_TIMER_ACTION_H_ From 8bd743d60d53b53d0c6c74a2d1117dafb2bd075d Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Fri, 21 Mar 2025 16:11:10 +0500 Subject: [PATCH 04/12] add subscribe on transport disconnection --- .../transport/server/server_channel_stream.cpp | 18 +++++++++++------- .../transport/server/server_channel_stream.h | 5 ++++- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/aether/transport/server/server_channel_stream.cpp b/aether/transport/server/server_channel_stream.cpp index 8bfec737..7739b6b2 100644 --- a/aether/transport/server/server_channel_stream.cpp +++ b/aether/transport/server/server_channel_stream.cpp @@ -61,17 +61,21 @@ ServerChannelStream::ServerChannelStream(ObjPtr const& aether, address); }, channel_->address)} { - connection_subscriptions_.Push( - connection_action_->SubscribeOnResult( - [this](auto& action) { OnConnected(action); }), - connection_action_->SubscribeOnError( - [this](auto const&) { OnConnectedFailed(); }), - connection_action_->FinishedEvent().Subscribe( - [this]() { connection_action_.reset(); })); + connection_success_ = connection_action_->SubscribeOnResult( + [this](auto& action) { OnConnected(action); }), + connection_failed_ = connection_action_->SubscribeOnError( + [this](auto const&) { OnConnectedFailed(); }), + connection_finished_ = connection_action_->FinishedEvent().Subscribe( + [this]() { connection_action_.reset(); }); } void ServerChannelStream::OnConnected(ChannelConnectionAction& connection) { transport_ = connection.transport(); + connection_error_ = + transport_->ConnectionError() + .Subscribe(*this, + MethodPtr<&ServerChannelStream::OnConnectedFailed>{}) + .Once(); transport_write_gate_.emplace(action_context_, *transport_); Tie(buffer_gate_, *transport_write_gate_); } diff --git a/aether/transport/server/server_channel_stream.h b/aether/transport/server/server_channel_stream.h index 4cf83bd4..307aa99f 100644 --- a/aether/transport/server/server_channel_stream.h +++ b/aether/transport/server/server_channel_stream.h @@ -64,7 +64,10 @@ class ServerChannelStream final : public ByteStream { std::optional transport_write_gate_; std::unique_ptr connection_action_; - MultiSubscription connection_subscriptions_; + Subscription connection_success_; + Subscription connection_failed_; + Subscription connection_finished_; + Subscription connection_error_; }; } // namespace ae From 13b08385adbad04384e66584660f7c7d0c59138e Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Fri, 21 Mar 2025 16:17:17 +0500 Subject: [PATCH 05/12] make reconnect to ther servers on loop --- .../client_cloud_connection.cpp | 86 ++++++++++++------- .../client_cloud_connection.h | 14 ++- aether/client_connections/client_connection.h | 5 -- 3 files changed, 64 insertions(+), 41 deletions(-) diff --git a/aether/client_connections/client_cloud_connection.cpp b/aether/client_connections/client_cloud_connection.cpp index 7b12e0f6..9453e054 100644 --- a/aether/client_connections/client_cloud_connection.cpp +++ b/aether/client_connections/client_cloud_connection.cpp @@ -32,15 +32,6 @@ ClientCloudConnection::ClientCloudConnection( Connect(); } -void ClientCloudConnection::Connect() { - connection_selector_loop_ = - AsyncForLoop>::Construct( - server_connection_selector_, - [this]() { return server_connection_selector_.GetConnection(); }); - - SelectConnection(); -} - std::unique_ptr ClientCloudConnection::CreateStream( Uid destination_uid, StreamId stream_id) { AE_TELE_DEBUG(CloudClientConnStreamCreate, @@ -87,30 +78,37 @@ void ClientCloudConnection::CloseStream(Uid uid, StreamId stream_id) { } } +void ClientCloudConnection::Connect() { + connection_selector_loop_ = + AsyncForLoop>::Construct( + server_connection_selector_, + [this]() { return server_connection_selector_.GetConnection(); }); + + SelectConnection(); +} + void ClientCloudConnection::SelectConnection() { if (server_connection_ = connection_selector_loop_->Update(); !server_connection_) { AE_TELED_ERROR("Server channel list is ended"); + ServerListEnded(); return; } - // TODO: add subscription to disconnection - - connection_status_subscription_ = - server_connection_->server_stream() - .in() - .gate_update_event() - .Subscribe([this]() { - if (server_connection_->server_stream() - .in() - .stream_info() - .is_linked) { - AE_TELED_INFO("Client cloud connection is connected"); - } else { - OnConnectionError(); - } - }) - .Once(); + connection_status_sub_ = server_connection_->server_stream() + .in() + .gate_update_event() + .Subscribe([this]() { + if (server_connection_->server_stream() + .in() + .stream_info() + .is_linked) { + OnConnected(); + } else { + OnConnectionError(); + } + }) + .Once(); // restore all known streams to a new server for (auto& [uid, gate] : gates_) { @@ -122,18 +120,42 @@ void ClientCloudConnection::SelectConnection() { *this, MethodPtr<&ClientCloudConnection::NewStream>{}); } +void ClientCloudConnection::OnConnected() { + AE_TELED_INFO("Client cloud connection is connected"); + // subscribe to disconnection + connection_status_sub_ = server_connection_->server_stream() + .in() + .gate_update_event() + .Subscribe([this]() { + if (!server_connection_->server_stream() + .in() + .stream_info() + .is_linked) { + OnConnectionError(); + } + }) + .Once(); +} + void ClientCloudConnection::OnConnectionError() { AE_TELED_ERROR("Connection error"); reconnect_notify_ = ReconnectNotify{action_context_}; - reconnect_notify_subscription_ = reconnect_notify_ - .SubscribeOnResult([this](auto const&) { - AE_TELED_DEBUG("Reconnect"); - SelectConnection(); - }) - .Once(); + reconnect_notify_sub_ = reconnect_notify_ + .SubscribeOnResult([this](auto const&) { + AE_TELED_DEBUG("Reconnect"); + SelectConnection(); + }) + .Once(); reconnect_notify_.Notify(); } +void ClientCloudConnection::ServerListEnded() { + next_server_loop_timer_ = + NextServerLoopTimer{action_context_, std::chrono::milliseconds{5000}}; + next_server_loop_subs_ = next_server_loop_timer_.SubscribeOnResult( + [&](auto const&) { Connect(); }); +} + void ClientCloudConnection::NewStream(Uid uid, ByteStream& stream) { AE_TELE_DEBUG(CloudClientNewStream, "New stream for uid {}", uid); diff --git a/aether/client_connections/client_cloud_connection.h b/aether/client_connections/client_cloud_connection.h index 2ffdcb18..4f51cf75 100644 --- a/aether/client_connections/client_cloud_connection.h +++ b/aether/client_connections/client_cloud_connection.h @@ -23,7 +23,7 @@ #include "aether/uid.h" #include "aether/memory.h" #include "aether/async_for_loop.h" -#include "aether/actions/action.h" +#include "aether/actions/timer_action.h" #include "aether/actions/notify_action.h" #include "aether/events/event_subscription.h" #include "aether/events/multi_subscription.h" @@ -43,13 +43,13 @@ class Cloud; */ class ClientCloudConnection final : public ClientConnection { using ReconnectNotify = NotifyAction<>; + using NextServerLoopTimer = TimerAction; public: ClientCloudConnection( ActionContext action_context, ObjPtr const& cloud, std::unique_ptr&& server_connection_factory); - void Connect() override; std::unique_ptr CreateStream(Uid destination_uid, StreamId stream_id) override; NewStreamEvent::Subscriber new_stream_event() override; @@ -58,8 +58,11 @@ class ClientCloudConnection final : public ClientConnection { AE_REFLECT() private: + void Connect(); void SelectConnection(); + void ServerListEnded(); + void OnConnected(); void OnConnectionError(); void NewStream(Uid uid, ByteStream& stream); @@ -75,13 +78,16 @@ class ClientCloudConnection final : public ClientConnection { Subscription new_stream_event_subscription_; MultiSubscription new_split_stream_subscription_; - Subscription connection_status_subscription_; + Subscription connection_status_sub_; // known streams to clients std::map> gates_; ReconnectNotify reconnect_notify_; - Subscription reconnect_notify_subscription_; + NextServerLoopTimer next_server_loop_timer_; + + Subscription reconnect_notify_sub_; + Subscription next_server_loop_subs_; }; } // namespace ae diff --git a/aether/client_connections/client_connection.h b/aether/client_connections/client_connection.h index 4bcc3da9..2cb6d1e3 100644 --- a/aether/client_connections/client_connection.h +++ b/aether/client_connections/client_connection.h @@ -35,11 +35,6 @@ class ClientConnection { virtual ~ClientConnection() = default; - /** - * \brief Reconfigure connectivity to the cloud. - */ - virtual void Connect() = 0; - /** * \brief Create a stream to another client to send messages. */ From 463bcc51eba2cd0f01ae49d5dec5ca4a2b2df4f3 Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Tue, 25 Mar 2025 17:37:23 +0500 Subject: [PATCH 06/12] fix moving timer action --- aether/actions/timer_action.cpp | 20 ++++++++++++++++++++ aether/actions/timer_action.h | 8 ++++++-- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/aether/actions/timer_action.cpp b/aether/actions/timer_action.cpp index a9abd347..37107419 100644 --- a/aether/actions/timer_action.cpp +++ b/aether/actions/timer_action.cpp @@ -17,6 +17,26 @@ #include "aether/actions/timer_action.h" namespace ae { +TimerAction::TimerAction(TimerAction&& other) noexcept + : Action{std::move(other)}, + timer_duration_{other.timer_duration_}, + start_time_{other.start_time_}, + state_{std::move(other.state_)}, + state_changed_sub_{state_.changed_event().Subscribe( + [this](auto) { Action::Trigger(); })} {} + +TimerAction& TimerAction::operator=(TimerAction&& other) noexcept { + if (this != &other) { + Action::operator=(std::move(other)); + timer_duration_ = other.timer_duration_; + start_time_ = other.start_time_; + state_ = std::move(other.state_); + state_changed_sub_ = + state_.changed_event().Subscribe([this](auto) { Action::Trigger(); }); + } + return *this; +} + TimePoint TimerAction::Update(TimePoint current_time) { if (state_.get() == State::kWait) { if ((start_time_ + timer_duration_) > current_time) { diff --git a/aether/actions/timer_action.h b/aether/actions/timer_action.h index 2f70db85..8708994f 100644 --- a/aether/actions/timer_action.h +++ b/aether/actions/timer_action.h @@ -18,6 +18,7 @@ #define AETHER_ACTIONS_TIMER_ACTION_H_ #include +#include #include "aether/common.h" #include "aether/state_machine.h" @@ -44,8 +45,11 @@ class TimerAction : public Action { state_changed_sub_{state_.changed_event().Subscribe( [this](auto) { Action::Trigger(); })} {} - using Action::Action; - using Action::operator=; + TimerAction(TimerAction const& other) = delete; + TimerAction(TimerAction&& other) noexcept; + + TimerAction& operator=(TimerAction const& other) = delete; + TimerAction& operator=(TimerAction&& other) noexcept; TimePoint Update(TimePoint current_time) override; From 00042999e3b8072d9a959c965780b880e82852e3 Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Tue, 25 Mar 2025 17:37:43 +0500 Subject: [PATCH 07/12] add propper unlink for gates --- aether/stream_api/buffer_gate.cpp | 10 ++++++++++ aether/stream_api/buffer_gate.h | 1 + aether/stream_api/istream.h | 2 ++ 3 files changed, 13 insertions(+) diff --git a/aether/stream_api/buffer_gate.cpp b/aether/stream_api/buffer_gate.cpp index 1d106e8a..363c4e33 100644 --- a/aether/stream_api/buffer_gate.cpp +++ b/aether/stream_api/buffer_gate.cpp @@ -136,6 +136,16 @@ void BufferGate::LinkOut(OutGate& out) { UpdateGate(); } +void BufferGate::Unlink() { + out_ = nullptr; + out_data_subscription_.Reset(); + gate_update_subscription_.Reset(); + + stream_info_ = {}; + stream_info_.is_soft_writable = write_in_buffer_.size() < buffer_max_; + gate_update_event_.Emit(); +} + StreamInfo BufferGate::stream_info() const { return stream_info_; } void BufferGate::SetSoftWriteable(bool value) { diff --git a/aether/stream_api/buffer_gate.h b/aether/stream_api/buffer_gate.h index 9afb2207..c684d559 100644 --- a/aether/stream_api/buffer_gate.h +++ b/aether/stream_api/buffer_gate.h @@ -69,6 +69,7 @@ class BufferGate final : public ByteGate { TimePoint current_time) override; void LinkOut(OutGate& out) override; + void Unlink() override; StreamInfo stream_info() const override; diff --git a/aether/stream_api/istream.h b/aether/stream_api/istream.h index cf9a6b71..7ad6f4b2 100644 --- a/aether/stream_api/istream.h +++ b/aether/stream_api/istream.h @@ -100,6 +100,8 @@ class Gate : public IGate { virtual void LinkOut(OutGate& out) = 0; virtual void Unlink() { out_ = nullptr; + out_data_subscription_.Reset(); + gate_update_subscription_.Reset(); gate_update_event_.Emit(); } From 6da756d639860e9446f3c4ae535570a4e1964d9b Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Tue, 25 Mar 2025 17:38:43 +0500 Subject: [PATCH 08/12] change order for error event --- aether/poller/epoll_poller.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aether/poller/epoll_poller.cpp b/aether/poller/epoll_poller.cpp index be8b785c..f425b0c2 100644 --- a/aether/poller/epoll_poller.cpp +++ b/aether/poller/epoll_poller.cpp @@ -121,15 +121,15 @@ class EpollPoller::PollWorker { static std::vector FromEpollEvent(std::uint32_t events) { auto res = std::vector{}; res.reserve(3); - if ((events & (EPOLLRDHUP | EPOLLPRI | EPOLLERR | EPOLLHUP)) != 0) { - res.push_back(EventType::kError); - } if ((events & EPOLLIN) != 0) { res.push_back(EventType::kRead); } if ((events & EPOLLOUT) != 0) { res.push_back(EventType::kWrite); } + if ((events & (EPOLLRDHUP | EPOLLPRI | EPOLLERR | EPOLLHUP)) != 0) { + res.push_back(EventType::kError); + } return res; } From df21bb65d61d18023bac46556c73b6a7eaba10cf Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Tue, 25 Mar 2025 17:38:58 +0500 Subject: [PATCH 09/12] add select new connection to the server --- .../client_cloud_connection.cpp | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/aether/client_connections/client_cloud_connection.cpp b/aether/client_connections/client_cloud_connection.cpp index 9453e054..099d9c91 100644 --- a/aether/client_connections/client_cloud_connection.cpp +++ b/aether/client_connections/client_cloud_connection.cpp @@ -95,20 +95,25 @@ void ClientCloudConnection::SelectConnection() { return; } - connection_status_sub_ = server_connection_->server_stream() - .in() - .gate_update_event() - .Subscribe([this]() { - if (server_connection_->server_stream() - .in() - .stream_info() - .is_linked) { - OnConnected(); - } else { - OnConnectionError(); - } - }) - .Once(); + if (server_connection_->server_stream().in().stream_info().is_linked) { + OnConnected(); + } else { + connection_status_sub_ = server_connection_->server_stream() + .in() + .gate_update_event() + .Subscribe([this]() { + if (server_connection_->server_stream() + .in() + .stream_info() + .is_linked) { + OnConnected(); + } else { + AE_TELED_ERROR("CE2"); + OnConnectionError(); + } + }) + .Once(); + } // restore all known streams to a new server for (auto& [uid, gate] : gates_) { @@ -131,6 +136,7 @@ void ClientCloudConnection::OnConnected() { .in() .stream_info() .is_linked) { + AE_TELED_ERROR("CE1"); OnConnectionError(); } }) @@ -152,8 +158,11 @@ void ClientCloudConnection::OnConnectionError() { void ClientCloudConnection::ServerListEnded() { next_server_loop_timer_ = NextServerLoopTimer{action_context_, std::chrono::milliseconds{5000}}; - next_server_loop_subs_ = next_server_loop_timer_.SubscribeOnResult( - [&](auto const&) { Connect(); }); + next_server_loop_subs_ = + next_server_loop_timer_.SubscribeOnResult([&](auto const&) { + AE_TELED_DEBUG("Connect again"); + Connect(); + }); } void ClientCloudConnection::NewStream(Uid uid, ByteStream& stream) { From fd0ade951747418b38a8e43276da5ddcb32f6424 Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Tue, 25 Mar 2025 17:42:18 +0500 Subject: [PATCH 10/12] add connection faild on disconenct --- aether/transport/low_level/tcp/unix_tcp.cpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/aether/transport/low_level/tcp/unix_tcp.cpp b/aether/transport/low_level/tcp/unix_tcp.cpp index 1b67922b..df1f6f94 100644 --- a/aether/transport/low_level/tcp/unix_tcp.cpp +++ b/aether/transport/low_level/tcp/unix_tcp.cpp @@ -347,6 +347,12 @@ void UnixTcpTransport::UnixPacketReadAction::Read() { error_ = true; break; } + if (res == 0) { + // socket shutdown + AE_TELED_ERROR("Recv shutdown"); + error_ = true; + break; + } data_packet_collector_.AddData(read_buffer_.data(), static_cast(res)); read_event_ = true; @@ -455,6 +461,9 @@ void UnixTcpTransport::OnConnected(int socket) { if (event.descriptor != socket_) { return; } + + AE_TELED_DEBUG("Socket event {}", event.event_type); + switch (event.event_type) { case EventType::kRead: ReadSocket(); @@ -517,6 +526,8 @@ void UnixTcpTransport::Disconnect() { return; } socket_ = kInvalidSocket; + + OnConnectionFailed(); } } // namespace ae #endif From 9ae4de2cff0109b144d0712652f6fb6e1835758f Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Wed, 26 Mar 2025 15:35:44 +0500 Subject: [PATCH 11/12] add assert if cloud empty --- aether/ae_actions/registration/registration.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/aether/ae_actions/registration/registration.cpp b/aether/ae_actions/registration/registration.cpp index 0828e1d1..bda5f968 100644 --- a/aether/ae_actions/registration/registration.cpp +++ b/aether/ae_actions/registration/registration.cpp @@ -383,6 +383,7 @@ void Registration::OnConfirmRegistration( ephemeral_uid_ = message.registration_response.ephemeral_uid; uid_ = message.registration_response.uid; cloud_ = message.registration_response.cloud; + assert(cloud_.size() > 0); state_ = State::kRequestCloudResolving; } From 129cf76db42c0ce8d12c510f789e3cf511153064 Mon Sep 17 00:00:00 2001 From: BartolomeyKant Date: Thu, 27 Mar 2025 16:19:33 +0500 Subject: [PATCH 12/12] remove logs --- aether/client_connections/client_cloud_connection.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/aether/client_connections/client_cloud_connection.cpp b/aether/client_connections/client_cloud_connection.cpp index 099d9c91..19087344 100644 --- a/aether/client_connections/client_cloud_connection.cpp +++ b/aether/client_connections/client_cloud_connection.cpp @@ -108,7 +108,6 @@ void ClientCloudConnection::SelectConnection() { .is_linked) { OnConnected(); } else { - AE_TELED_ERROR("CE2"); OnConnectionError(); } }) @@ -136,7 +135,6 @@ void ClientCloudConnection::OnConnected() { .in() .stream_info() .is_linked) { - AE_TELED_ERROR("CE1"); OnConnectionError(); } })