Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 178 additions & 1 deletion logicshell-core/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
use std::path::PathBuf;
use std::process::Stdio;

use tokio::io::AsyncReadExt;
use tokio::io::{AsyncBufReadExt, AsyncReadExt};
use tokio::process::Command;
use tokio::sync::mpsc;

use crate::{config::LimitsConfig, LogicShellError, Result};

Expand Down Expand Up @@ -177,6 +178,108 @@ impl Dispatcher {
stdout_truncated,
})
}

/// Spawn a child process and stream its stdout line-by-line to `line_tx`.
///
/// Each line is sent as it arrives; stderr is captured in full and returned
/// in [`DispatchOutput`]. The stdout field of the returned output contains
/// all bytes that were sent through the channel.
pub async fn dispatch_streaming(
&self,
opts: DispatchOptions,
line_tx: mpsc::UnboundedSender<String>,
) -> Result<DispatchOutput> {
if opts.argv.is_empty() {
return Err(LogicShellError::Dispatch("argv must not be empty".into()));
}

let mut cmd = Command::new(&opts.argv[0]);
if opts.argv.len() > 1 {
cmd.args(&opts.argv[1..]);
}
for (k, v) in &opts.env_extra {
cmd.env(k, v);
}
if let Some(ref cwd) = opts.cwd {
cmd.current_dir(cwd);
}

let piped_stdin_data: Option<Vec<u8>> = match opts.stdin {
StdinMode::Null => {
cmd.stdin(Stdio::null());
None
}
StdinMode::Inherit => {
cmd.stdin(Stdio::inherit());
None
}
StdinMode::Piped(data) => {
cmd.stdin(Stdio::piped());
Some(data)
}
};

cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());

let mut child = cmd
.spawn()
.map_err(|e| LogicShellError::Dispatch(format!("spawn failed: {e}")))?;

let stdin_task = if let Some(data) = piped_stdin_data {
child.stdin.take().map(|mut stdin_handle| {
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
let _ = stdin_handle.write_all(&data).await;
})
})
} else {
None
};

let stdout_handle = child.stdout.take().expect("stdout is piped");
let stderr_handle = child.stderr.take().expect("stderr is piped");

let stdout_fut = async move {
let reader = tokio::io::BufReader::new(stdout_handle);
let mut lines = reader.lines();
let mut all_bytes: Vec<u8> = Vec::new();
while let Ok(Some(line)) = lines.next_line().await {
all_bytes.extend_from_slice(line.as_bytes());
all_bytes.push(b'\n');
let _ = line_tx.send(line);
}
all_bytes
};

let stderr_fut = async move {
let mut buf = Vec::new();
let _ = tokio::io::BufReader::new(stderr_handle)
.read_to_end(&mut buf)
.await;
buf
};

let (stdout_bytes, stderr) = tokio::join!(stdout_fut, stderr_fut);

if let Some(task) = stdin_task {
let _ = task.await;
}

let status = child
.wait()
.await
.map_err(|e| LogicShellError::Dispatch(format!("wait failed: {e}")))?;

let exit_code = status.code().unwrap_or(-1);

Ok(DispatchOutput {
exit_code,
stdout: stdout_bytes,
stderr,
stdout_truncated: false,
})
}
}

#[cfg(test)]
Expand Down Expand Up @@ -253,4 +356,78 @@ mod tests {
.unwrap();
assert_eq!(out.exit_code, -1);
}

/// dispatch_streaming delivers lines to the channel in order.
#[tokio::test]
async fn streaming_delivers_lines_in_order() {
let d = default_dispatcher();
let (tx, mut rx) = mpsc::unbounded_channel();
let out = d
.dispatch_streaming(
DispatchOptions {
argv: vec![
"sh".into(),
"-c".into(),
"echo line1; echo line2; echo line3".into(),
],
..Default::default()
},
tx,
)
.await
.unwrap();
assert_eq!(out.exit_code, 0);
let mut lines = vec![];
while let Ok(l) = rx.try_recv() {
lines.push(l);
}
assert_eq!(lines, vec!["line1", "line2", "line3"]);
}

/// dispatch_streaming with empty argv returns a Dispatch error.
#[tokio::test]
async fn streaming_empty_argv_returns_error() {
let d = default_dispatcher();
let (tx, _rx) = mpsc::unbounded_channel();
let result = d.dispatch_streaming(DispatchOptions::default(), tx).await;
assert!(matches!(result, Err(LogicShellError::Dispatch(_))));
}

/// dispatch_streaming captures exit code correctly.
#[tokio::test]
async fn streaming_exit_code_propagated() {
let d = default_dispatcher();
let (tx, _rx) = mpsc::unbounded_channel();
let out = d
.dispatch_streaming(
DispatchOptions {
argv: vec!["false".into()],
..Default::default()
},
tx,
)
.await
.unwrap();
assert_eq!(out.exit_code, 1);
}

/// dispatch_streaming stdout field contains all streamed bytes.
#[tokio::test]
async fn streaming_stdout_field_matches_channel() {
let d = default_dispatcher();
let (tx, mut rx) = mpsc::unbounded_channel();
let out = d
.dispatch_streaming(
DispatchOptions {
argv: vec!["echo".into(), "hello".into()],
..Default::default()
},
tx,
)
.await
.unwrap();
assert!(!out.stdout.is_empty());
let line = rx.try_recv().unwrap();
assert_eq!(line, "hello");
}
}
103 changes: 103 additions & 0 deletions logicshell-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,68 @@ impl LogicShell {
AuditSink::from_config(&self.config.audit)?.write(record)
}

/// Stream stdout of a child process line-by-line into `line_tx` — Phase 13.
///
/// Safety, hooks, and audit follow the same pipeline as [`dispatch`].
/// Each stdout line is sent to `line_tx` as it arrives; stderr is discarded
/// (callers that need stderr should use `dispatch` instead).
/// Returns `(exit_code, elapsed_duration)`.
///
/// [`dispatch`]: LogicShell::dispatch
pub async fn dispatch_streaming(
&self,
argv: &[&str],
line_tx: tokio::sync::mpsc::UnboundedSender<String>,
) -> Result<(i32, std::time::Duration)> {
let cwd = std::env::current_dir()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| String::from("?"));

let engine = SafetyPolicyEngine::new(self.config.safety_mode.clone(), &self.config.safety);
let (assessment, decision) = engine.evaluate(argv);

if decision == Decision::Deny {
let note = assessment.reasons.join("; ");
let record = AuditRecord::new(
cwd,
argv.iter().map(|s| s.to_string()).collect(),
AuditDecision::Deny,
)
.with_note(note.clone());
AuditSink::from_config(&self.config.audit)?.write(&record)?;
return Err(LogicShellError::Safety(format!(
"command denied by safety policy: {note}"
)));
}

let audit_decision = if decision == Decision::Confirm {
AuditDecision::Confirm
} else {
AuditDecision::Allow
};

HookRunner::new(&self.config.hooks).run_pre_exec().await?;

let d = Dispatcher::new(&self.config.limits);
let opts = DispatchOptions {
argv: argv.iter().map(|s| s.to_string()).collect(),
..DispatchOptions::default()
};

let start = std::time::Instant::now();
let output = d.dispatch_streaming(opts, line_tx).await?;
let duration = start.elapsed();

let record = AuditRecord::new(
cwd,
argv.iter().map(|s| s.to_string()).collect(),
audit_decision,
);
AuditSink::from_config(&self.config.audit)?.write(&record)?;

Ok((output.exit_code, duration))
}

/// Evaluate a command through the safety policy engine — FR-30–33.
///
/// Returns a `(RiskAssessment, Decision)` pair. The engine is sync and
Expand Down Expand Up @@ -344,4 +406,45 @@ mod tests {
"loose mode should allow sudo true; got: {result:?}"
);
}

// ── Phase 13: dispatch_streaming ─────────────────────────────────────────

/// dispatch_streaming streams lines and returns exit code + duration.
#[tokio::test]
async fn dispatch_streaming_streams_stdout_lines() {
let (ls, _tmp) = ls_with_temp_audit();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let result = ls
.dispatch_streaming(&["sh", "-c", "echo hello; echo world"], tx)
.await;
assert!(result.is_ok(), "dispatch_streaming failed: {result:?}");
let (exit_code, _duration) = result.unwrap();
assert_eq!(exit_code, 0);
let line1 = rx.try_recv().unwrap();
let line2 = rx.try_recv().unwrap();
assert_eq!(line1, "hello");
assert_eq!(line2, "world");
}

/// dispatch_streaming blocks denied commands.
#[tokio::test]
async fn dispatch_streaming_blocks_denied_commands() {
let (ls, _tmp) = ls_with_temp_audit();
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let result = ls.dispatch_streaming(&["rm", "-rf", "/"], tx).await;
assert!(
matches!(result, Err(LogicShellError::Safety(_))),
"expected Safety error; got: {result:?}"
);
}

/// dispatch_streaming returns correct duration.
#[tokio::test]
async fn dispatch_streaming_duration_is_positive() {
let (ls, _tmp) = ls_with_temp_audit();
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let (_, duration) = ls.dispatch_streaming(&["true"], tx).await.unwrap();
// Duration should be non-negative (could be zero on fast systems).
assert!(duration.as_nanos() < 10_000_000_000, "duration too large");
}
}
Loading
Loading