diff --git a/aether/CMakeLists.txt b/aether/CMakeLists.txt index 97649f10..5ca5fc71 100644 --- a/aether/CMakeLists.txt +++ b/aether/CMakeLists.txt @@ -100,7 +100,9 @@ list(APPEND actions_srcs "actions/action_trigger.cpp" "actions/action_registry.cpp" "actions/action_context.cpp" - "actions/action_processor.cpp") + "actions/action_processor.cpp" + "actions/timer_action.cpp" + ) list(APPEND adapters_srcs "adapters/adapter_factory.cpp" diff --git a/aether/actions/action.h b/aether/actions/action.h index b58e95f5..c2450669 100644 --- a/aether/actions/action.h +++ b/aether/actions/action.h @@ -161,71 +161,6 @@ class Action : public IAction { ActionTrigger* action_trigger_{}; ActionRegistry::IndexShare index_; }; - -template -class NotifyAction : public Action> { - public: - using Action>::Action; - using Action>::operator=; - - TimePoint Update(TimePoint current_time) override { - if (notify_success_) { - notify_success_ = false; - this->ResultRepeat(static_cast(*this)); - } - if (notify_failed_) { - notify_failed_ = false; - this->Error(static_cast(*this)); - } - return current_time; - } - - void Notify() { - notify_success_ = true; - this->Trigger(); - } - void Failed() { - notify_failed_ = true; - this->Trigger(); - } - - private: - bool notify_success_{}; - bool notify_failed_{}; -}; - -template <> -class NotifyAction : public Action> { - public: - using Action>::Action; - using Action>::operator=; - - TimePoint Update(TimePoint current_time) override { - if (notify_success_) { - notify_success_ = false; - this->ResultRepeat(*this); - } - if (notify_failed_) { - notify_failed_ = false; - this->Error(*this); - } - return current_time; - } - - void Notify() { - notify_success_ = true; - this->Trigger(); - } - void Failed() { - notify_failed_ = true; - this->Trigger(); - } - - private: - bool notify_success_{}; - bool notify_failed_{}; -}; - } // namespace ae #endif // AETHER_ACTIONS_ACTION_H_ diff --git a/aether/actions/notify_action.h b/aether/actions/notify_action.h new file mode 100644 index 00000000..42b779bb --- /dev/null +++ b/aether/actions/notify_action.h @@ -0,0 +1,88 @@ +/* + * Copyright 2025 Aethernet Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef AETHER_ACTIONS_NOTIFY_ACTION_H_ +#define AETHER_ACTIONS_NOTIFY_ACTION_H_ + +#include "aether/actions/action.h" + +namespace ae { +template +class NotifyAction : public Action> { + public: + using Action>::Action; + using Action>::operator=; + + TimePoint Update(TimePoint current_time) override { + if (notify_success_) { + notify_success_ = false; + this->ResultRepeat(static_cast(*this)); + } + if (notify_failed_) { + notify_failed_ = false; + this->Error(static_cast(*this)); + } + return current_time; + } + + void Notify() { + notify_success_ = true; + this->Trigger(); + } + void Failed() { + notify_failed_ = true; + this->Trigger(); + } + + private: + bool notify_success_{}; + bool notify_failed_{}; +}; + +template <> +class NotifyAction : public Action> { + public: + using Action>::Action; + using Action>::operator=; + + TimePoint Update(TimePoint current_time) override { + if (notify_success_) { + notify_success_ = false; + this->ResultRepeat(*this); + } + if (notify_failed_) { + notify_failed_ = false; + this->Error(*this); + } + return current_time; + } + + void Notify() { + notify_success_ = true; + this->Trigger(); + } + void Failed() { + notify_failed_ = true; + this->Trigger(); + } + + private: + bool notify_success_{}; + bool notify_failed_{}; +}; +} // namespace ae + +#endif // AETHER_ACTIONS_NOTIFY_ACTION_H_ diff --git a/aether/actions/timer_action.cpp b/aether/actions/timer_action.cpp new file mode 100644 index 00000000..37107419 --- /dev/null +++ b/aether/actions/timer_action.cpp @@ -0,0 +1,68 @@ +/* + * Copyright 2025 Aethernet Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "aether/actions/timer_action.h" + +namespace ae { +TimerAction::TimerAction(TimerAction&& other) noexcept + : Action{std::move(other)}, + timer_duration_{other.timer_duration_}, + start_time_{other.start_time_}, + state_{std::move(other.state_)}, + state_changed_sub_{state_.changed_event().Subscribe( + [this](auto) { Action::Trigger(); })} {} + +TimerAction& TimerAction::operator=(TimerAction&& other) noexcept { + if (this != &other) { + Action::operator=(std::move(other)); + timer_duration_ = other.timer_duration_; + start_time_ = other.start_time_; + state_ = std::move(other.state_); + state_changed_sub_ = + state_.changed_event().Subscribe([this](auto) { Action::Trigger(); }); + } + return *this; +} + +TimePoint TimerAction::Update(TimePoint current_time) { + if (state_.get() == State::kWait) { + if ((start_time_ + timer_duration_) > current_time) { + return start_time_ + timer_duration_; + } + state_ = State::kTriggered; + } + if (state_.changed()) { + switch (state_.Acquire()) { + case State::kStart: + start_time_ = current_time; + state_ = State::kWait; + break; + case State::kWait: + break; + case State::kTriggered: + Action::Result(*this); + return current_time; + case State::kStopped: + Action::Stop(*this); + return current_time; + } + } + return current_time; +} + +void Stop(); + +} // namespace ae diff --git a/aether/actions/timer_action.h b/aether/actions/timer_action.h new file mode 100644 index 00000000..8708994f --- /dev/null +++ b/aether/actions/timer_action.h @@ -0,0 +1,66 @@ +/* + * Copyright 2025 Aethernet Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef AETHER_ACTIONS_TIMER_ACTION_H_ +#define AETHER_ACTIONS_TIMER_ACTION_H_ + +#include +#include + +#include "aether/common.h" +#include "aether/state_machine.h" +#include "aether/actions/action.h" +#include "aether/events/event_subscription.h" + +namespace ae { +class TimerAction : public Action { + enum class State : std::uint8_t { + kStart, + kWait, + kTriggered, + kStopped, + }; + + public: + TimerAction() = default; + + template + TimerAction(TActionContext&& action_context, Duration duration) + : Action{std::forward(action_context)}, + timer_duration_{duration}, + state_{State::kStart}, + state_changed_sub_{state_.changed_event().Subscribe( + [this](auto) { Action::Trigger(); })} {} + + TimerAction(TimerAction const& other) = delete; + TimerAction(TimerAction&& other) noexcept; + + TimerAction& operator=(TimerAction const& other) = delete; + TimerAction& operator=(TimerAction&& other) noexcept; + + TimePoint Update(TimePoint current_time) override; + + void Stop(); + + private: + Duration timer_duration_; + TimePoint start_time_; + StateMachine state_; + Subscription state_changed_sub_; +}; +} // namespace ae + +#endif // AETHER_ACTIONS_TIMER_ACTION_H_ diff --git a/aether/ae_actions/registration/registration.cpp b/aether/ae_actions/registration/registration.cpp index 0828e1d1..bda5f968 100644 --- a/aether/ae_actions/registration/registration.cpp +++ b/aether/ae_actions/registration/registration.cpp @@ -383,6 +383,7 @@ void Registration::OnConfirmRegistration( ephemeral_uid_ = message.registration_response.ephemeral_uid; uid_ = message.registration_response.uid; cloud_ = message.registration_response.cloud; + assert(cloud_.size() > 0); state_ = State::kRequestCloudResolving; } diff --git a/aether/client_connections/client_cloud_connection.cpp b/aether/client_connections/client_cloud_connection.cpp index 7b12e0f6..19087344 100644 --- a/aether/client_connections/client_cloud_connection.cpp +++ b/aether/client_connections/client_cloud_connection.cpp @@ -32,15 +32,6 @@ ClientCloudConnection::ClientCloudConnection( Connect(); } -void ClientCloudConnection::Connect() { - connection_selector_loop_ = - AsyncForLoop>::Construct( - server_connection_selector_, - [this]() { return server_connection_selector_.GetConnection(); }); - - SelectConnection(); -} - std::unique_ptr ClientCloudConnection::CreateStream( Uid destination_uid, StreamId stream_id) { AE_TELE_DEBUG(CloudClientConnStreamCreate, @@ -87,30 +78,41 @@ void ClientCloudConnection::CloseStream(Uid uid, StreamId stream_id) { } } +void ClientCloudConnection::Connect() { + connection_selector_loop_ = + AsyncForLoop>::Construct( + server_connection_selector_, + [this]() { return server_connection_selector_.GetConnection(); }); + + SelectConnection(); +} + void ClientCloudConnection::SelectConnection() { if (server_connection_ = connection_selector_loop_->Update(); !server_connection_) { AE_TELED_ERROR("Server channel list is ended"); + ServerListEnded(); return; } - // TODO: add subscription to disconnection - - connection_status_subscription_ = - server_connection_->server_stream() - .in() - .gate_update_event() - .Subscribe([this]() { - if (server_connection_->server_stream() - .in() - .stream_info() - .is_linked) { - AE_TELED_INFO("Client cloud connection is connected"); - } else { - OnConnectionError(); - } - }) - .Once(); + if (server_connection_->server_stream().in().stream_info().is_linked) { + OnConnected(); + } else { + connection_status_sub_ = server_connection_->server_stream() + .in() + .gate_update_event() + .Subscribe([this]() { + if (server_connection_->server_stream() + .in() + .stream_info() + .is_linked) { + OnConnected(); + } else { + OnConnectionError(); + } + }) + .Once(); + } // restore all known streams to a new server for (auto& [uid, gate] : gates_) { @@ -122,18 +124,45 @@ void ClientCloudConnection::SelectConnection() { *this, MethodPtr<&ClientCloudConnection::NewStream>{}); } +void ClientCloudConnection::OnConnected() { + AE_TELED_INFO("Client cloud connection is connected"); + // subscribe to disconnection + connection_status_sub_ = server_connection_->server_stream() + .in() + .gate_update_event() + .Subscribe([this]() { + if (!server_connection_->server_stream() + .in() + .stream_info() + .is_linked) { + OnConnectionError(); + } + }) + .Once(); +} + void ClientCloudConnection::OnConnectionError() { AE_TELED_ERROR("Connection error"); reconnect_notify_ = ReconnectNotify{action_context_}; - reconnect_notify_subscription_ = reconnect_notify_ - .SubscribeOnResult([this](auto const&) { - AE_TELED_DEBUG("Reconnect"); - SelectConnection(); - }) - .Once(); + reconnect_notify_sub_ = reconnect_notify_ + .SubscribeOnResult([this](auto const&) { + AE_TELED_DEBUG("Reconnect"); + SelectConnection(); + }) + .Once(); reconnect_notify_.Notify(); } +void ClientCloudConnection::ServerListEnded() { + next_server_loop_timer_ = + NextServerLoopTimer{action_context_, std::chrono::milliseconds{5000}}; + next_server_loop_subs_ = + next_server_loop_timer_.SubscribeOnResult([&](auto const&) { + AE_TELED_DEBUG("Connect again"); + Connect(); + }); +} + void ClientCloudConnection::NewStream(Uid uid, ByteStream& stream) { AE_TELE_DEBUG(CloudClientNewStream, "New stream for uid {}", uid); diff --git a/aether/client_connections/client_cloud_connection.h b/aether/client_connections/client_cloud_connection.h index 3a96e4c6..4f51cf75 100644 --- a/aether/client_connections/client_cloud_connection.h +++ b/aether/client_connections/client_cloud_connection.h @@ -23,7 +23,8 @@ #include "aether/uid.h" #include "aether/memory.h" #include "aether/async_for_loop.h" -#include "aether/actions/action.h" +#include "aether/actions/timer_action.h" +#include "aether/actions/notify_action.h" #include "aether/events/event_subscription.h" #include "aether/events/multi_subscription.h" @@ -42,13 +43,13 @@ class Cloud; */ class ClientCloudConnection final : public ClientConnection { using ReconnectNotify = NotifyAction<>; + using NextServerLoopTimer = TimerAction; public: ClientCloudConnection( ActionContext action_context, ObjPtr const& cloud, std::unique_ptr&& server_connection_factory); - void Connect() override; std::unique_ptr CreateStream(Uid destination_uid, StreamId stream_id) override; NewStreamEvent::Subscriber new_stream_event() override; @@ -57,8 +58,11 @@ class ClientCloudConnection final : public ClientConnection { AE_REFLECT() private: + void Connect(); void SelectConnection(); + void ServerListEnded(); + void OnConnected(); void OnConnectionError(); void NewStream(Uid uid, ByteStream& stream); @@ -74,13 +78,16 @@ class ClientCloudConnection final : public ClientConnection { Subscription new_stream_event_subscription_; MultiSubscription new_split_stream_subscription_; - Subscription connection_status_subscription_; + Subscription connection_status_sub_; // known streams to clients std::map> gates_; ReconnectNotify reconnect_notify_; - Subscription reconnect_notify_subscription_; + NextServerLoopTimer next_server_loop_timer_; + + Subscription reconnect_notify_sub_; + Subscription next_server_loop_subs_; }; } // namespace ae diff --git a/aether/client_connections/client_connection.h b/aether/client_connections/client_connection.h index 4bcc3da9..2cb6d1e3 100644 --- a/aether/client_connections/client_connection.h +++ b/aether/client_connections/client_connection.h @@ -35,11 +35,6 @@ class ClientConnection { virtual ~ClientConnection() = default; - /** - * \brief Reconfigure connectivity to the cloud. - */ - virtual void Connect() = 0; - /** * \brief Create a stream to another client to send messages. */ diff --git a/aether/poller/epoll_poller.cpp b/aether/poller/epoll_poller.cpp index be8b785c..f425b0c2 100644 --- a/aether/poller/epoll_poller.cpp +++ b/aether/poller/epoll_poller.cpp @@ -121,15 +121,15 @@ class EpollPoller::PollWorker { static std::vector FromEpollEvent(std::uint32_t events) { auto res = std::vector{}; res.reserve(3); - if ((events & (EPOLLRDHUP | EPOLLPRI | EPOLLERR | EPOLLHUP)) != 0) { - res.push_back(EventType::kError); - } if ((events & EPOLLIN) != 0) { res.push_back(EventType::kRead); } if ((events & EPOLLOUT) != 0) { res.push_back(EventType::kWrite); } + if ((events & (EPOLLRDHUP | EPOLLPRI | EPOLLERR | EPOLLHUP)) != 0) { + res.push_back(EventType::kError); + } return res; } diff --git a/aether/stream_api/buffer_gate.cpp b/aether/stream_api/buffer_gate.cpp index 1d106e8a..363c4e33 100644 --- a/aether/stream_api/buffer_gate.cpp +++ b/aether/stream_api/buffer_gate.cpp @@ -136,6 +136,16 @@ void BufferGate::LinkOut(OutGate& out) { UpdateGate(); } +void BufferGate::Unlink() { + out_ = nullptr; + out_data_subscription_.Reset(); + gate_update_subscription_.Reset(); + + stream_info_ = {}; + stream_info_.is_soft_writable = write_in_buffer_.size() < buffer_max_; + gate_update_event_.Emit(); +} + StreamInfo BufferGate::stream_info() const { return stream_info_; } void BufferGate::SetSoftWriteable(bool value) { diff --git a/aether/stream_api/buffer_gate.h b/aether/stream_api/buffer_gate.h index 9afb2207..c684d559 100644 --- a/aether/stream_api/buffer_gate.h +++ b/aether/stream_api/buffer_gate.h @@ -69,6 +69,7 @@ class BufferGate final : public ByteGate { TimePoint current_time) override; void LinkOut(OutGate& out) override; + void Unlink() override; StreamInfo stream_info() const override; diff --git a/aether/stream_api/istream.h b/aether/stream_api/istream.h index cf9a6b71..7ad6f4b2 100644 --- a/aether/stream_api/istream.h +++ b/aether/stream_api/istream.h @@ -100,6 +100,8 @@ class Gate : public IGate { virtual void LinkOut(OutGate& out) = 0; virtual void Unlink() { out_ = nullptr; + out_data_subscription_.Reset(); + gate_update_subscription_.Reset(); gate_update_event_.Emit(); } diff --git a/aether/transport/low_level/tcp/lwip_tcp.h b/aether/transport/low_level/tcp/lwip_tcp.h index ea2b9edb..a35b189f 100644 --- a/aether/transport/low_level/tcp/lwip_tcp.h +++ b/aether/transport/low_level/tcp/lwip_tcp.h @@ -30,6 +30,7 @@ # include "aether/events/events.h" # include "aether/events/multi_subscription.h" +# include "aether/actions/notify_action.h" # include "aether/actions/action_context.h" # include "aether/transport/itransport.h" diff --git a/aether/transport/low_level/tcp/unix_tcp.cpp b/aether/transport/low_level/tcp/unix_tcp.cpp index 1b67922b..df1f6f94 100644 --- a/aether/transport/low_level/tcp/unix_tcp.cpp +++ b/aether/transport/low_level/tcp/unix_tcp.cpp @@ -347,6 +347,12 @@ void UnixTcpTransport::UnixPacketReadAction::Read() { error_ = true; break; } + if (res == 0) { + // socket shutdown + AE_TELED_ERROR("Recv shutdown"); + error_ = true; + break; + } data_packet_collector_.AddData(read_buffer_.data(), static_cast(res)); read_event_ = true; @@ -455,6 +461,9 @@ void UnixTcpTransport::OnConnected(int socket) { if (event.descriptor != socket_) { return; } + + AE_TELED_DEBUG("Socket event {}", event.event_type); + switch (event.event_type) { case EventType::kRead: ReadSocket(); @@ -517,6 +526,8 @@ void UnixTcpTransport::Disconnect() { return; } socket_ = kInvalidSocket; + + OnConnectionFailed(); } } // namespace ae #endif diff --git a/aether/transport/low_level/tcp/unix_tcp.h b/aether/transport/low_level/tcp/unix_tcp.h index 62563aed..49f3e3d1 100644 --- a/aether/transport/low_level/tcp/unix_tcp.h +++ b/aether/transport/low_level/tcp/unix_tcp.h @@ -28,6 +28,7 @@ # include "aether/common.h" # include "aether/poller/poller.h" # include "aether/actions/action.h" +# include "aether/actions/notify_action.h" # include "aether/actions/action_context.h" # include "aether/events/multi_subscription.h" diff --git a/aether/transport/low_level/tcp/win_tcp.h b/aether/transport/low_level/tcp/win_tcp.h index 5036cce2..973c51a5 100644 --- a/aether/transport/low_level/tcp/win_tcp.h +++ b/aether/transport/low_level/tcp/win_tcp.h @@ -30,6 +30,7 @@ # include "aether/events/multi_subscription.h" # include "aether/poller/poller.h" +# include "aether/actions/notify_action.h" # include "aether/actions/action_context.h" # include "aether/poller/win_poller.h" // for WinPollerOverlapped diff --git a/aether/transport/server/server_channel_stream.cpp b/aether/transport/server/server_channel_stream.cpp index 8bfec737..7739b6b2 100644 --- a/aether/transport/server/server_channel_stream.cpp +++ b/aether/transport/server/server_channel_stream.cpp @@ -61,17 +61,21 @@ ServerChannelStream::ServerChannelStream(ObjPtr const& aether, address); }, channel_->address)} { - connection_subscriptions_.Push( - connection_action_->SubscribeOnResult( - [this](auto& action) { OnConnected(action); }), - connection_action_->SubscribeOnError( - [this](auto const&) { OnConnectedFailed(); }), - connection_action_->FinishedEvent().Subscribe( - [this]() { connection_action_.reset(); })); + connection_success_ = connection_action_->SubscribeOnResult( + [this](auto& action) { OnConnected(action); }), + connection_failed_ = connection_action_->SubscribeOnError( + [this](auto const&) { OnConnectedFailed(); }), + connection_finished_ = connection_action_->FinishedEvent().Subscribe( + [this]() { connection_action_.reset(); }); } void ServerChannelStream::OnConnected(ChannelConnectionAction& connection) { transport_ = connection.transport(); + connection_error_ = + transport_->ConnectionError() + .Subscribe(*this, + MethodPtr<&ServerChannelStream::OnConnectedFailed>{}) + .Once(); transport_write_gate_.emplace(action_context_, *transport_); Tie(buffer_gate_, *transport_write_gate_); } diff --git a/aether/transport/server/server_channel_stream.h b/aether/transport/server/server_channel_stream.h index 4cf83bd4..307aa99f 100644 --- a/aether/transport/server/server_channel_stream.h +++ b/aether/transport/server/server_channel_stream.h @@ -64,7 +64,10 @@ class ServerChannelStream final : public ByteStream { std::optional transport_write_gate_; std::unique_ptr connection_action_; - MultiSubscription connection_subscriptions_; + Subscription connection_success_; + Subscription connection_failed_; + Subscription connection_finished_; + Subscription connection_error_; }; } // namespace ae