Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 74 additions & 53 deletions ostool/src/board/serial_stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};

use anyhow::Context as _;
use futures::{SinkExt, StreamExt};
use tokio::{
Expand All @@ -8,7 +13,9 @@ use tokio::{
use tokio_tungstenite::tungstenite::Message;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use crate::board::terminal::ServerControlMessage;
use crate::board::terminal::{
ServerControlAction, ServerControlMessage, classify_server_control_message,
};

pub type BoxedAsyncRead = Box<dyn futures::AsyncRead + Send + Unpin>;
pub type BoxedAsyncWrite = Box<dyn futures::AsyncWrite + Send + Unpin>;
Expand All @@ -25,73 +32,87 @@ pub async fn connect_serial_stream(
.await
.with_context(|| format!("failed to connect serial websocket {}", ws_url))?;
let (mut ws_sink, mut ws_stream) = stream.split();
let locally_closed = Arc::new(AtomicBool::new(false));

let (runner_stream, bridge_stream) = tokio::io::duplex(64 * 1024);
let (runner_rx, runner_tx) = split(runner_stream);
let (mut bridge_rx, mut bridge_tx) = split(bridge_stream);

let read_task = tokio::spawn(async move {
while let Some(message) = ws_stream.next().await {
match message.context("serial websocket read failed")? {
Message::Binary(bytes) => {
tokio::io::AsyncWriteExt::write_all(&mut bridge_tx, &bytes)
.await
.context("failed to write serial websocket bytes")?;
tokio::io::AsyncWriteExt::flush(&mut bridge_tx)
.await
.context("failed to flush serial websocket bytes")?;
}
Message::Text(text) => {
if let Ok(control) = serde_json::from_str::<ServerControlMessage>(&text) {
match control.kind.as_str() {
"opened" | "closed" => continue,
"error" => {
let message = control
.message
.unwrap_or_else(|| "serial websocket error".to_string());
anyhow::bail!("ostool-server serial websocket error: {message}");
let read_task = tokio::spawn({
let locally_closed = locally_closed.clone();
async move {
while let Some(message) = ws_stream.next().await {
match message.context("serial websocket read failed")? {
Message::Binary(bytes) => {
tokio::io::AsyncWriteExt::write_all(&mut bridge_tx, &bytes)
.await
.context("failed to write serial websocket bytes")?;
tokio::io::AsyncWriteExt::flush(&mut bridge_tx)
.await
.context("failed to flush serial websocket bytes")?;
}
Message::Text(text) => {
if let Ok(control) = serde_json::from_str::<ServerControlMessage>(&text) {
match classify_server_control_message(
&control,
locally_closed.load(Ordering::SeqCst),
) {
ServerControlAction::Ignore => continue,
ServerControlAction::Close => break,
ServerControlAction::Error(err) => return Err(err),
ServerControlAction::Forward => {}
}
_ => {}
}
}

tokio::io::AsyncWriteExt::write_all(&mut bridge_tx, text.as_bytes())
.await
.context("failed to write text serial websocket payload")?;
tokio::io::AsyncWriteExt::flush(&mut bridge_tx)
.await
.context("failed to flush text serial websocket payload")?;
tokio::io::AsyncWriteExt::write_all(&mut bridge_tx, text.as_bytes())
.await
.context("failed to write text serial websocket payload")?;
tokio::io::AsyncWriteExt::flush(&mut bridge_tx)
.await
.context("failed to flush text serial websocket payload")?;
}
Message::Close(_) => {
if locally_closed.load(Ordering::SeqCst) {
break;
}
anyhow::bail!(
"ostool-server closed the serial websocket; the board session may have been released"
);
}
Message::Ping(_) => {}
Message::Pong(_) | Message::Frame(_) => {}
}
Message::Close(_) => break,
Message::Ping(_) => {}
Message::Pong(_) | Message::Frame(_) => {}
}
}

Ok(())
Ok(())
}
});

let write_task = tokio::spawn(async move {
let mut buffer = [0u8; 4096];
loop {
let read = bridge_rx
.read(&mut buffer)
.await
.context("failed to read runner serial bytes")?;
if read == 0 {
break;
let write_task = tokio::spawn({
let locally_closed = locally_closed.clone();
async move {
let mut buffer = [0u8; 4096];
loop {
let read = bridge_rx
.read(&mut buffer)
.await
.context("failed to read runner serial bytes")?;
if read == 0 {
break;
}
ws_sink
.send(Message::Binary(buffer[..read].to_vec().into()))
.await
.context("serial websocket write failed")?;
}
ws_sink
.send(Message::Binary(buffer[..read].to_vec().into()))
.await
.context("serial websocket write failed")?;
}

let _ = ws_sink
.send(Message::Text(r#"{"type":"close"}"#.to_string().into()))
.await;
let _ = ws_sink.send(Message::Close(None)).await;
Ok(())
locally_closed.store(true, Ordering::SeqCst);
let _ = ws_sink
.send(Message::Text(r#"{"type":"close"}"#.to_string().into()))
.await;
let _ = ws_sink.send(Message::Close(None)).await;
Ok(())
}
});

Ok((
Expand Down
Loading