diff --git a/Cargo.lock b/Cargo.lock index 9df5f494..4b4b98fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,7 @@ dependencies = [ "chrono", "ctor", "flexi_logger", + "flume", "io-uring", "libc", "log", @@ -310,6 +311,9 @@ name = "fastrand" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +dependencies = [ + "getrandom 0.2.17", +] [[package]] name = "find-msvc-tools" @@ -330,6 +334,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "flume" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" +dependencies = [ + "fastrand", + "futures-core", + "futures-sink", + "spin", +] + [[package]] name = "futures-core" version = "0.3.31" @@ -347,6 +363,12 @@ dependencies = [ "syn", ] +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + [[package]] name = "futures-task" version = "0.3.31" @@ -373,6 +395,19 @@ dependencies = [ "slab", ] +[[package]] +name = "getrandom" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" +dependencies = [ + "cfg-if", + "js-sys", + "libc", + "wasi", + "wasm-bindgen", +] + [[package]] name = "getrandom" version = "0.3.4" @@ -844,6 +879,15 @@ dependencies = [ "syn", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "strsim" version = "0.11.1" @@ -868,7 +912,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom", + "getrandom 0.3.4", "once_cell", "rustix", "windows-sys", diff --git a/Cargo.toml b/Cargo.toml index c5525f0d..0d4ad7fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ alioth-macros = { path = "alioth-macros", version = "0.12.0" } assert_matches = "1" ctor = "0.6" flexi_logger = "0.31" +flume = "0.12.0" parking_lot = { version = "0.12", features = ["hardware-lock-elision"] } pretty_assertions = "1" proc-macro2 = "1" diff --git a/alioth/Cargo.toml b/alioth/Cargo.toml index bbf5de36..41285388 100644 --- a/alioth/Cargo.toml +++ b/alioth/Cargo.toml @@ -11,6 +11,7 @@ alioth-macros.workspace = true bitfield = "0.19.4" bitflags = "2.11.0" chrono = "0.4.44" +flume.workspace = true libc = "0.2.184" log = "0.4" mio = { version = "1", features = ["net", "os-ext", "os-poll"] } diff --git a/alioth/src/board/board.rs b/alioth/src/board/board.rs index b5fff052..fee407e8 100644 --- a/alioth/src/board/board.rs +++ b/alioth/src/board/board.rs @@ -23,9 +23,9 @@ mod x86_64; use std::collections::HashMap; use std::ffi::CStr; use std::sync::Arc; -use std::sync::mpsc::Sender; use std::thread::JoinHandle; +use flume::Sender; use libc::{MAP_PRIVATE, MAP_SHARED}; use parking_lot::{Condvar, Mutex, RwLock, RwLockReadGuard}; use serde::Deserialize; diff --git a/alioth/src/hv/hv.rs b/alioth/src/hv/hv.rs index 741f1658..dd9de1b7 100644 --- a/alioth/src/hv/hv.rs +++ b/alioth/src/hv/hv.rs @@ -117,8 +117,8 @@ pub enum Error { TdxErr { code: u64 }, } -impl From for Error { - fn from(error: std::sync::mpsc::RecvError) -> Self { +impl From for Error { + fn from(error: flume::RecvError) -> Self { let source = error.as_error_source(); Error::BrokenChannel { _location: snafu::GenerateImplicitData::generate_with_source(source), @@ -126,8 +126,8 @@ impl From for Error { } } -impl From> for Error { - fn from(error: std::sync::mpsc::SendError) -> Self { +impl From> for Error { + fn from(error: flume::SendError) -> Self { let source = error.as_error_source(); Error::BrokenChannel { _location: snafu::GenerateImplicitData::generate_with_source(source), diff --git a/alioth/src/hv/hvf/vcpu/vcpu.rs b/alioth/src/hv/hvf/vcpu/vcpu.rs index a4ef7c61..728f4175 100644 --- a/alioth/src/hv/hvf/vcpu/vcpu.rs +++ b/alioth/src/hv/hvf/vcpu/vcpu.rs @@ -17,10 +17,10 @@ mod vmexit; use std::collections::HashMap; use std::ptr::null_mut; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Receiver, Sender}; -use std::sync::{Arc, mpsc}; +use flume::{Receiver, Sender}; use parking_lot::Mutex; use snafu::ResultExt; @@ -70,7 +70,7 @@ impl HvfVcpu { let ret = unsafe { hv_vcpu_set_sys_reg(vcpu_id, SReg::MPIDR_EL1, mpidr.0) }; check_ret(ret).context(error::VcpuReg)?; - let (sender, receiver) = mpsc::channel(); + let (sender, receiver) = flume::unbounded(); let power_on = Arc::new(AtomicBool::new(false)); let handle = Arc::new(VcpuHandle { diff --git a/alioth/src/virtio/dev/balloon.rs b/alioth/src/virtio/dev/balloon.rs index 062edafb..156c992b 100644 --- a/alioth/src/virtio/dev/balloon.rs +++ b/alioth/src/virtio/dev/balloon.rs @@ -15,10 +15,10 @@ use std::fmt::Debug; use std::io::{IoSlice, IoSliceMut}; use std::sync::Arc; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; use alioth_macros::Layout; +use flume::Receiver; use libc::{_SC_PAGESIZE, sysconf}; use mio::Registry; use mio::event::Event; diff --git a/alioth/src/virtio/dev/blk.rs b/alioth/src/virtio/dev/blk.rs index aa97dc33..79022458 100644 --- a/alioth/src/virtio/dev/blk.rs +++ b/alioth/src/virtio/dev/blk.rs @@ -19,9 +19,9 @@ use std::os::fd::AsRawFd; use std::os::unix::fs::FileExt; use std::path::Path; use std::sync::Arc; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; +use flume::Receiver; #[cfg(target_os = "linux")] use io_uring::cqueue::Entry as Cqe; #[cfg(target_os = "linux")] diff --git a/alioth/src/virtio/dev/dev.rs b/alioth/src/virtio/dev/dev.rs index caa5e22a..ff3cae47 100644 --- a/alioth/src/virtio/dev/dev.rs +++ b/alioth/src/virtio/dev/dev.rs @@ -25,10 +25,10 @@ pub mod vsock; use std::fmt::Debug; use std::sync::Arc; use std::sync::atomic::{AtomicU8, AtomicU16, AtomicU32}; -use std::sync::mpsc::{self, Receiver, Sender}; use std::thread::JoinHandle; use bitflags::Flags; +use flume::{Receiver, Sender}; use snafu::ResultExt; use crate::hv::IoeventFd; @@ -194,7 +194,7 @@ where let queue_regs = queue_regs.collect::>(); let shared_mem_regions = dev.shared_mem_regions(); - let (event_tx, event_rx) = mpsc::channel(); + let (event_tx, event_rx) = flume::unbounded(); let (handle, notifier) = dev.spawn_worker(event_rx, memory, queue_regs.clone())?; log::debug!( "{name}: created with {:x?}, {:x?}", diff --git a/alioth/src/virtio/dev/entropy.rs b/alioth/src/virtio/dev/entropy.rs index 92f4b703..30e2ffb6 100644 --- a/alioth/src/virtio/dev/entropy.rs +++ b/alioth/src/virtio/dev/entropy.rs @@ -17,9 +17,9 @@ use std::fs::{File, OpenOptions}; use std::os::unix::prelude::OpenOptionsExt; use std::path::Path; use std::sync::Arc; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; +use flume::Receiver; use libc::O_NONBLOCK; use mio::Registry; use mio::event::Event; diff --git a/alioth/src/virtio/dev/entropy_test.rs b/alioth/src/virtio/dev/entropy_test.rs index 05a6c5dc..3418a954 100644 --- a/alioth/src/virtio/dev/entropy_test.rs +++ b/alioth/src/virtio/dev/entropy_test.rs @@ -16,11 +16,11 @@ use std::ffi::CString; use std::fs::OpenOptions; use std::io::Write; use std::os::unix::fs::OpenOptionsExt; -use std::sync::mpsc::TryRecvError; use std::sync::{Arc, mpsc}; use std::time::Duration; use assert_matches::assert_matches; +use flume::TryRecvError; use rstest::rstest; use tempfile::TempDir; @@ -78,9 +78,9 @@ fn entropy_test(fixture_ram_bus: RamBus, fixture_queues: Box<[QueueReg]>) { assert_matches!(*dev.config(), EntropyConfig); assert_eq!(dev.feature(), FEATURE_BUILT_IN); - let (tx, rx) = mpsc::channel(); + let (tx, rx) = flume::unbounded(); let (handle, notifier) = dev.spawn_worker(rx, ram_bus.clone(), regs).unwrap(); - let (irq_tx, irq_rx) = mpsc::channel(); + let (irq_tx, irq_rx) = flume::unbounded(); let irq_sender = Arc::new(FakeIrqSender { q_tx: irq_tx }); let start_param = StartParam { feature: VirtioFeature::VERSION_1.bits(), diff --git a/alioth/src/virtio/dev/fs/fs.rs b/alioth/src/virtio/dev/fs/fs.rs index 6f30bf56..4005fa07 100644 --- a/alioth/src/virtio/dev/fs/fs.rs +++ b/alioth/src/virtio/dev/fs/fs.rs @@ -21,9 +21,9 @@ use std::fs::File; use std::io::{self, IoSlice, IoSliceMut, Read}; use std::os::fd::AsRawFd; use std::sync::Arc; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; +use flume::Receiver; use mio::Registry; use mio::event::Event; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; diff --git a/alioth/src/virtio/dev/fs/vu.rs b/alioth/src/virtio/dev/fs/vu.rs index f9502e21..a9945fef 100644 --- a/alioth/src/virtio/dev/fs/vu.rs +++ b/alioth/src/virtio/dev/fs/vu.rs @@ -19,9 +19,9 @@ use std::mem::size_of_val; use std::os::fd::{AsFd, AsRawFd}; use std::path::Path; use std::sync::Arc; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; +use flume::Receiver; use libc::{MAP_ANONYMOUS, MAP_FAILED, MAP_FIXED, MAP_PRIVATE, MAP_SHARED, PROT_NONE, mmap}; use mio::event::Event; use mio::unix::SourceFd; diff --git a/alioth/src/virtio/dev/net/tap.rs b/alioth/src/virtio/dev/net/tap.rs index 4e666ddc..96a09857 100644 --- a/alioth/src/virtio/dev/net/tap.rs +++ b/alioth/src/virtio/dev/net/tap.rs @@ -21,9 +21,9 @@ use std::os::fd::{AsFd, AsRawFd}; use std::os::unix::prelude::OpenOptionsExt; use std::path::Path; use std::sync::Arc; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; +use flume::Receiver; use io_uring::cqueue::Entry as Cqe; use io_uring::opcode; use io_uring::types::Fd; diff --git a/alioth/src/virtio/dev/net/vmnet.rs b/alioth/src/virtio/dev/net/vmnet.rs index d0c9ae63..341887be 100644 --- a/alioth/src/virtio/dev/net/vmnet.rs +++ b/alioth/src/virtio/dev/net/vmnet.rs @@ -16,12 +16,12 @@ use std::ffi::CStr; use std::fmt::Debug; use std::io::{self, ErrorKind, Read}; use std::ptr::null; +use std::sync::Arc; use std::sync::atomic::{AtomicPtr, Ordering}; -use std::sync::mpsc::{Receiver, Sender}; -use std::sync::{Arc, mpsc}; use std::thread::JoinHandle; use std::time::Duration; +use flume::{Receiver, Sender}; use libc::c_void; use mio::event::Event; use mio::{Interest, Registry, Token}; @@ -92,7 +92,7 @@ impl Net { ]; let desc = unsafe { xpc_dictionary_create(keys.as_ptr(), vals.as_ptr(), 3) }; let dispatch_queue = unsafe { dispatch_queue_create(c"virtio-net".as_ptr(), null()) }; - let (sender, receiver) = mpsc::channel::>(); + let (sender, receiver) = flume::unbounded::>(); #[repr(C)] struct HandlerBlock { @@ -180,7 +180,7 @@ impl Drop for Net { let interface = self.interface.load(Ordering::Acquire); let dispatch_queue = self.dispatch_queue.load(Ordering::Acquire); - let (sender, receiver) = mpsc::channel::(); + let (sender, receiver) = flume::unbounded::(); #[repr(C)] struct HandlerBlock { diff --git a/alioth/src/virtio/dev/vsock/uds_vsock.rs b/alioth/src/virtio/dev/vsock/uds_vsock.rs index 72c3cecf..f14b573e 100644 --- a/alioth/src/virtio/dev/vsock/uds_vsock.rs +++ b/alioth/src/virtio/dev/vsock/uds_vsock.rs @@ -22,9 +22,16 @@ use std::os::fd::AsRawFd; use std::os::unix::net::{UnixListener, UnixStream}; use std::path::Path; use std::sync::Arc; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; +use flume::Receiver; +use mio::event::Event; +use mio::unix::SourceFd; +use mio::{Interest, Registry, Token}; +use serde::Deserialize; +use serde_aco::Help; +use zerocopy::{FromBytes, IntoBytes}; + use crate::ffi; use crate::hv::IoeventFd; use crate::mem::mapped::RamBus; @@ -38,13 +45,6 @@ use crate::virtio::queue::{DescChain, Queue, QueueReg, Status, VirtQueue}; use crate::virtio::worker::mio::{ActiveMio, Mio, VirtioMio}; use crate::virtio::{DeviceId, FEATURE_BUILT_IN, IrqSender, Result, error}; -use mio::event::Event; -use mio::unix::SourceFd; -use mio::{Interest, Registry, Token}; -use serde::Deserialize; -use serde_aco::Help; -use zerocopy::{FromBytes, IntoBytes}; - const HEADER_SIZE: usize = size_of::(); const SOCKET_TYPE: VsockType = VsockType::STREAM; diff --git a/alioth/src/virtio/dev/vsock/uds_vsock_test.rs b/alioth/src/virtio/dev/vsock/uds_vsock_test.rs index ad653549..25fdb981 100644 --- a/alioth/src/virtio/dev/vsock/uds_vsock_test.rs +++ b/alioth/src/virtio/dev/vsock/uds_vsock_test.rs @@ -15,11 +15,11 @@ use std::io::{BufRead, BufReader, ErrorKind, Read, Write}; use std::mem::size_of; use std::os::unix::net::{UnixListener, UnixStream}; -use std::sync::mpsc::{Receiver, Sender, TryRecvError}; use std::sync::{Arc, mpsc}; use std::time::Duration; use assert_matches::assert_matches; +use flume::{Receiver, Sender, TryRecvError}; use rstest::rstest; use tempfile::TempDir; use zerocopy::{FromBytes, FromZeros, IntoBytes}; @@ -133,9 +133,9 @@ fn vsock_conn_test(fixture_ram_bus: RamBus, #[with(3)] fixture_queues: Box<[Queu VsockFeature::STREAM.bits() | FEATURE_BUILT_IN ); - let (tx, rx) = mpsc::channel(); + let (tx, rx) = flume::unbounded(); let (handle, notifier) = dev.spawn_worker(rx, ram_bus.clone(), regs).unwrap(); - let (irq_tx, irq_rx) = mpsc::channel(); + let (irq_tx, irq_rx) = flume::unbounded(); let irq_sender = Arc::new(FakeIrqSender { q_tx: irq_tx }); let start_param = StartParam { feature: VirtioFeature::VERSION_1.bits(), diff --git a/alioth/src/virtio/dev/vsock/vhost_vsock.rs b/alioth/src/virtio/dev/vsock/vhost_vsock.rs index 25a95910..0e648478 100644 --- a/alioth/src/virtio/dev/vsock/vhost_vsock.rs +++ b/alioth/src/virtio/dev/vsock/vhost_vsock.rs @@ -16,9 +16,9 @@ use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::path::Path; use std::sync::Arc; use std::sync::atomic::Ordering; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; +use flume::Receiver; use libc::{EFD_CLOEXEC, EFD_NONBLOCK, eventfd}; use mio::event::Event; use mio::unix::SourceFd; diff --git a/alioth/src/virtio/pci.rs b/alioth/src/virtio/pci.rs index 69dec1d9..4a9c8375 100644 --- a/alioth/src/virtio/pci.rs +++ b/alioth/src/virtio/pci.rs @@ -18,9 +18,9 @@ use std::mem::size_of; use std::os::fd::{AsFd, AsRawFd, BorrowedFd}; use std::sync::Arc; use std::sync::atomic::{AtomicU16, Ordering}; -use std::sync::mpsc::Sender; use alioth_macros::Layout; +use flume::Sender; use parking_lot::{Mutex, RwLock}; use zerocopy::{FromZeros, Immutable, IntoBytes}; diff --git a/alioth/src/virtio/queue/queue_test.rs b/alioth/src/virtio/queue/queue_test.rs index c4c12827..8a19ff34 100644 --- a/alioth/src/virtio/queue/queue_test.rs +++ b/alioth/src/virtio/queue/queue_test.rs @@ -16,9 +16,9 @@ use std::collections::HashMap; use std::io::{ErrorKind, IoSlice, IoSliceMut, Read, Write}; use std::ptr::eq as ptr_eq; use std::sync::atomic::Ordering; -use std::sync::mpsc::{self, TryRecvError}; use assert_matches::assert_matches; +use flume::TryRecvError; use rstest::rstest; use crate::mem::mapped::RamBus; @@ -183,7 +183,7 @@ fn test_copy_from_reader(fixture_ram_bus: RamBus, fixture_queues: Box<[QueueReg] let mut guest_q = GuestQueue::new(SplitQueue::new(reg, &*ram, false).unwrap().unwrap(), reg); assert!(ptr_eq(host_q.reg(), reg)); - let (irq_tx, irq_rx) = mpsc::channel(); + let (irq_tx, irq_rx) = flume::unbounded(); let irq_sender = FakeIrqSender { q_tx: irq_tx }; let str_0 = "Hello, World!"; @@ -338,7 +338,7 @@ fn test_copy_to_writer(fixture_ram_bus: RamBus, fixture_queues: Box<[QueueReg]>) &ram, ); let mut guest_q = GuestQueue::new(SplitQueue::new(reg, &*ram, false).unwrap().unwrap(), reg); - let (irq_tx, irq_rx) = mpsc::channel(); + let (irq_tx, irq_rx) = flume::unbounded(); let irq_sender = FakeIrqSender { q_tx: irq_tx }; let str_0 = "Hello, World!"; @@ -468,7 +468,7 @@ fn test_handle_deferred(fixture_ram_bus: RamBus, fixture_queues: Box<[QueueReg]> &ram, ); let mut guest_q = GuestQueue::new(SplitQueue::new(reg, &ram, false).unwrap().unwrap(), reg); - let (irq_tx, irq_rx) = mpsc::channel(); + let (irq_tx, irq_rx) = flume::unbounded(); let irq_sender = FakeIrqSender { q_tx: irq_tx }; let str_0 = "Hello, World!"; diff --git a/alioth/src/virtio/virtio_test.rs b/alioth/src/virtio/virtio_test.rs index 5e2072bb..74e126f3 100644 --- a/alioth/src/virtio/virtio_test.rs +++ b/alioth/src/virtio/virtio_test.rs @@ -14,8 +14,8 @@ use std::os::fd::{AsFd, BorrowedFd}; use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64}; -use std::sync::mpsc::Sender; +use flume::Sender; use rstest::fixture; use crate::hv::IoeventFd; diff --git a/alioth/src/virtio/vu/frontend.rs b/alioth/src/virtio/vu/frontend.rs index 07397dc4..3c347e14 100644 --- a/alioth/src/virtio/vu/frontend.rs +++ b/alioth/src/virtio/vu/frontend.rs @@ -16,9 +16,9 @@ use std::os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd}; use std::path::Path; use std::sync::Arc; use std::sync::atomic::Ordering; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; +use flume::Receiver; use mio::event::Event; use mio::unix::SourceFd; use mio::{Interest, Registry, Token}; diff --git a/alioth/src/virtio/worker/io_uring.rs b/alioth/src/virtio/worker/io_uring.rs index c2951b27..c87b659e 100644 --- a/alioth/src/virtio/worker/io_uring.rs +++ b/alioth/src/virtio/worker/io_uring.rs @@ -15,9 +15,9 @@ use std::iter; use std::os::fd::{AsFd, AsRawFd}; use std::sync::Arc; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; +use flume::Receiver; use io_uring::cqueue::Entry as Cqe; use io_uring::squeue::Entry as Sqe; use io_uring::{SubmissionQueue, opcode, types}; diff --git a/alioth/src/virtio/worker/mio.rs b/alioth/src/virtio/worker/mio.rs index 96044af1..38f7860d 100644 --- a/alioth/src/virtio/worker/mio.rs +++ b/alioth/src/virtio/worker/mio.rs @@ -14,9 +14,9 @@ use std::os::fd::AsRawFd; use std::sync::Arc; -use std::sync::mpsc::Receiver; use std::thread::JoinHandle; +use flume::Receiver; use mio::event::Event; use mio::unix::SourceFd; use mio::{Events, Interest, Poll, Registry, Token}; diff --git a/alioth/src/vm/vm.rs b/alioth/src/vm/vm.rs index b0f8f574..95ba96e0 100644 --- a/alioth/src/vm/vm.rs +++ b/alioth/src/vm/vm.rs @@ -15,10 +15,10 @@ #[cfg(target_os = "linux")] use std::path::Path; use std::sync::Arc; -use std::sync::mpsc::{self, Receiver, Sender}; use std::thread; use std::time::Duration; +use flume::{Receiver, Sender}; #[cfg(target_os = "linux")] use parking_lot::Mutex; use snafu::{ResultExt, Snafu}; @@ -125,7 +125,7 @@ where pub fn new(hv: &H, config: BoardConfig) -> Result { let board = Arc::new(Board::new(hv, config)?); - let (event_tx, event_rx) = mpsc::channel(); + let (event_tx, event_rx) = flume::unbounded(); let mut vcpus = board.vcpus.write(); for index in 0..board.config.cpu.count {