diff --git a/src/lib.rs b/src/lib.rs index f94e9be..69d4808 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Deserializer}; use std::{collections::HashMap, fmt::Display, net::Ipv4Addr, str::FromStr}; use syscalls::Sysno; +pub mod machine; pub mod script; pub mod worker; diff --git a/src/machine.rs b/src/machine.rs new file mode 100644 index 0000000..6fa54c3 --- /dev/null +++ b/src/machine.rs @@ -0,0 +1,115 @@ +use crate::script::ast::MachineInstruction; + +use log::{debug, trace}; +use std::{ + fs, + io::{BufReader, prelude::*}, + net::TcpListener, + thread, +}; + +use crate::script::ast::Node; + +#[derive(Debug)] +pub enum MachineError { + Internal, +} + +/// Start a dummy listening server +fn start_server(addr: String, target_port: u16) -> Result<(), MachineError> { + debug!("Starting server at {:?}:{:?}", addr, target_port); + + let listener = TcpListener::bind((addr, target_port)).unwrap(); + + for stream in listener.incoming() { + let mut stream = stream.unwrap(); + + // As a simplest solution to keep a connection open, spawn a + // thread. It's not the best one though, as we waste resources. + // For the purpose of only keeping connections open we could e.g. + // spawn only two threads, where the first one receives connections + // and adds streams into the list of active, and the second iterates + // through streams and replies. This way the connections will have + // high latency, but for the purpose of networking workload it + // doesn't matter. + thread::spawn(move || { + loop { + let mut buf_reader = BufReader::new(&stream); + let mut buffer = String::new(); + + match buf_reader.read_line(&mut buffer) { + Ok(0) => { + // EOF, exit + trace!("EOF"); + return; + } + Ok(_n) => { + trace!("Received {:?}", buffer); + + let response = "hello\n"; + match stream.write_all(response.as_bytes()) { + Ok(_) => { + // Response is sent, handle the next one + } + Err(e) => { + trace!("ERROR: sending response, {}", e); + break; + } + } + } + Err(e) => { + trace!("ERROR: reading a line, {}", e) + } + } + } + }); + } + + Ok(()) +} + +/// Make sure a path exists +fn ensure_path(path: String) -> Result<(), MachineError> { + debug!("Create path {path:?}"); + match fs::create_dir_all(path) { + Ok(()) => { + debug!("Success"); + Ok(()) + } + Err(e) => { + debug!("Failure {e:?}"); + Err(MachineError::Internal) + } + } +} + +fn apply_instr(instr: MachineInstruction) -> Result<(), MachineError> { + match instr { + MachineInstruction::Server { port } => { + start_server("127.0.0.1".to_string(), port) + } + MachineInstruction::Profile { target: _ } => todo!(), + MachineInstruction::Path { value } => ensure_path(value), + } +} + +pub fn apply(machine: Vec<&Node>) -> Result<(), MachineError> { + for m in machine { + let Node::Machine { + ref m_instructions, .. + } = *m + else { + unreachable!() + }; + + for instr in m_instructions.clone() { + // TODO: Note that for some machine statements there is a race + // conditions here, since we do not wait for them to apply. + // Introduce a distinction between background statements, and those + // that have to be performed synchronously. + thread::spawn(move || apply_instr(instr)); + } + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 10faab0..b9c731c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,6 +38,7 @@ use berserker::{ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use berserker::machine::apply; use berserker::script::{ ast::Node, parser::parse_instructions, rules::apply_rules, }; @@ -65,12 +66,14 @@ fn run_script(script_path: String) -> Vec<(i32, u64)> { parse_instructions(&std::fs::read_to_string(script_path).unwrap()) .unwrap(); - let (_machine, works): (Vec<_>, Vec<_>) = + let (machine, works): (Vec<_>, Vec<_>) = ast.iter().partition_map(|node| match node { Node::Work { .. } => Either::Right(node), Node::Machine { .. } => Either::Left(node), }); + let _ = apply(machine); + apply_rules(works) .into_iter() .flat_map(|node| { @@ -311,20 +314,27 @@ mod tests { #[test] fn test_file_script() { let input = r#" + machine { + path("/tmp/data"); + } + random (workers = 1, duration = 10) { - open(random_path("/tmp")); + open(random_path("/tmp/data")); } repeated (workers = 1, duration = 10) { - open("/tmp/test"); + open("/tmp/data/test"); } "#; let ast: Vec = parse_instructions(input).unwrap(); - assert_eq!(ast.len(), 2); + assert_eq!(ast.len(), 3); - new_script_worker(ast[0].clone()).run_payload().unwrap(); + // apply machine statements + let _ = apply(vec![&ast[0]]); + // run workers new_script_worker(ast[1].clone()).run_payload().unwrap(); + new_script_worker(ast[2].clone()).run_payload().unwrap(); } }