Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aether/actions/timer_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,6 @@ TimePoint TimerAction::Update(TimePoint current_time) {
return current_time;
}

void Stop();
void TimerAction::Stop() { state_ = State::kStopped; }

} // namespace ae
33 changes: 28 additions & 5 deletions aether/transport/server/server_channel_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,49 @@ static constexpr auto kBufferGateCapacity = std::size_t{100};

ServerChannelStream::ServerChannelStream(ObjPtr<Aether> const& aether,
Adapter::ptr const& adapter,
Server::ptr server,
Channel::ptr channel)
Server::ptr const& server,
Channel::ptr const& channel)
: action_context_{*aether->action_processor},
server_{std::move(server)},
channel_{std::move(channel)},
server_{server},
channel_{channel},
buffer_gate_{action_context_, kBufferGateCapacity},
connection_action_{std::visit(
[&](auto const& address) {
return _internal::MakeConnectionAction(
ActionContext{*aether->action_processor}, aether, adapter,
address);
},
channel_->address)} {
channel->address)},
connection_start_time_{Now()},
connection_timer_{std::in_place, ActionContext{*aether->action_processor},
channel->expected_connection_time()} {
connection_success_ = connection_action_->SubscribeOnResult(
[this](auto& action) { OnConnected(action); }),
connection_failed_ = connection_action_->SubscribeOnError(
[this](auto const&) { OnConnectedFailed(); }),
connection_finished_ = connection_action_->FinishedEvent().Subscribe(
[this]() { connection_action_.reset(); });

connection_timeout_ = connection_timer_->SubscribeOnResult(
[this](auto const&) { OnConnectedFailed(); });
connection_timer_finished_ = connection_timer_->FinishedEvent().Subscribe(
[this]() { connection_timer_.reset(); });
}

void ServerChannelStream::OnConnected(ChannelConnectionAction& connection) {
auto channel_ptr = channel_.Lock();
assert(channel_ptr);

auto connection_time =
std::chrono::duration_cast<Duration>(Now() - connection_start_time_);
channel_ptr->AddConnectionTime(std::move(connection_time));

if (!connection_timer_) {
// probably timeout
return;
}
connection_timer_->Stop();

transport_ = connection.transport();
connection_error_ =
transport_->ConnectionError()
Expand All @@ -82,6 +103,8 @@ void ServerChannelStream::OnConnected(ChannelConnectionAction& connection) {

void ServerChannelStream::OnConnectedFailed() {
AE_TELED_ERROR("ServerChannelStream:OnConnectedFailed");
connection_timeout_.Reset();
connection_failed_.Reset();
buffer_gate_.Unlink();
}

Expand Down
12 changes: 9 additions & 3 deletions aether/transport/server/server_channel_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "aether/ptr/ptr.h"
#include "aether/obj/obj_ptr.h"
#include "aether/actions/timer_action.h"
#include "aether/actions/action_context.h"
#include "aether/events/multi_subscription.h"

Expand All @@ -43,7 +44,7 @@ class ChannelConnectionAction;
class ServerChannelStream final : public ByteStream {
public:
ServerChannelStream(ObjPtr<Aether> const& aether, Adapter::ptr const& adapter,
Server::ptr server, Channel::ptr channel);
Server::ptr const& server, Channel::ptr const& channel);

AE_CLASS_NO_COPY_MOVE(ServerChannelStream)

Expand All @@ -56,18 +57,23 @@ class ServerChannelStream final : public ByteStream {
void OnConnectedFailed();

ActionContext action_context_;
Server::ptr server_;
Channel::ptr channel_;
PtrView<Server> server_;
PtrView<Channel> channel_;

BufferGate buffer_gate_;
std::unique_ptr<ITransport> transport_;
std::optional<TransportWriteGate> transport_write_gate_;

std::unique_ptr<class ChannelConnectionAction> connection_action_;
TimePoint connection_start_time_;
std::optional<TimerAction> connection_timer_;

Subscription connection_success_;
Subscription connection_failed_;
Subscription connection_finished_;
Subscription connection_error_;
Subscription connection_timeout_;
Subscription connection_timer_finished_;
};
} // namespace ae

Expand Down