diff --git a/kernel/comps/block/src/bio.rs b/kernel/comps/block/src/bio.rs index 881835274..cd61d0d57 100644 --- a/kernel/comps/block/src/bio.rs +++ b/kernel/comps/block/src/bio.rs @@ -8,7 +8,7 @@ use aster_time::read_monotonic_time; use bitvec::array::BitArray; use int_to_c_enum::TryFromInt; #[cfg(not(baseline_asterinas))] -use ostd::orpc::legacy_oqueue::{OQueueAttachError, Producer}; +use ostd::orpc::oqueue::{OQueueError, ValueProducer}; use ostd::{ Error, mm::{ @@ -25,7 +25,7 @@ use crate::{BLOCK_SIZE, SECTOR_SIZE, prelude::*, request_queue::BioRequestSingle /// Trace data for block device I/O completion. /// /// This struct captures performance metrics when a block I/O request completes. -#[derive(Clone)] +#[derive(Clone, Copy, Default)] pub struct BlockDeviceCompletionStats { /// The latency of the I/O request (time from submission to completion). pub latency: Duration, @@ -202,13 +202,13 @@ pub enum BioEnqueueError { TooBig, /// OQueue attachment failures #[cfg(not(baseline_asterinas))] - OQueueAttachError(OQueueAttachError), + OQueueError(OQueueError), } #[cfg(not(baseline_asterinas))] -impl From for BioEnqueueError { - fn from(err: OQueueAttachError) -> Self { - Self::OQueueAttachError(err) +impl From for BioEnqueueError { + fn from(err: OQueueError) -> Self { + Self::OQueueError(err) } } @@ -325,7 +325,7 @@ pub struct SubmittedBio { bio_inner: Arc, #[cfg(not(baseline_asterinas))] - reply_handle: Option>>, + reply_handle: Option>, submission_time: Option, @@ -406,7 +406,7 @@ impl SubmittedBio { #[cfg(not(baseline_asterinas))] pub fn prepare_enqueue( &mut self, - reply_handle: Box>, + reply_handle: ValueProducer, bio_request_single_queue: Arc, ) { self.reply_handle = Some(reply_handle); diff --git a/kernel/comps/block/src/lib.rs b/kernel/comps/block/src/lib.rs index 9e89e4432..71271160a 100644 --- a/kernel/comps/block/src/lib.rs +++ b/kernel/comps/block/src/lib.rs @@ -41,7 +41,7 @@ pub mod request_queue; pub mod test_utils; use component::{ComponentInitError, init_component}; -use ostd::sync::SpinLock; +use ostd::{orpc::path::Path, sync::SpinLock}; use spin::Once; use self::{ @@ -58,6 +58,8 @@ pub trait BlockDevice: Send + Sync + Any + Debug { /// Returns the metadata of the block device. fn metadata(&self) -> BlockDeviceMeta; + + fn path(&self) -> Path; } /// Metadata for a block device. diff --git a/kernel/comps/block/src/test_utils.rs b/kernel/comps/block/src/test_utils.rs index efedda32f..b228ea009 100644 --- a/kernel/comps/block/src/test_utils.rs +++ b/kernel/comps/block/src/test_utils.rs @@ -4,7 +4,7 @@ use alloc::{boxed::Box, vec}; -use ostd::{mm::UntypedMem, sync::Mutex}; +use ostd::{mm::UntypedMem, orpc::path::Path, path, sync::Mutex}; use crate::{ BlockDevice, BlockDeviceMeta, SECTOR_SIZE, @@ -24,6 +24,10 @@ impl BlockDevice for FakeBlockDevice { fn metadata(&self) -> crate::BlockDeviceMeta { todo!() } + + fn path(&self) -> Path { + Path::test() + } } /// A block device backed by memory. 
@@ -77,4 +81,8 @@ impl BlockDevice for MemoryDisk { nr_sectors: self.data.lock().len() / SECTOR_SIZE, } } + + fn path(&self) -> Path { + path!(memory_disk) + } } diff --git a/kernel/comps/mariposa_data_capture/src/data_capture_device.rs b/kernel/comps/mariposa_data_capture/src/data_capture_device.rs index e6aca17a7..1c7101ef0 100644 --- a/kernel/comps/mariposa_data_capture/src/data_capture_device.rs +++ b/kernel/comps/mariposa_data_capture/src/data_capture_device.rs @@ -56,8 +56,8 @@ pub struct DataCaptureDeviceServer { } impl DataCaptureDeviceServer { - pub fn new(block_device: Arc) -> Arc { - new_server!(|_| DataCaptureDeviceServer { + pub fn new(path: Path, block_device: Arc) -> Arc { + new_server!(path, |_| DataCaptureDeviceServer { block_device, next_block_offset: AtomicUsize::new(0), }) diff --git a/kernel/comps/mariposa_data_capture/src/data_capture_file.rs b/kernel/comps/mariposa_data_capture/src/data_capture_file.rs index 3f8a76d74..cd029d6a1 100644 --- a/kernel/comps/mariposa_data_capture/src/data_capture_file.rs +++ b/kernel/comps/mariposa_data_capture/src/data_capture_file.rs @@ -204,7 +204,7 @@ impl DataCaptureFileBuilder { .call_in_context(move || -> Result>, RPCError> { let command_oqueue = ConsumableOQueueRef::new(8, self.path.append(&path!(commands))); - let server = new_server!(|_| DataCaptureFileServer { + let server = new_server!(self.path, |_| DataCaptureFileServer { command_producer: command_oqueue .attach_value_producer() .expect("single purpose OQueue failed."), diff --git a/kernel/comps/mariposa_data_capture/src/lib.rs b/kernel/comps/mariposa_data_capture/src/lib.rs index 67f4c3c01..7aa78f0f1 100644 --- a/kernel/comps/mariposa_data_capture/src/lib.rs +++ b/kernel/comps/mariposa_data_capture/src/lib.rs @@ -58,7 +58,10 @@ mod tests { use aster_block::test_utils::MemoryDisk; use ostd::{ assertion::sleep, - orpc::oqueue::{OQueue, OQueueBase, OQueueRef, ObservationQuery}, + orpc::{ + oqueue::{OQueue, OQueueBase, OQueueRef, ObservationQuery}, + path::Path, + }, path, prelude::*, }; @@ -71,7 +74,7 @@ mod tests { fn test_capture_server() { // Create memory disk with space for 4 blocks let block_device = Arc::new(MemoryDisk::new(4096 * 4)); - let device = DataCaptureDeviceServer::new(block_device.clone()); + let device = DataCaptureDeviceServer::new(Path::test(), block_device.clone()); let path = path!(test_capture); let builder = device diff --git a/kernel/comps/mlsdisk/src/layers/5-disk/mlsdisk.rs b/kernel/comps/mlsdisk/src/layers/5-disk/mlsdisk.rs index 88814b856..cb282fa59 100644 --- a/kernel/comps/mlsdisk/src/layers/5-disk/mlsdisk.rs +++ b/kernel/comps/mlsdisk/src/layers/5-disk/mlsdisk.rs @@ -15,7 +15,7 @@ use core::{ sync::atomic::{AtomicBool, Ordering}, }; -use ostd::{ignore_err, mm::VmIo}; +use ostd::{ignore_err, mm::VmIo, orpc::path::Path, path}; use ostd_pod::Pod; use super::{ @@ -65,6 +65,8 @@ struct DiskInner { root_key: Key, /// Whether `MlsDisk` is dropped. is_dropped: AtomicBool, + /// The path of this device. + path: Path, /// Scope lock for control write and sync operation. write_sync_region: RwLock<()>, } @@ -163,6 +165,10 @@ impl aster_block::BlockDevice for MlsDisk { nr_sectors: (BLOCK_SIZE / SECTOR_SIZE) * self.total_blocks(), } } + + fn path(&self) -> ostd::orpc::path::Path { + self.inner.path.clone() + } } impl MlsDisk { @@ -211,6 +217,9 @@ impl MlsDisk { self.inner.user_data_disk.nblocks() } + // TODO(arthurp): The paths of the disks are not based on the underlying block devices like they + // should be. 
+ /// Creates a new `MlsDisk` on the given disk, with the root encryption key. pub fn create( disk: D, @@ -254,6 +263,7 @@ impl MlsDisk { root_key, is_dropped: AtomicBool::new(false), write_sync_region: RwLock::new(()), + path: path!(block_device.create_mls[unique]), }), }; @@ -306,6 +316,7 @@ impl MlsDisk { root_key, is_dropped: AtomicBool::new(false), write_sync_region: RwLock::new(()), + path: path!(block_device.open_mls[unique]), }), }; diff --git a/kernel/comps/mlsdisk/src/lib.rs b/kernel/comps/mlsdisk/src/lib.rs index 333d8a360..6a0246fa0 100644 --- a/kernel/comps/mlsdisk/src/lib.rs +++ b/kernel/comps/mlsdisk/src/lib.rs @@ -131,6 +131,7 @@ mod test { }; use ostd::{ mm::{FrameAllocOptions, Segment, VmIo}, + path, prelude::*, }; @@ -185,6 +186,10 @@ mod test { nr_sectors: self.blocks.size() / SECTOR_SIZE, } } + + fn path(&self) -> ostd::orpc::path::Path { + path!(memory_disk) + } } fn create_rawdisk(nblocks: usize) -> RawDisk { diff --git a/kernel/comps/raid/src/lib.rs b/kernel/comps/raid/src/lib.rs index 57f4d7d34..75cb7c4fc 100644 --- a/kernel/comps/raid/src/lib.rs +++ b/kernel/comps/raid/src/lib.rs @@ -35,7 +35,10 @@ use aster_block::{ request_queue::{BioRequest, BioRequestSingleQueue}, }; #[cfg(not(baseline_asterinas))] -use ostd::orpc::orpc_server; +use ostd::{ + orpc::{orpc_server, path::Path}, + path, +}; #[cfg(not(baseline_asterinas))] use crate::server_traits::SelectionPolicy; @@ -63,6 +66,8 @@ pub struct Raid1Device { /// Basic capacity limits for the logical device (min across members). metadata: BlockDeviceMeta, + path: Path, + /// The policy to select the read member. selection_policy: Arc, } @@ -112,6 +117,8 @@ impl Raid1Device { members: Vec>, selection_policy: Arc, ) -> Result<(), Raid1DeviceError> { + use ostd::new_server; + if members.len() < 2 { return Err(Raid1DeviceError::NotEnoughMembers); } @@ -122,12 +129,12 @@ impl Raid1Device { let queue = BioRequestSingleQueue::with_max_nr_segments_per_bio(metadata.max_nr_segments_per_bio); - let device = Self::new_with(|orpc_internal, _weak_self| Raid1Device { - orpc_internal, + let device = new_server!(path!(block.raid1.{name}), |_| Raid1Device { members, queue, metadata, selection_policy, + path: path!(block_device.raid1.{name}) }); aster_block::register_device(name.to_owned(), device.clone()); @@ -365,4 +372,8 @@ impl BlockDevice for Raid1Device { fn metadata(&self) -> BlockDeviceMeta { self.metadata } + + fn path(&self) -> ostd::orpc::path::Path { + self.path.clone() + } } diff --git a/kernel/comps/raid/src/selection_policies.rs b/kernel/comps/raid/src/selection_policies.rs index 811444e4a..240ed2516 100644 --- a/kernel/comps/raid/src/selection_policies.rs +++ b/kernel/comps/raid/src/selection_policies.rs @@ -5,8 +5,15 @@ use alloc::{sync::Arc, vec::Vec}; use core::sync::atomic::{AtomicUsize, Ordering}; -use aster_block::BlockDevice; -use ostd::{Error, orpc::orpc_server}; +use aster_block::{BlockDevice, bio::BlockDeviceCompletionStats}; +use ostd::{ + Error, + orpc::{ + oqueue::{OQueueBase as _, ObservationQuery}, + orpc_server, + path::Path, + }, +}; use crate::server_traits::{ObservableBlockDevice, SelectionPolicy}; @@ -18,8 +25,8 @@ pub struct RoundRobinPolicy { } impl RoundRobinPolicy { - pub fn new(members: Vec>) -> Result, Error> { - let server = Self::new_with(|orpc_internal, _| Self { + pub fn new(path: Path, members: Vec>) -> Result, Error> { + let server = Self::new_with(path, |orpc_internal, _| Self { orpc_internal, read_cursor: AtomicUsize::new(0), members, @@ -49,8 +56,11 @@ pub struct LinnOSPolicy { } impl 
LinnOSPolicy { - pub fn new(members: Vec>) -> Result, Error> { - let server = Self::new_with(|orpc_internal, _| Self { + pub fn new( + path: Path, + members: Vec>, + ) -> Result, Error> { + let server = Self::new_with(path, |orpc_internal, _| Self { orpc_internal, read_cursor: AtomicUsize::new(0), members, @@ -70,7 +80,7 @@ impl SelectionPolicy for LinnOSPolicy { .map(|device| { device .bio_completion_oqueue() - .attach_weak_observer() + .attach_weak_observer(4, ObservationQuery::identity()) .expect("Failed to attach weak observer to bio_completion_oqueue") }) .collect(); @@ -78,7 +88,11 @@ impl SelectionPolicy for LinnOSPolicy { loop { let idx = self.read_cursor.fetch_add(1, Ordering::Relaxed); let observer = &trace_observers[idx % trace_observers.len()]; - let completion_trace = observer.weak_observe_recent(4); + let completion_trace: Vec = observer + .weak_observe_recent(4)? + .iter() + .map(|v| v.unwrap_or_default()) + .collect(); // Inference using the ML model let x = self.model[0] * completion_trace[0].latency.as_nanos() as f32 diff --git a/kernel/comps/virtio/src/device/block/device.rs b/kernel/comps/virtio/src/device/block/device.rs index 65303b7dd..5fea3b4d6 100644 --- a/kernel/comps/virtio/src/device/block/device.rs +++ b/kernel/comps/virtio/src/device/block/device.rs @@ -23,15 +23,18 @@ use log::{debug, info}; #[cfg(not(baseline_asterinas))] use ostd::orpc::framework::spawn_thread; #[cfg(not(baseline_asterinas))] -use ostd::orpc::legacy_oqueue::{OQueueRef, Producer}; +use ostd::orpc::oqueue::{ConsumableOQueue as _, ConsumableOQueueRef, ValueProducer}; #[cfg(not(baseline_asterinas))] use ostd::orpc::{orpc_impl, orpc_server}; use ostd::{ Pod, ignore_err, mm::{DmaDirection, DmaStream, DmaStreamSlice, FrameAllocOptions, VmIo}, + path, sync::SpinLock, trap::TrapFrame, }; +#[cfg(not(baseline_asterinas))] +use ostd::{new_server, orpc::framework::Server}; use super::{BlockFeatures, VirtioBlockConfig, VirtioBlockFeature}; #[cfg(not(baseline_asterinas))] @@ -65,8 +68,8 @@ pub struct BlockDevice { #[cfg(not(baseline_asterinas))] #[orpc_impl] impl server_traits::BlockIOObservable for BlockDevice { - fn bio_submission_oqueue(&self) -> OQueueRef; - fn bio_completion_oqueue(&self) -> OQueueRef; + fn bio_submission_oqueue(&self) -> ConsumableOQueueRef; + fn bio_completion_oqueue(&self) -> ConsumableOQueueRef; } impl BlockDevice { @@ -97,13 +100,15 @@ impl BlockDevice { #[cfg(not(baseline_asterinas))] { - let block_device_server = Self::new_with(|orpc_internal, _weak_self| BlockDevice { - orpc_internal, - device, - queue: Arc::new(BioRequestSingleQueue::with_max_nr_segments_per_bio( - (DeviceInner::QUEUE_SIZE - 2) as usize, - )), - }); + let block_device_server = + new_server!(path!(block_device.virtio.{device_id}), |_weak_self| { + BlockDevice { + device, + queue: Arc::new(BioRequestSingleQueue::with_max_nr_segments_per_bio( + (DeviceInner::QUEUE_SIZE - 2) as usize, + )), + } + }); // Thread 2: Handle requests from the OQueue and enqueue them spawn_thread(block_device_server.clone(), { @@ -170,12 +175,14 @@ impl aster_block::BlockDevice for BlockDevice { #[cfg(not(baseline_asterinas))] impl aster_block::BlockDevice for BlockDevice { fn enqueue(&self, bio: SubmittedBio) -> Result<(), BioEnqueueError> { - let reply_handle: Box> = - self.bio_completion_oqueue().attach_producer()?; + let reply_handle: ValueProducer = + self.bio_completion_oqueue().attach_value_producer()?; let mut bio = bio; bio.prepare_enqueue(reply_handle, self.queue.clone()); - self.bio_submission_oqueue().produce(bio)?; + 
self.bio_submission_oqueue() + .attach_value_producer()? + .produce(bio); Ok(()) } @@ -185,6 +192,10 @@ impl aster_block::BlockDevice for BlockDevice { nr_sectors: self.device.config_manager.capacity_sectors(), } } + + fn path(&self) -> ostd::orpc::path::Path { + Server::path(self).clone() + } } #[derive(Debug)] diff --git a/kernel/comps/virtio/src/device/block/server_traits.rs b/kernel/comps/virtio/src/device/block/server_traits.rs index c72d89eba..9d9d21d48 100644 --- a/kernel/comps/virtio/src/device/block/server_traits.rs +++ b/kernel/comps/virtio/src/device/block/server_traits.rs @@ -1,41 +1,23 @@ // SPDX-License-Identifier: MPL-2.0 use aster_block::bio::{BlockDeviceCompletionStats, SubmittedBio}; -use ostd::orpc::{ - errors::RPCError, - legacy_oqueue::{ - OQueueAttachError, OQueueRef, - locking::{LockingQueue, ObservableLockingQueue}, - }, - orpc_trait, +use ostd::{ + orpc::{oqueue::ConsumableOQueueRef, orpc_trait}, + path, }; -use crate::device::VirtioDeviceError; - -impl From for VirtioDeviceError { - fn from(value: RPCError) -> Self { - VirtioDeviceError::RPCError(value) - } -} - -impl From for VirtioDeviceError { - fn from(value: OQueueAttachError) -> Self { - VirtioDeviceError::OQueueAttachError(value) - } -} - #[orpc_trait] pub trait BlockIOObservable { /// The OQueue containing every bio submission request. /// The submission queue doesn't needed to be observable. - fn bio_submission_oqueue(&self) -> OQueueRef { - LockingQueue::new(32) + fn bio_submission_oqueue(&self) -> ConsumableOQueueRef { + ConsumableOQueueRef::new(32, path!(bio_submission_oqueue[unique])) } /// The OQueue containing every write request. This includes both sync and async writes and any /// other write operations on other traits - fn bio_completion_oqueue(&self) -> OQueueRef { - ObservableLockingQueue::new(32, 1) + fn bio_completion_oqueue(&self) -> ConsumableOQueueRef { + ConsumableOQueueRef::new(32, path!(bio_completion_oqueue[unique])) } } diff --git a/kernel/comps/virtio/src/device/mod.rs b/kernel/comps/virtio/src/device/mod.rs index 990af57b9..e48a4e277 100644 --- a/kernel/comps/virtio/src/device/mod.rs +++ b/kernel/comps/virtio/src/device/mod.rs @@ -2,7 +2,7 @@ use int_to_c_enum::TryFromInt; #[cfg(not(baseline_asterinas))] -use ostd::orpc::{errors::RPCError, legacy_oqueue::OQueueAttachError}; +use ostd::orpc::{errors::RPCError, oqueue::OQueueError}; use crate::queue::QueueError; @@ -54,7 +54,7 @@ pub enum VirtioDeviceError { RPCError(RPCError), /// The OQueue attachment errors #[cfg(not(baseline_asterinas))] - OQueueAttachError(OQueueAttachError), + OQueueError(OQueueError), } impl From for VirtioDeviceError { @@ -62,3 +62,15 @@ impl From for VirtioDeviceError { VirtioDeviceError::QueueUnknownError } } + +impl From for VirtioDeviceError { + fn from(value: RPCError) -> Self { + VirtioDeviceError::RPCError(value) + } +} + +impl From for VirtioDeviceError { + fn from(value: OQueueError) -> Self { + VirtioDeviceError::OQueueError(value) + } +} diff --git a/kernel/src/error.rs b/kernel/src/error.rs index 89a19d919..54a9e3538 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -497,7 +497,7 @@ impl From for Error { Error::with_message(Errno::EINVAL, "Bio is too big") } #[cfg(not(baseline_asterinas))] - aster_block::bio::BioEnqueueError::OQueueAttachError(err) => err.into(), + aster_block::bio::BioEnqueueError::OQueueError(err) => err.into(), } } } diff --git a/kernel/src/fs/exfat/fs.rs b/kernel/src/fs/exfat/fs.rs index 79ec6207c..cf109f098 100644 --- a/kernel/src/fs/exfat/fs.rs +++ 
b/kernel/src/fs/exfat/fs.rs @@ -16,8 +16,13 @@ use hashbrown::HashMap; use lru::LruCache; pub(super) use ostd::mm::VmIo; #[cfg(not(baseline_asterinas))] -use ostd::orpc::{legacy_oqueue::OQueueRef, orpc_impl}; -use ostd::{mm::Segment, new_server, orpc::orpc_server}; +use ostd::orpc::{oqueue::OQueue, orpc_impl}; +use ostd::{ + mm::Segment, + new_server, + orpc::{oqueue::OQueueRef, orpc_server}, + path, +}; use super::{ bitmap::ExfatBitmap, @@ -77,7 +82,8 @@ impl ExfatFS { // Load the super_block let super_block = Self::read_super_block(block_device.as_ref())?; let fs_size = super_block.num_clusters as usize * super_block.cluster_size as usize; - let exfat_fs = new_server!(|weak_self| ExfatFS { + let path = block_device.path().append(&path!(exfat)); + let exfat_fs = new_server!(path.clone(), |weak_self| ExfatFS { block_device, super_block, bitmap: Arc::new(Mutex::new(ExfatBitmap::default())), @@ -88,7 +94,7 @@ impl ExfatFS { fat_cache: RwLock::new(LruCache::::new( NonZeroUsize::new(FAT_LRU_CACHE_SIZE).unwrap(), )), - meta_cache: PageCache::with_capacity(fs_size, weak_self.clone() as _).unwrap(), + meta_cache: PageCache::with_capacity(path.append(&path!(page_cache)), fs_size, weak_self.clone() as _).unwrap(), mutex: Mutex::new(()), }); @@ -106,7 +112,7 @@ impl ExfatFS { FatChainFlags::ALLOC_POSSIBLE, )?; - let root = ExfatInode::build_root_inode(weak_fs.clone(), root_chain.clone())?; + let root = ExfatInode::build_root_inode(&path, weak_fs.clone(), root_chain.clone())?; let upcase_table = ExfatUpcaseTable::load( weak_fs.clone(), @@ -400,15 +406,17 @@ impl PageStore for ExfatFS { ); // Produce the handle to the ORPC queue - self.page_reads_oqueue().produce(req.handle.idx)?; - let reply_producer = self.page_reads_reply_oqueue().attach_producer()?; + self.page_reads_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); + let reply_producer = self.page_reads_reply_oqueue().attach_ref_producer()?; self.block_device.read_blocks_async_with_closure( BlockId::new(req.handle.idx as u64), bio_segment, move |b| { // TODO(arthurp, #120): This can crash if produce blocks. - reply_producer.produce(req.handle.idx); + reply_producer.produce_ref(&req.handle.idx); req.reply_handle.produce(req.handle); }, )?; @@ -426,8 +434,10 @@ impl PageStore for ExfatFS { ); // Produce the handle to the ORPC queue - self.page_writes_oqueue().produce(req.handle.idx)?; - let reply_producer = self.page_writes_reply_oqueue().attach_producer()?; + self.page_writes_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); + let reply_producer = self.page_writes_reply_oqueue().attach_ref_producer()?; self.block_device.write_blocks_async_with_closure( BlockId::new(req.handle.idx as u64), @@ -435,7 +445,7 @@ impl PageStore for ExfatFS { move |b| { if let Some(reply_handle) = req.reply_handle { // TODO(arthurp, #120): This can crash if produce blocks. 
- reply_producer.produce(req.handle.idx); + reply_producer.produce_ref(&req.handle.idx); reply_handle.produce(req.handle); } }, diff --git a/kernel/src/fs/exfat/inode.rs b/kernel/src/fs/exfat/inode.rs index 2c29132e5..7234b8617 100644 --- a/kernel/src/fs/exfat/inode.rs +++ b/kernel/src/fs/exfat/inode.rs @@ -16,11 +16,12 @@ use aster_block::{ }; use aster_rights::Full; #[cfg(not(baseline_asterinas))] -use ostd::orpc::legacy_oqueue::OQueueRef; +use ostd::orpc::oqueue::{OQueue, OQueueRef}; use ostd::{ mm::{Segment, VmIo}, new_server, - orpc::{orpc_impl, orpc_server}, + orpc::{framework::Server, orpc_impl, orpc_server, path::Path}, + path, }; use super::{ @@ -172,14 +173,16 @@ impl server_traits::PageStore for ExfatInode { BioDirection::FromDevice, ); // Produce the handle to the ORPC queue - self.page_reads_oqueue().produce(req.handle.idx)?; - let reply_producer = self.page_reads_reply_oqueue().attach_producer()?; + self.page_reads_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); + let reply_producer = self.page_reads_reply_oqueue().attach_ref_producer()?; inner.fs().block_device().read_blocks_async_with_closure( BlockId::from_offset(sector_id * inner.fs().sector_size()), bio_segment, move |b| { // TODO(arthurp, #120): This can crash if produce blocks. - reply_producer.produce(req.handle.idx); + reply_producer.produce_ref(&req.handle.idx); req.reply_handle.produce(req.handle); }, )?; @@ -199,15 +202,17 @@ impl server_traits::PageStore for ExfatInode { BioDirection::ToDevice, ); // Produce the handle to the ORPC queue - self.page_writes_oqueue().produce(req.handle.idx)?; - let reply_producer = self.page_writes_reply_oqueue().attach_producer()?; + self.page_writes_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); + let reply_producer = self.page_writes_reply_oqueue().attach_ref_producer()?; inner.fs().block_device().write_blocks_async_with_closure( BlockId::from_offset(sector_id * inner.fs().sector_size()), bio_segment, move |b| { if let Some(reply_handle) = req.reply_handle { // TODO(arthurp, #120): This can crash if produce blocks. 
- reply_producer.produce(req.handle.idx); + reply_producer.produce_ref(&req.handle.idx); reply_handle.produce(req.handle); } }, @@ -721,6 +726,7 @@ impl ExfatInode { } pub(super) fn build_root_inode( + fs_path: &Path, fs_weak: Weak, root_chain: ExfatChain, ) -> Result> { @@ -740,7 +746,7 @@ impl ExfatInode { let name = ExfatName::new(); - let inode = new_server!(|weak_self| ExfatInode { + let inode = new_server!(fs_path.append(&path!(root)), |weak_self| ExfatInode { inner: RwMutex::new(ExfatInodeInner { ino: EXFAT_ROOT_INO, dentry_set_position: ExfatChainPosition::default(), @@ -760,7 +766,12 @@ impl ExfatInode { is_deleted: false, parent_hash: 0, fs: fs_weak, - page_cache: PageCache::with_capacity(size, weak_self.clone() as _).unwrap(), + page_cache: PageCache::with_capacity( + fs_path.append(&path!(root.page_cache)), + size, + weak_self.clone() as _ + ) + .unwrap(), }), extension: Extension::new(), }); @@ -856,29 +867,37 @@ impl ExfatInode { )?; let name = dentry_set.get_name(fs.upcase_table())?; - let inode = new_server!(|weak_self| ExfatInode { - inner: RwMutex::new(ExfatInodeInner { - ino, - dentry_set_position, - dentry_set_size, - dentry_entry, - inode_type, - attr, - start_chain, - size, - size_allocated, - atime, - mtime, - ctime, - num_sub_inodes: 0, - num_sub_dirs: 0, - name, - is_deleted: false, - parent_hash, - fs: fs_weak, - page_cache: PageCache::with_capacity(size, weak_self.clone() as _).unwrap(), - }), - extension: Extension::new(), + let path = fs.path().append(&path!(dentry[unique])); + let inode = new_server!(path.clone(), |weak_self| { + ExfatInode { + inner: RwMutex::new(ExfatInodeInner { + ino, + dentry_set_position, + dentry_set_size, + dentry_entry, + inode_type, + attr, + start_chain, + size, + size_allocated, + atime, + mtime, + ctime, + num_sub_inodes: 0, + num_sub_dirs: 0, + name, + is_deleted: false, + parent_hash, + fs: fs_weak, + page_cache: PageCache::with_capacity( + path.append(&path!(page_cache)), + size, + weak_self.clone() as _, + ) + .unwrap(), + }), + extension: Extension::new(), + } }); #[cfg(not(baseline_asterinas))] diff --git a/kernel/src/fs/exfat/mod.rs b/kernel/src/fs/exfat/mod.rs index 6245b21d6..34ee39064 100644 --- a/kernel/src/fs/exfat/mod.rs +++ b/kernel/src/fs/exfat/mod.rs @@ -106,6 +106,10 @@ mod test { nr_sectors: self.sectors_count(), } } + + fn path(&self) -> ostd::orpc::path::Path { + ostd::orpc::path::Path::test() + } } /// Exfat disk image static EXFAT_IMAGE: &[u8] = include_bytes!("../../../../test/build/exfat.img"); diff --git a/kernel/src/fs/ext2/block_group.rs b/kernel/src/fs/ext2/block_group.rs index 050f9b7b5..71af78f40 100644 --- a/kernel/src/fs/ext2/block_group.rs +++ b/kernel/src/fs/ext2/block_group.rs @@ -2,12 +2,13 @@ use id_alloc::IdAlloc; #[cfg(not(baseline_asterinas))] -use ostd::orpc::legacy_oqueue::OQueueRef; +use ostd::orpc::oqueue::OQueueRef; use ostd::{ const_assert, mm::UntypedMem, new_server, - orpc::{orpc_impl, orpc_server}, + orpc::{oqueue::OQueue as _, orpc_impl, orpc_server}, + path, }; use super::{ @@ -48,6 +49,7 @@ impl BlockGroup { fs: Weak, ) -> Result { let raw_inodes_size = (super_block.inodes_per_group() as usize) * super_block.inode_size(); + let block_device_path = block_device.path(); let bg_impl = { let metadata = { @@ -86,19 +88,25 @@ impl BlockGroup { } }; - new_server!(|_| BlockGroupImpl { - inode_table_bid: metadata.descriptor.inode_table_bid, - raw_inodes_size, - inner: RwMutex::new(Inner { - metadata: Dirty::new(metadata), - inode_cache: BTreeMap::new(), - }), - fs, - }) + new_server!( + 
block_device_path.append(&path!(ext2.block_group[{ idx }])), + |_| BlockGroupImpl { + inode_table_bid: metadata.descriptor.inode_table_bid, + raw_inodes_size, + inner: RwMutex::new(Inner { + metadata: Dirty::new(metadata), + inode_cache: BTreeMap::new(), + }), + fs, + } + ) }; - let raw_inodes_cache = - PageCache::with_capacity(raw_inodes_size, Arc::downgrade(&bg_impl) as _)?; + let raw_inodes_cache = PageCache::with_capacity( + block_device_path.append(&path!(ext2.raw_inodes_cache)), + raw_inodes_size, + Arc::downgrade(&bg_impl) as _, + )?; Ok(Self { idx, @@ -351,15 +359,17 @@ impl PageStore for BlockGroupImpl { BioDirection::FromDevice, ); - let reply_producer = self.page_reads_reply_oqueue().attach_producer()?; - self.page_reads_oqueue().produce(req.handle.idx)?; + let reply_producer = self.page_reads_reply_oqueue().attach_ref_producer()?; + self.page_reads_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); self.fs .upgrade() .unwrap() .read_blocks_async_with_closure(bid, bio_segment, move |_| { // TODO(arthurp, #120): This can crash if produce blocks. - reply_producer.produce(req.handle.idx); + reply_producer.produce_ref(&req.handle.idx); req.reply_handle.produce(req.handle); })?; @@ -374,22 +384,21 @@ impl PageStore for BlockGroupImpl { .writer() .unwrap() .write_fallible(&mut req.handle.frame.reader().to_fallible())?; - - let reply_producer = self.page_reads_reply_oqueue().attach_producer()?; - self.page_writes_oqueue().produce(req.handle.idx)?; - + let reply_producer = self.page_writes_reply_oqueue().attach_ref_producer()?; + self.page_writes_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); self.fs.upgrade().unwrap().write_blocks_async_with_closure( bid, bio_segment, move |_| { // TODO(arthurp, #120): This can crash if produce blocks. - reply_producer.produce(req.handle.idx); + reply_producer.produce_ref(&req.handle.idx); if let Some(reply_handle) = req.reply_handle { reply_handle.produce(req.handle); } }, )?; - Ok(()) } diff --git a/kernel/src/fs/ext2/fs.rs b/kernel/src/fs/ext2/fs.rs index e1c2ae7b8..1ab04320b 100644 --- a/kernel/src/fs/ext2/fs.rs +++ b/kernel/src/fs/ext2/fs.rs @@ -3,6 +3,7 @@ #![expect(dead_code)] use aster_block::bio::SubmittedBio; +use ostd::{orpc::path::Path, path}; use super::{ block_group::{BlockGroup, RawGroupDescriptor}, @@ -27,6 +28,7 @@ pub struct Ext2 { block_size: usize, group_descriptors_segment: USegment, self_ref: Weak, + path: Path, } impl Ext2 { @@ -82,6 +84,7 @@ impl Ext2 { Ok(block_groups) }; + let ext2_path = block_device.path().append(&path!(ext2)); let ext2 = Arc::new_cyclic(|weak_ref| Self { inodes_per_group: super_block.inodes_per_group(), blocks_per_group: super_block.blocks_per_group(), @@ -97,6 +100,7 @@ impl Ext2 { super_block: RwMutex::new(Dirty::new(super_block)), group_descriptors_segment, self_ref: weak_ref.clone(), + path: ext2_path, }); Ok(ext2) } @@ -106,6 +110,11 @@ impl Ext2 { self.block_device.as_ref() } + /// Returns the path of this filesystem. + pub fn path(&self) -> &Path { + &self.path + } + /// Returns the size of block. 
pub fn block_size(&self) -> usize { self.block_size diff --git a/kernel/src/fs/ext2/inode.rs b/kernel/src/fs/ext2/inode.rs index a1a6dd444..64a70eef3 100644 --- a/kernel/src/fs/ext2/inode.rs +++ b/kernel/src/fs/ext2/inode.rs @@ -8,12 +8,13 @@ use core::sync::atomic::{AtomicUsize, Ordering}; use inherit_methods_macro::inherit_methods; #[cfg(not(baseline_asterinas))] -use ostd::orpc::legacy_oqueue::OQueueRef; +use ostd::orpc::oqueue::OQueueRef; use ostd::{ const_assert, mm::UntypedMem, new_server, - orpc::{orpc_impl, orpc_server}, + orpc::{framework::Server as _, oqueue::OQueue as _, orpc_impl, orpc_server}, + path, util::callback_counter::CallbackCounter, }; @@ -974,6 +975,7 @@ impl InodeInner { Self { page_cache: { let cache = PageCache::with_capacity( + inode_impl.block_manager.path().clone(), num_page_bytes, Arc::downgrade(&inode_impl.block_manager) as _, ) @@ -1218,12 +1220,18 @@ struct InodeImpl { impl InodeImpl { pub fn new(desc: Dirty, weak_self: Weak, fs: Weak) -> Self { - let block_manager = new_server!(|_| InodeBlockManager { - nblocks: AtomicUsize::new(desc.blocks_count() as _), - block_ptrs: RwMutex::new(desc.block_ptrs), - indirect_blocks: RwMutex::new(IndirectBlockCache::new(fs.clone())), - fs, - }); + let block_manager = new_server!( + fs.upgrade() + .unwrap() + .path() + .append(&path!(inode_block_manager[unique])), + |_| InodeBlockManager { + nblocks: AtomicUsize::new(desc.blocks_count() as _), + block_ptrs: RwMutex::new(desc.block_ptrs), + indirect_blocks: RwMutex::new(IndirectBlockCache::new(fs.clone())), + fs, + } + ); Self { desc, block_manager, @@ -2037,22 +2045,26 @@ impl server_traits::PageIOObservable for InodeBlockManager { impl server_traits::PageStore for InodeBlockManager { fn read_page_async(&self, req: server_traits::AsyncReadRequest) -> Result<()> { let bid = req.handle.idx as Ext2Bid; - self.page_reads_oqueue().produce(req.handle.idx)?; - let reply_producer = self.page_reads_reply_oqueue().attach_producer()?; + self.page_reads_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); + let reply_producer = self.page_reads_reply_oqueue().attach_ref_producer()?; self.read_block_async_with_closure(bid, &req.handle.frame.clone(), move || { // TODO(arthurp, #120): This can crash if produce blocks. - reply_producer.produce(req.handle.idx); + reply_producer.produce_ref(&req.handle.idx); req.reply_handle.produce(req.handle); }) } fn write_page_async(&self, req: server_traits::AsyncWriteRequest) -> Result<()> { let bid = req.handle.idx as Ext2Bid; - self.page_writes_oqueue().produce(req.handle.idx)?; - let reply_producer = self.page_writes_reply_oqueue().attach_producer()?; + self.page_writes_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); + let reply_producer = self.page_writes_reply_oqueue().attach_ref_producer()?; self.write_block_async_with_closure(bid, &req.handle.frame.clone(), move || { // TODO(arthurp, #120): This can crash if produce blocks. 
- reply_producer.produce(req.handle.idx); + reply_producer.produce_ref(&req.handle.idx); if let Some(reply_handle) = req.reply_handle { reply_handle.produce(req.handle); } diff --git a/kernel/src/fs/mod.rs b/kernel/src/fs/mod.rs index 185e2b8d8..041858ba2 100644 --- a/kernel/src/fs/mod.rs +++ b/kernel/src/fs/mod.rs @@ -27,6 +27,8 @@ use aster_block::BlockDevice; use aster_raid::selection_policies::RoundRobinPolicy; use aster_raid::{Raid1Device, Raid1DeviceError}; use aster_virtio::device::block::device::BlockDevice as VirtIoBlockDevice; +#[cfg(not(baseline_asterinas))] +use ostd::path; use crate::{ fs::{ext2::Ext2, fs_resolver::FsPath}, @@ -122,7 +124,11 @@ fn setup_raid1_device(raid_device_name: &str) -> Result<()> { #[cfg(not(baseline_asterinas))] info!("[raid] creating selection policy"); #[cfg(not(baseline_asterinas))] - let selection_policy = RoundRobinPolicy::new(members.clone()).unwrap(); + let selection_policy = RoundRobinPolicy::new( + path!(block.raid1.{raid_device_name}.selection_policy), + members.clone(), + ) + .unwrap(); #[cfg(not(baseline_asterinas))] let raid1device = Raid1Device::init(raid_device_name, members, selection_policy); #[cfg(baseline_asterinas)] diff --git a/kernel/src/fs/ramfs/fs.rs b/kernel/src/fs/ramfs/fs.rs index 36757e421..ed3f22548 100644 --- a/kernel/src/fs/ramfs/fs.rs +++ b/kernel/src/fs/ramfs/fs.rs @@ -12,11 +12,12 @@ use aster_rights::Full; use aster_util::slot_vec::SlotVec; use hashbrown::HashMap; #[cfg(not(baseline_asterinas))] -use ostd::orpc::legacy_oqueue::OQueueRef; +use ostd::orpc::oqueue::OQueueRef; use ostd::{ mm::{UntypedMem, VmIo}, new_server, - orpc::{orpc_impl, orpc_server}, + orpc::{oqueue::OQueue as _, orpc_impl, orpc_server, path::Path}, + path, sync::{PreemptDisabled, RwLockWriteGuard}, }; @@ -54,15 +55,21 @@ pub struct RamFS { root: Arc, /// An inode allocator inode_allocator: AtomicU64, + // The path of this filesystem. 
+ path: Path, } impl RamFS { #[cfg(not(baseline_asterinas))] pub fn new() -> Arc { + use ostd::path; + + let fs_path = path!(ramfs[unique]); Arc::new_cyclic(|weak_fs| Self { sb: SuperBlock::new(RAMFS_MAGIC, BLOCK_SIZE, NAME_MAX), root: RamInode::new_inode_with( - |this| Inner::new_dir(this.clone(), this.clone()), + fs_path.append(&path!(inode[{ ROOT_INO as usize }])), + |_path, this| Inner::new_dir(this.clone(), this.clone()), InodeMeta::new_dir( InodeMode::from_bits_truncate(0o755), Uid::new_root(), @@ -73,6 +80,7 @@ impl RamFS { ROOT_INO, ), inode_allocator: AtomicU64::new(ROOT_INO + 1), + path: fs_path, }) } @@ -158,8 +166,8 @@ impl Inner { Self::Dir(RwLock::new(DirEntry::new(this, parent))) } - pub fn new_file(this: Weak) -> Self { - Self::File(crate::fs::utils::PageCache::new(this).unwrap()) + pub fn new_file(path: Path, this: Weak) -> Self { + Self::File(crate::fs::utils::PageCache::new(path, this).unwrap()) } pub fn new_symlink() -> Self { @@ -421,31 +429,37 @@ impl DirEntry { #[cfg(not(baseline_asterinas))] impl RamInode { fn new_inode_with( - inner: impl FnOnce(&Weak) -> Inner, + path: Path, + inner: impl FnOnce(&Path, &Weak) -> Inner, meta: InodeMeta, fs: Weak, typ: InodeType, ino: u64, ) -> Arc { - new_server!(|weak_self| RamInode { - inner: inner(weak_self), - metadata: SpinLock::new(meta), - ino, - typ, - this: weak_self.clone(), - fs, - extension: Extension::new(), - xattr: RamXattr::new(), - }) + new_server!( + path.clone(), + |weak_self| RamInode { + inner: inner(&path, weak_self), + metadata: SpinLock::new(meta), + ino, + typ, + this: weak_self.clone(), + fs, + extension: Extension::new(), + xattr: RamXattr::new(), + } + ) } fn new_inode( - inner: impl FnOnce(&Weak) -> Inner, + inner: impl FnOnce(&Path, &Weak) -> Inner, meta: InodeMeta, fs: &Arc, typ: InodeType, ) -> Arc { - Self::new_inode_with(inner, meta, Arc::downgrade(fs), typ, fs.alloc_id()) + let ino = fs.alloc_id(); + let path = fs.path.append(&path!(inode[{ ino as usize }])); + Self::new_inode_with(path, inner, meta, Arc::downgrade(fs), typ, ino) } fn new_dir( @@ -456,7 +470,7 @@ impl RamInode { parent: &Weak, ) -> Arc { Self::new_inode( - |this| Inner::new_dir(this.clone(), parent.clone()), + |_path, this| Inner::new_dir(this.clone(), parent.clone()), InodeMeta::new_dir(mode, uid, gid), fs, InodeType::Dir, @@ -465,7 +479,7 @@ impl RamInode { fn new_file(fs: &Arc, mode: InodeMode, uid: Uid, gid: Gid) -> Arc { Self::new_inode( - |this| Inner::new_file(this.clone()), + |path, this| Inner::new_file(path.clone(), this.clone()), InodeMeta::new(mode, uid, gid), fs, InodeType::File, @@ -474,7 +488,7 @@ impl RamInode { fn new_symlink(fs: &Arc, mode: InodeMode, uid: Uid, gid: Gid) -> Arc { Self::new_inode( - |_| Inner::new_symlink(), + |_, _| Inner::new_symlink(), InodeMeta::new(mode, uid, gid), fs, InodeType::SymLink, @@ -490,7 +504,7 @@ impl RamInode { ) -> Arc { let inode_type = InodeType::from(device.type_()); Self::new_inode( - |_| Inner::new_device(device), + |_, _| Inner::new_device(device), InodeMeta::new(mode, uid, gid), fs, inode_type, @@ -499,7 +513,7 @@ impl RamInode { fn new_socket(fs: &Arc, mode: InodeMode, uid: Uid, gid: Gid) -> Arc { Self::new_inode( - |_| Inner::new_socket(), + |_, _| Inner::new_socket(), InodeMeta::new(mode, uid, gid), fs, InodeType::Socket, @@ -508,7 +522,7 @@ impl RamInode { fn new_named_pipe(fs: &Arc, mode: InodeMode, uid: Uid, gid: Gid) -> Arc { Self::new_inode( - |_| Inner::new_named_pipe(), + |_, _| Inner::new_named_pipe(), InodeMeta::new(mode, uid, gid), fs, InodeType::NamedPipe, 
@@ -652,7 +666,9 @@ impl PageIOObservable for RamInode { #[orpc_impl] impl PageStore for RamInode { fn read_page_async(&self, req: AsyncReadRequest) -> Result<()> { - self.page_reads_oqueue().produce(req.handle.idx)?; + self.page_reads_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); // Initially, any block/page in a RamFs inode contains all zeros req.handle .frame @@ -660,21 +676,25 @@ impl PageStore for RamInode { .to_fallible() .fill_zeros(req.handle.frame.size()) .unwrap(); - self.page_reads_reply_oqueue().produce(req.handle.idx)?; + let reply_producer = self.page_reads_reply_oqueue().attach_ref_producer()?; + reply_producer.produce_ref(&req.handle.idx); req.reply_handle.produce(req.handle); Ok(()) } fn write_page_async(&self, req: AsyncWriteRequest) -> Result<()> { // TODO:OPTIMIZATION: Avoid the clone. - self.page_writes_oqueue().produce(req.handle.idx)?; - self.page_writes_reply_oqueue().produce(req.handle.idx)?; + self.page_writes_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); + self.page_writes_reply_oqueue() + .attach_ref_producer()? + .produce_ref(&req.handle.idx); if let Some(reply_handle) = req.reply_handle { reply_handle.produce(req.handle); } Ok(()) } - fn npages(&self) -> Result { Ok(self.metadata.lock().blocks) } diff --git a/kernel/src/fs/server_traits.rs b/kernel/src/fs/server_traits.rs index 118c5e33f..778722264 100644 --- a/kernel/src/fs/server_traits.rs +++ b/kernel/src/fs/server_traits.rs @@ -1,11 +1,14 @@ // SPDX-License-Identifier: MPL-2.0 -use alloc::{boxed::Box, sync::Arc}; +use alloc::sync::Arc; use core::marker::Copy; -use ostd::orpc::{ - legacy_oqueue::{OQueue as _, OQueueRef, Producer, reply::ReplyQueue}, - orpc_trait, +use ostd::{ + orpc::{ + oqueue::{ConsumableOQueue as _, OQueueRef, ValueProducer, reply::ReplyQueue}, + orpc_trait, + }, + path, }; use crate::{Result, fs::utils::CachePage}; @@ -23,13 +26,13 @@ pub struct PageHandle { pub struct AsyncReadRequest { pub handle: PageHandle, /// A producer handle into an OQueue to send the reply to. - pub reply_handle: Box>, + pub reply_handle: ValueProducer, } pub struct AsyncWriteRequest { pub handle: PageHandle, /// A producer handle into an OQueue to send the reply to. If this is [`None`] no reply is sent. - pub reply_handle: Option>>, + pub reply_handle: Option>, } impl From for AsyncWriteRequest { @@ -41,25 +44,13 @@ impl From for AsyncWriteRequest { } } -// Constructor for a new OQueue for the prefetcher. This is to make testing easier to switch between -// oqueue implementations. -fn new_oqueue() -> OQueueRef { - ostd::orpc::legacy_oqueue::locking::ObservableLockingQueue::new(8, 8) -} - -// Constructor for a new OQueue for the prefetcher which needs a specific length. This is needed for -// cases where a long queue is required to avoid deadlocks. -fn new_oqueue_with_len(len: usize) -> OQueueRef { - ostd::orpc::legacy_oqueue::locking::ObservableLockingQueue::new(len, 8) -} - #[orpc_trait] pub trait PageIOObservable { /// The OQueue containing every read request. This includes both sync and async reads on this /// trait and any other read operations on other traits (for instance, /// [`crate::vm::vmo::Pager::commit_page`]). fn page_reads_oqueue(&self) -> OQueueRef { - new_oqueue() + OQueueRef::new(4, oqueue_path) } /// The OQueue containing every reply for read requests. @@ -67,19 +58,19 @@ pub trait PageIOObservable { // TODO: This must be longer than the largest number of IO that can be outstanding in the // system. 
Otherwise a produce into this OQueue in the interrupt handler will block panicing // the kernel. - new_oqueue_with_len(32) + OQueueRef::new(64, oqueue_path) } /// The OQueue containing every write request. This includes both sync and async writes and any /// other write operations on other traits fn page_writes_oqueue(&self) -> OQueueRef { - new_oqueue() + OQueueRef::new(4, oqueue_path) } /// The OQueue containing every reply for write requests. fn page_writes_reply_oqueue(&self) -> OQueueRef { // TODO: as page_reads_reply_oqueue - new_oqueue_with_len(32) + OQueueRef::new(64, oqueue_path) } } @@ -110,11 +101,11 @@ pub trait PageStore: PageIOObservable { /// Reads a page synchronously. fn read_page(&self, handle: PageHandle) -> Result<()> { - let reply_oqueue = ReplyQueue::new(2, 0); + let reply_oqueue = ReplyQueue::new_anonymous(2); let consumer = reply_oqueue.attach_consumer()?; self.read_page_async(AsyncReadRequest { handle, - reply_handle: reply_oqueue.attach_producer()?, + reply_handle: reply_oqueue.attach_value_producer()?, })?; consumer.consume(); Ok(()) @@ -122,11 +113,11 @@ pub trait PageStore: PageIOObservable { /// Writes a page synchronously. fn write_page(&self, handle: PageHandle) -> Result<()> { - let reply_oqueue = ReplyQueue::new(2, 0); + let reply_oqueue = ReplyQueue::new_anonymous(2); let consumer = reply_oqueue.attach_consumer()?; self.write_page_async(AsyncWriteRequest { handle, - reply_handle: Some(reply_oqueue.attach_producer()?), + reply_handle: Some(reply_oqueue.attach_value_producer()?), })?; consumer.consume(); Ok(()) @@ -160,15 +151,13 @@ pub struct PageCacheReadInfo { pub trait PageCache { /// Request that the cache prefetch a page. This is asynchronous and advisory, so the page may /// appear in the cache at a later time or never. - fn prefetch_oqueue(&self) -> OQueueRef { - new_oqueue() - } + fn prefetch(&self, page: usize) -> Result<()>; fn underlying_page_store(&self) -> Result>; /// The OQueue containing every reply for write requests. fn page_cache_read_info_oqueue(&self) -> OQueueRef { - new_oqueue_with_len(32) + OQueueRef::new(64, path!(PageCache[unique].page_cache_read_info)) } } diff --git a/kernel/src/fs/utils/page_cache.rs b/kernel/src/fs/utils/page_cache.rs index 0cf46e3d0..aeba666ed 100644 --- a/kernel/src/fs/utils/page_cache.rs +++ b/kernel/src/fs/utils/page_cache.rs @@ -17,11 +17,13 @@ use lru::LruCache; use ostd::{ impl_untyped_frame_meta_for, mm::{Frame, FrameAllocOptions, UFrame, VmIo}, + new_server, orpc::{ - framework::spawn_thread, - legacy_oqueue::{Consumer, OQueueRef, Producer, reply::ReplyQueue}, + oqueue::{Consumer, OQueue, OQueueRef, RefProducer, reply::ReplyQueue}, orpc_impl, orpc_server, + path::Path, }, + path, task::Task, }; use snafu::OptionExt; @@ -97,9 +99,8 @@ fn get_log_hits_misses() -> bool { impl PageCache { /// Creates an empty size page cache associated with a new backend. - #[track_caller] - pub fn new(backend: Weak) -> Result { - let manager = PageCacheManager::spawn(backend, get_prefetch_policy())?; + pub fn new(path: Path, backend: Weak) -> Result { + let manager = PageCacheManager::spawn(path, backend, get_prefetch_policy())?; let pages = VmoOptions::::new(0) .flags(VmoFlags::RESIZABLE) .pager(manager.clone()) @@ -111,9 +112,8 @@ impl PageCache { /// /// The `capacity` is the initial cache size required by the backend. /// This size usually corresponds to the size of the backend. 
- #[track_caller] - pub fn with_capacity(capacity: usize, backend: Weak) -> Result { - let manager = PageCacheManager::spawn(backend, get_prefetch_policy())?; + pub fn with_capacity(path: Path, capacity: usize, backend: Weak) -> Result { + let manager = PageCacheManager::spawn(path, backend, get_prefetch_policy())?; let pages = VmoOptions::::new(capacity) .flags(VmoFlags::RESIZABLE) .pager(manager.clone()) @@ -348,7 +348,7 @@ impl BuiltinPrefetchPolicy { struct OutstandingRequests { /// Outstanding requests in the form of the consumers that receive the reply. Each one is /// expected to receive exactly one reply. - outstanding: Vec>>, + outstanding: Vec>, } impl Debug for OutstandingRequests { @@ -377,7 +377,7 @@ impl OutstandingRequests { /// Handle any response for a request and return true iff it has been fully processed. fn check_single_request( pages: &mut LruCache>, - c: &mut Box>, + c: &mut Consumer, ) -> bool { if let Some(PageHandle { idx, frame }) = c.try_consume() { Self::store_uptodate(pages, idx, frame); @@ -454,7 +454,7 @@ struct PageCacheManagerInner { pages: LruCache, builtin_prefetch_policy: Option, outstanding_requests: OutstandingRequests, - page_cache_read_info_producer: Option>>, + page_cache_read_info_producer: Option>, } impl PageCacheManagerInner { @@ -511,55 +511,28 @@ impl PageCacheManagerInner { impl PageCacheManager { #[track_caller] - pub fn spawn(backend: Weak, policy: PrefetchPolicy) -> Result> { + pub fn spawn(path: Path, backend: Weak, policy: PrefetchPolicy) -> Result> { let policy = if Task::current().is_none() { PrefetchPolicy::None } else { policy }; - let server = Self::new_with(|orpc_internal, weak_this| Self { - backend, - inner: Mutex::new(PageCacheManagerInner { - // Using a bounded LRU cache would cause data loss because automatic evictions are not caught and written back. - pages: LruCache::unbounded(), - builtin_prefetch_policy: if policy == PrefetchPolicy::Builtin { - Some(BuiltinPrefetchPolicy::new()) - } else { - None - }, - outstanding_requests: Default::default(), - page_cache_read_info_producer: None, - }), - weak_this: weak_this.clone(), - orpc_internal, - }); - - spawn_thread(server.clone(), { - let server = server.clone(); - let prefetch_consumer = server.prefetch_oqueue().attach_consumer()?; - move || { - loop { - ostd::orpc::framework::CurrentServer::abort_point(); - let idx = prefetch_consumer.consume(); - let size = server.backend()?.npages()?; - if idx >= size { - continue; - } - - let mut inner = server.inner.lock(); - let inner = inner.deref_mut(); - - // If the page is not in the cache, issue a request. - if inner.pages.get(&idx).is_none() { - inner.outstanding_requests.request_async( - &mut inner.pages, - &server.backend()?, - idx, - server.as_ref(), - )?; - } - } + let server = new_server!(path, |weak_this| { + Self { + backend, + inner: Mutex::new(PageCacheManagerInner { + // Using a bounded LRU cache would cause data loss because automatic evictions are not caught and written back. + pages: LruCache::unbounded(), + builtin_prefetch_policy: if policy == PrefetchPolicy::Builtin { + Some(BuiltinPrefetchPolicy::new()) + } else { + None + }, + outstanding_requests: Default::default(), + page_cache_read_info_producer: None, + }), + weak_this: weak_this.clone(), } }); @@ -629,7 +602,9 @@ impl PageCacheManager { /// this call. If the built-in prefetch policy is enabled, this will trigger readaheads as /// needed. 
fn read_page(&self, idx: usize) -> Result { - self.page_reads_oqueue().produce(idx)?; + self.page_reads_oqueue() + .attach_ref_producer()? + .produce_ref(&idx); let frame = { let backend = self.backend()?; @@ -639,7 +614,7 @@ impl PageCacheManager { // Lazily initialize page_cache_read_info_producer if inner.page_cache_read_info_producer.is_none() { inner.page_cache_read_info_producer = - Some(self.page_cache_read_info_oqueue().attach_producer()?); + Some(self.page_cache_read_info_oqueue().attach_ref_producer()?); } let page_cache_read_info_producer = @@ -657,7 +632,7 @@ impl PageCacheManager { if let PageState::Uninit = page.load_state() { // Cond 2: We should wait for the previous readahead. // If there is no previous readahead, an error must have occurred somewhere. - page_cache_read_info_producer.produce(PageCacheReadInfo { + page_cache_read_info_producer.produce_ref(&PageCacheReadInfo { idx, cache_state: CacheState::Pending, }); @@ -668,7 +643,7 @@ impl PageCacheManager { inner.pages.get(&idx).context(UNREACHABLE_SNAFU)?.clone() } else { // Cond 1. - page_cache_read_info_producer.produce(PageCacheReadInfo { + page_cache_read_info_producer.produce_ref(&PageCacheReadInfo { idx, cache_state: CacheState::Hit, }); @@ -676,7 +651,7 @@ impl PageCacheManager { } } else { // Cond 3. - page_cache_read_info_producer.produce(PageCacheReadInfo { + page_cache_read_info_producer.produce_ref(&PageCacheReadInfo { idx, cache_state: CacheState::Miss, }); @@ -703,7 +678,9 @@ impl PageCacheManager { frame }; - self.page_reads_reply_oqueue().produce(idx)?; + self.page_reads_reply_oqueue() + .attach_ref_producer()? + .produce_ref(&idx); Ok(frame.into()) } @@ -724,8 +701,8 @@ impl Debug for PageCacheManager { #[orpc_impl] impl PageIOObservable for PageCacheManager { fn page_reads_oqueue(&self) -> OQueueRef; - fn page_writes_oqueue(&self) -> OQueueRef; fn page_reads_reply_oqueue(&self) -> OQueueRef; + fn page_writes_oqueue(&self) -> OQueueRef; fn page_writes_reply_oqueue(&self) -> OQueueRef; } @@ -742,8 +719,12 @@ impl Pager for PageCacheManager { if let Some(page) = pages.get_mut(&idx) { page.store_state(PageState::Dirty); drop(inner); - self.page_writes_oqueue().produce(idx)?; - self.page_writes_reply_oqueue().produce(idx)?; + self.page_writes_oqueue() + .attach_ref_producer()? + .produce_ref(&idx); + self.page_writes_reply_oqueue() + .attach_ref_producer()? + .produce_ref(&idx); } else { warn!("The page {} is not in page cache", idx); } @@ -785,13 +766,32 @@ impl Pager for PageCacheManager { #[orpc_impl] impl server_traits::PageCache for PageCacheManager { - fn prefetch_oqueue(&self) -> OQueueRef; - fn underlying_page_store(&self) -> Result> { self.backend() } fn page_cache_read_info_oqueue(&self) -> OQueueRef; + + fn prefetch(&self, idx: usize) -> Result<()> { + let size = self.backend()?.npages()?; + if idx >= size { + return Ok(()); + } + + let mut inner = self.inner.lock(); + let inner = inner.deref_mut(); + + // If the page is not in the cache, issue a request. + if inner.pages.get(&idx).is_none() { + inner.outstanding_requests.request_async( + &mut inner.pages, + &self.backend()?, + idx, + self, + )?; + } + Ok(()) + } } /// A page in the page cache. 
diff --git a/kernel/src/fs/utils/page_cache_logger.rs b/kernel/src/fs/utils/page_cache_logger.rs index f7b857ae2..4c786c6ae 100644 --- a/kernel/src/fs/utils/page_cache_logger.rs +++ b/kernel/src/fs/utils/page_cache_logger.rs @@ -3,12 +3,16 @@ use alloc::sync::Arc; use aster_logger::println; -use ostd::orpc::{ - errors::RPCError, - framework::{shutdown, spawn_thread}, - legacy_oqueue::{OQueueAttachError, OQueueRef}, - orpc_impl, orpc_server, - sync::select, +use ostd::{ + new_server, + orpc::{ + errors::RPCError, + framework::{shutdown, spawn_thread}, + oqueue::{OQueueBase as _, OQueueError, OQueueRef, ObservationQuery}, + orpc_impl, orpc_server, + sync::select, + }, + path, }; use crate::fs::server_traits::PageCacheReadInfo; @@ -30,18 +34,18 @@ impl shutdown::Shutdown for PageCacheLogger { impl PageCacheLogger { pub fn spawn( page_cache_read_info_oqueue: OQueueRef, - ) -> Result, OQueueAttachError> { - let server = Self::new_with(|orpc_internal, _| Self { - orpc_internal, - shutdown_state: Default::default(), + ) -> Result, OQueueError> { + let server = new_server!(path!(page_cache_logger[unique]), |_| Self { + shutdown_state: shutdown::ShutdownState::new(path!(page_cache_logger[unique]),), }); spawn_thread(server.clone(), { - let read_obs = page_cache_read_info_oqueue.attach_strong_observer()?; + let read_obs = + page_cache_read_info_oqueue.attach_strong_observer(ObservationQuery::identity())?; let shutdown_obs = server .shutdown_state .shutdown_oqueue - .attach_strong_observer()?; + .attach_strong_observer(ObservationQuery::unit())?; let server = server.clone(); move || { diff --git a/kernel/src/fs/utils/page_prefetch.rs b/kernel/src/fs/utils/page_prefetch.rs index 3615afe46..bbfa248cc 100644 --- a/kernel/src/fs/utils/page_prefetch.rs +++ b/kernel/src/fs/utils/page_prefetch.rs @@ -8,19 +8,23 @@ // TODO(arthurp, https://github.com/ldos-project/asterinas/issues/118): Replace these policies with // real heuristics. -use alloc::{boxed::Box, sync::Arc}; +use alloc::sync::Arc; use core::ops::Range; -use ostd::orpc::{ - errors::RPCError, - framework::{ - shutdown::{self, ShutdownState}, - spawn_thread, +use ostd::{ + new_server, + orpc::{ + errors::RPCError, + framework::{ + shutdown::{self, ShutdownState}, + spawn_thread, + }, + oqueue::{OQueueBase as _, ObservationQuery, StrongObserver, WeakObserver}, + orpc_impl, orpc_server, + statistics::{Outstanding, OutstandingCounter}, + sync::select, }, - legacy_oqueue::{StrongObserver, WeakObserver}, - orpc_impl, orpc_server, - statistics::{Outstanding, OutstandingCounter}, - sync::select, + path, }; use crate::{ @@ -54,36 +58,45 @@ impl ReadaheadPrefetcher { max_window_size: usize, initial_window_size: usize, ) -> Result> { - let server = Self::new_with(|orpc_internal, _| Self { - orpc_internal, - shutdown_state: Default::default(), + let path = cache.path().append(&path!(readahead_prefetcher)); + let server = new_server!(path.clone(), |_| Self { + shutdown_state: ShutdownState::new(path.clone()), }); let underlying_page_store = cache.underlying_page_store()?; // TODO: Tie this to the lifetime of `server`. 
let outstanding_counter = OutstandingCounter::spawn( - underlying_page_store.page_reads_oqueue(), - underlying_page_store.page_reads_reply_oqueue(), + path.append(&path!(outstanding_counter)), + underlying_page_store + .page_reads_oqueue() + .attach_strong_observer(ObservationQuery::unit())?, + underlying_page_store + .page_reads_reply_oqueue() + .attach_strong_observer(ObservationQuery::unit())?, )?; spawn_thread(server.clone(), { - let read_observer = cache.page_reads_oqueue().attach_strong_observer()?; - let read_weak_observer = cache.page_reads_oqueue().attach_weak_observer()?; + let read_observer = cache + .page_reads_oqueue() + .attach_strong_observer(ObservationQuery::identity())?; + let read_weak_observer = cache + .page_reads_oqueue() + .attach_weak_observer(2, ObservationQuery::identity())?; let outstanding_count_observer = outstanding_counter .outstanding_oqueue() - .attach_weak_observer()?; + .attach_weak_observer(1, ObservationQuery::identity())?; let shutdown_observer = server .shutdown_state .shutdown_oqueue - .attach_strong_observer()?; + .attach_strong_observer(ObservationQuery::unit())?; let cache: Arc = cache.clone(); let server = server.clone(); struct PrefetcherState { - read_observer: Box>, - read_weak_observer: Box>, - outstanding_count_observer: Box>, - shutdown_observer: Box>, + read_observer: StrongObserver, + read_weak_observer: WeakObserver, + outstanding_count_observer: WeakObserver, + shutdown_observer: StrongObserver<()>, cache: Arc, server: Arc, window: Option>, @@ -130,15 +143,22 @@ impl ReadaheadPrefetcher { if let Some(outstanding) = self .outstanding_count_observer .weak_observe_recent(1) - .last() + .ok() + .and_then(|mut v| v.pop()) + .flatten() { - let is_sequential = if let Some([a, b]) = - self.read_weak_observer.weak_observe_recent(2).last_chunk() - { - a + 1 == *b - } else { - false - }; + let is_sequential = self + .read_weak_observer + .weak_observe_recent(2) + .ok() + .and_then(|v| { + if let [Some(a), Some(b)] = v.as_slice() { + Some(a + 1 == *b) + } else { + None + } + }) + .unwrap_or(false); let has_used_prefetched = if let Some(window) = &self.window { idx == window.start || idx == window.end @@ -147,9 +167,9 @@ impl ReadaheadPrefetcher { }; if is_sequential && has_used_prefetched { self.update_window(idx); - if *outstanding == 0 { + if outstanding == 0 { for i in self.window.as_ref().unwrap().clone() { - self.cache.prefetch_oqueue().produce(i)?; + self.cache.prefetch(i)?; } } } @@ -191,18 +211,21 @@ impl StridedPrefetcher { cache: Arc, n_steps_ahead: usize, ) -> Result> { - let server = Self::new_with(|orpc_internal, _| Self { - orpc_internal, - shutdown_state: Default::default(), + let server = new_server!(cache.path().append(&path!(strided_prefetcher)), |_| Self { + shutdown_state: ShutdownState::new(path!(strided_prefetcher[unique])), }); spawn_thread(server.clone(), { - let read_observer = cache.page_reads_oqueue().attach_strong_observer()?; - let read_weak_observer = cache.page_reads_oqueue().attach_weak_observer()?; + let read_observer = cache + .page_reads_oqueue() + .attach_strong_observer(ObservationQuery::identity())?; + let read_weak_observer = cache + .page_reads_oqueue() + .attach_weak_observer(2, ObservationQuery::identity())?; let shutdown_observer = server .shutdown_state .shutdown_oqueue - .attach_strong_observer()?; + .attach_strong_observer(ObservationQuery::unit())?; let cache: Arc = cache.clone(); let server = server.clone(); @@ -211,13 +234,12 @@ impl StridedPrefetcher { server.shutdown_state.check()?; select!( if let 
idx = read_observer.try_strong_observe() { - let recent = read_weak_observer.recent_cursor(); - let history = read_weak_observer.weak_observe_range(recent - 1, recent); - if history.len() >= 2 { - let stride = history[1] - history[0]; - cache - .prefetch_oqueue() - .produce(idx + stride * n_steps_ahead)?; + let recent = read_weak_observer.newest_cursor(); + let prev = read_weak_observer.weak_observe(recent - 1).unwrap_or(None); + let last = read_weak_observer.weak_observe(recent).unwrap_or(None); + if let (Some(a), Some(b)) = (prev, last) { + let stride = b - a; + cache.prefetch(idx + stride * n_steps_ahead)?; } }, if let () = shutdown_observer.try_strong_observe() {} diff --git a/kernel/src/util/timer.rs b/kernel/src/util/timer.rs index ae0ecbdaa..6327eb35d 100644 --- a/kernel/src/util/timer.rs +++ b/kernel/src/util/timer.rs @@ -12,9 +12,10 @@ use ostd::{ notifier::{Notifier, NotifierOQueues}, spawn_thread, }, - legacy_oqueue::OQueueRef, + oqueue::{OQueue as _, OQueueRef}, orpc_impl, orpc_server, }, + path, sync::WaitQueue, }; use snafu::Whatever; @@ -30,11 +31,11 @@ pub struct TimerServer { #[orpc_impl] impl Notifier for TimerServer { fn notify(&self) -> Result<(), RPCError> { - if self.notification_oqueue().produce(()).is_err() { - panic!("Could not produce into notification_oqueue") - } else { - Ok(()) - } + self.notification_oqueue() + .attach_ref_producer() + .expect("Could not attach producer to notification_oqueue") + .produce_ref(&()); + Ok(()) } fn notification_oqueue(&self) -> OQueueRef<()>; @@ -53,7 +54,7 @@ impl TimerServer { /// Create a TimerServer with the specified frequency. pub fn new(freq: Duration) -> Result, Whatever> { - let server = new_server!(|_| Self { freq }); + let server = new_server!(path!(timer[unique]), |_| Self { freq }); Ok(server) } diff --git a/kernel/src/vm/hugepaged.rs b/kernel/src/vm/hugepaged.rs index d23d7a751..46f2529ac 100644 --- a/kernel/src/vm/hugepaged.rs +++ b/kernel/src/vm/hugepaged.rs @@ -3,11 +3,15 @@ use alloc::{boxed::Box, sync::Arc, vec::Vec}; use core::time::Duration; -use ostd::orpc::{ - framework::{notifier::Notifier, spawn_thread}, - legacy_oqueue::OQueue, - orpc_server, orpc_trait, - sync::select, +use ostd::{ + new_server, + orpc::{ + framework::{notifier::Notifier, spawn_thread}, + oqueue::{OQueueBase as _, ObservationQuery}, + orpc_server, orpc_trait, + sync::select, + }, + path, }; use snafu::Whatever; @@ -32,7 +36,7 @@ impl HugepagedServer { hugepaged } pub fn new() -> Result, Whatever> { - let server = Self::new_with(|orpc_internal, _| Self { orpc_internal }); + let server = new_server!(path!(hugepaged[unique]), |_| Self {}); Ok(server) } @@ -40,10 +44,11 @@ impl HugepagedServer { let notify_server = TimerServer::spawn(Duration::from_secs(1)); let pagefault_oq = vmar::oqueues::get_page_fault_oqueue(); - let pagefault_observer = pagefault_oq.attach_strong_observer()?; + let pagefault_observer = + pagefault_oq.attach_strong_observer(ObservationQuery::identity())?; let notify_observer = notify_server .notification_oqueue() - .attach_strong_observer()?; + .attach_strong_observer(ObservationQuery::unit())?; loop { let mut value: Option = None; loop { diff --git a/kernel/src/vm/vmar/mod.rs b/kernel/src/vm/vmar/mod.rs index 4c9d48210..8ad85dd31 100644 --- a/kernel/src/vm/vmar/mod.rs +++ b/kernel/src/vm/vmar/mod.rs @@ -20,7 +20,9 @@ use osdk_heap_allocator::{CpuLocalBox, alloc_cpu_local}; #[cfg(not(baseline_asterinas))] use ostd::mm::vm_space::VmMappingPolicyOQueues; #[cfg(not(baseline_asterinas))] -use 
ostd::orpc::legacy_oqueue::{OQueue as _, OQueueRef, ringbuffer::MPMCOQueue}; +use ostd::orpc::oqueue::{OQueue as _, OQueueRef, RefProducer}; +#[cfg(not(baseline_asterinas))] +use ostd::path; use ostd::{ cpu::{CpuId, all_cpus}, mm::{ @@ -200,10 +202,9 @@ pub struct PageFaultOQueueMessage { #[cfg(not(baseline_asterinas))] pub mod oqueues { - use alloc::sync::Arc; use core::{sync::atomic::AtomicUsize, time::Duration}; - use ostd::orpc::legacy_oqueue::ringbuffer::MPMCOQueue; + use ostd::orpc::oqueue::OQueueRef; use spin::Once; use super::PageFaultOQueueMessage; @@ -225,19 +226,18 @@ pub mod oqueues { } } - pub(super) static PAGE_FAULT_OQUEUE: Once< - Arc>>, - > = Once::new(); + pub(super) static PAGE_FAULT_OQUEUE: Once>> = + Once::new(); pub static GLOBAL_RSS: AtomicUsize = AtomicUsize::new(0); - pub(super) static RSS_DELTA_OQUEUE: Once>>> = Once::new(); + pub(super) static RSS_DELTA_OQUEUE: Once>> = Once::new(); - pub fn get_rss_delta_oqueue() -> Arc>> { + pub fn get_rss_delta_oqueue() -> OQueueRef> { RSS_DELTA_OQUEUE.wait().clone() } - pub fn get_page_fault_oqueue() -> Arc>> { + pub fn get_page_fault_oqueue() -> OQueueRef> { PAGE_FAULT_OQUEUE.wait().clone() } } @@ -245,9 +245,8 @@ pub mod oqueues { pub fn init() { #[cfg(not(baseline_asterinas))] { - // Only support a single strong observer for now - hugepaged. - oqueues::PAGE_FAULT_OQUEUE.call_once(|| MPMCOQueue::new(64, 1)); - oqueues::RSS_DELTA_OQUEUE.call_once(|| MPMCOQueue::new(64, 1)); + oqueues::PAGE_FAULT_OQUEUE.call_once(|| OQueueRef::new(64, path!(vmar.page_fault))); + oqueues::RSS_DELTA_OQUEUE.call_once(|| OQueueRef::new(64, path!(vmar.rss_delta))); } } @@ -266,7 +265,7 @@ pub(super) struct Vmar_ { /// OQueue Producer to notify policies about page fault events #[cfg(not(baseline_asterinas))] - page_fault_oqueue_producer: OQueueRef>, + page_fault_oqueue_producer: RefProducer>, } struct VmarInner { @@ -571,7 +570,10 @@ impl Vmar_ { rss_counters, rss_hwm_counters, #[cfg(not(baseline_asterinas))] - page_fault_oqueue_producer: oqueues::PAGE_FAULT_OQUEUE.wait().clone(), + page_fault_oqueue_producer: oqueues::PAGE_FAULT_OQUEUE + .wait() + .attach_ref_producer() + .expect("Failed to attach page fault oqueue producer"), }) } @@ -579,8 +581,10 @@ impl Vmar_ { let vmar_inner = VmarInner::new(); let mut vm_space = VmSpace::new(); if huge_mapping_enabled() { - vm_space = - vm_space.with_mapping_policy(new_server!(|_| VmMappingPolicyGreedyHugeMapping {})); + vm_space = vm_space + .with_mapping_policy(new_server!(path!(vmar.root.mapping_policy[unique]), |_| { + VmMappingPolicyGreedyHugeMapping {} + })); } Vmar_::new( vmar_inner, @@ -661,10 +665,10 @@ impl Vmar_ { #[cfg(not(baseline_asterinas))] if res.is_ok() { self.page_fault_oqueue_producer - .produce(oqueues::ObservableEvent::new(PageFaultOQueueMessage { + .produce_ref(&oqueues::ObservableEvent::new(PageFaultOQueueMessage { vm_space_id: self.vm_space.id(), fault_info: *page_fault_info, - }))?; + })); } return res; } @@ -850,8 +854,10 @@ impl Vmar_ { let vmar_inner = VmarInner::new(); let mut new_space = VmSpace::new(); if huge_mapping_enabled() { - new_space = new_space - .with_mapping_policy(new_server!(|_| VmMappingPolicyGreedyHugeMapping {})) + new_space = new_space.with_mapping_policy(new_server!( + path!(vmar.fork.mapping_policy[unique]), + |_| VmMappingPolicyGreedyHugeMapping {} + )) } Vmar_::new( vmar_inner, @@ -1351,9 +1357,11 @@ impl<'a> RssDelta<'a> { } else { oqueues::GLOBAL_RSS.fetch_sub(-increment as usize, Ordering::Relaxed); } - let _ = oqueues::RSS_DELTA_OQUEUE + 
oqueues::RSS_DELTA_OQUEUE .wait() - .produce(oqueues::ObservableEvent::new(increment)); + .attach_ref_producer() + .expect("Failed to attach RSS delta oqueue producer") + .try_produce_ref(&oqueues::ObservableEvent::new(increment)); } self.delta[rss_type as usize] += increment; diff --git a/ostd/libs/orpc-macros/src/lib.rs b/ostd/libs/orpc-macros/src/lib.rs index 2ab83597e..f5a89411d 100644 --- a/ostd/libs/orpc-macros/src/lib.rs +++ b/ostd/libs/orpc-macros/src/lib.rs @@ -9,9 +9,10 @@ mod parsing_utils; mod select; use proc_macro::TokenStream; +use quote::quote; use syn::{ - ItemImpl, ItemStruct, ItemTrait, Path, Token, Visibility, parse_macro_input, - punctuated::Punctuated, + Expr, ExprMethodCall, ItemImpl, ItemStruct, ItemTrait, Path, Token, Visibility, + parse_macro_input, punctuated::Punctuated, }; /// Declare a trait as an ORPC trait that can be implemented by ORPC server. @@ -197,7 +198,41 @@ pub fn orpc_monitor(arg: TokenStream, input: TokenStream) -> TokenStream { /// For example: /// /// ```ignore -/// orpc_macros::select!( +/// orpc_macros::select_legacy!( +/// if let msg = receiver1.try_produce() { +/// assert_eq!(msg.x, receiver1_counter); +/// receiver1_counter += 1; +/// }, +/// if let TestMessage { x } = receiver2.try_produce() { +/// assert_eq!(x, receiver2_counter); +/// receiver2_counter += 1; +/// } +/// ) +/// ``` +/// +/// NOTE: Keep the code inside the macro short and call into separate functions if possible. Inside macros IDE +/// assistance does not always work correctly and those tools are worth having for as much code as possible. +#[proc_macro] +pub fn select_legacy(input: TokenStream) -> TokenStream { + let input = parse_macro_input!(input as select::SelectInput); + let output = select::select_macro_impl( + input, + |e| quote! { #e }, + |b| quote! { ::core::convert::AsRef::as_ref(&#b) }, + ); + output.into() +} + +/// Wait for one of multiple conditions to become true and execute the appropriate block. +/// +/// The syntax is a series of if-let statements (separated by commas for technical reasons) where the RHS of each +/// binding is in the form `[blocker].fields_and_methods`. `select` will block waiting for any of the blockers and then +/// run all the if statements. The pattern must be irrefutable. +/// +/// For example: +/// +/// ```ignore +/// orpc_macros::select_legacy!( /// if let msg = receiver1.try_produce() { /// assert_eq!(msg.x, receiver1_counter); /// receiver1_counter += 1; @@ -214,7 +249,19 @@ pub fn orpc_monitor(arg: TokenStream, input: TokenStream) -> TokenStream { #[proc_macro] pub fn select(input: TokenStream) -> TokenStream { let input = parse_macro_input!(input as select::SelectInput); - let output = select::select_macro_impl(input); + let output = select::select_macro_impl( + input, + |e| { + match e { + // Special case consume since it cannot return an error. + Expr::MethodCall(ExprMethodCall { method, .. }) if method == "try_consume" => { + quote! { #e } + } + _ => quote! { #e ? }, + } + }, + |b| quote! 
{ &#b }, + ); output.into() } diff --git a/ostd/libs/orpc-macros/src/orpc_server.rs b/ostd/libs/orpc-macros/src/orpc_server.rs index 91368816e..c4baf1186 100644 --- a/ostd/libs/orpc-macros/src/orpc_server.rs +++ b/ostd/libs/orpc-macros/src/orpc_server.rs @@ -2,7 +2,7 @@ use quote::{ToTokens, format_ident, quote, quote_spanned}; use syn::{FieldsNamed, ItemStruct, LitStr, Path, Token, spanned::Spanned}; -use crate::parsing_utils::{generics_to_phantom, make_oqueues_field_name}; +use crate::parsing_utils::{generics_to_phantom, make_oqueues_field_name, path_to_turbofish}; /// The implementations of the `orpc_server` attr macro. pub fn orpc_server_macro_impl( @@ -29,28 +29,22 @@ pub fn orpc_server_macro_impl( .iter() .map(|trait_ident| { let oqueue_field_ident = make_oqueues_field_name(&mut errors, trait_ident); - let trait_ident = { - let mut trait_ident = trait_ident.clone(); - if let Some(last_type_segment) = trait_ident.segments.last_mut() { - last_type_segment.ident = format_ident!( - "{}OQueues", - last_type_segment.ident, - span = last_type_segment.ident.span() - ); - } - trait_ident - }; + let struct_ident = make_oqueues_struct_path(trait_ident); quote! { #[allow(unused)] - #oqueue_field_ident: #trait_ident + #oqueue_field_ident: #struct_ident } }) .collect(); // The names of the OQueue fields. - let oqueue_field_names: Vec<_> = traits + let oqueue_field_initializers: Vec<_> = traits .iter() - .map(|i| make_oqueues_field_name(&mut errors, i)) + .map(|i| { + let field_name = make_oqueues_field_name(&mut errors, i); + let struct_name = make_oqueues_struct_path(i); + quote! { #field_name: #struct_name::new(path.clone()), } + }) .collect(); let orpc_internal_struct_doc = LitStr::new( @@ -59,6 +53,16 @@ pub fn orpc_server_macro_impl( ); let orpc_internal_struct_ident = format_ident!("{}ORPCInternal", ident.to_string()); + + // The initializer for the ORPC internals struct for this server type + let internal_init = quote! { + #orpc_internal_struct_ident { + #(#oqueue_field_initializers)* + base: ::ostd::orpc::framework::ServerBase::new(path, weak_this.clone()), + _phantom: ::core::marker::PhantomData, + } + }; + // All the fields of the user declared server struct with the added orpc_internal field. let fields = match fields { syn::Fields::Named(FieldsNamed { named, .. }) => { @@ -80,15 +84,6 @@ pub fn orpc_server_macro_impl( } }; - // The initializer for the ORPC internals struct for this server type - let internal_init = quote! { - #orpc_internal_struct_ident { - base: ::ostd::orpc::framework::ServerBase::new(weak_this.clone()), - #(#oqueue_field_names: ::core::default::Default::default(),)* - _phantom: ::core::marker::PhantomData, - } - }; - // The implementation of `new_with` for this server. let new_with_impl = quote! 
{ impl #impl_generics #ident #type_generics #where_clause { @@ -106,10 +101,11 @@ let server = Self::new_with(|orpc_internal, weak_this| Self { "] #[track_caller] #vis fn new_with( + path: ::ostd::orpc::path::Path, f: impl ::core::ops::FnOnce(#orpc_internal_struct_ident #type_generics, &::alloc::sync::Weak) -> Self, ) -> ::alloc::sync::Arc:: { let server = ::alloc::sync::Arc::::new_cyclic( - |weak_this| { + move |weak_this| { let orpc_internal = #internal_init; f(orpc_internal, weak_this) }); @@ -153,3 +149,15 @@ let server = Self::new_with(|orpc_internal, weak_this| Self { output } + +fn make_oqueues_struct_path(trait_ident: &Path) -> Path { + let mut trait_ident = trait_ident.clone(); + if let Some(last_type_segment) = trait_ident.segments.last_mut() { + last_type_segment.ident = format_ident!( + "{}OQueues", + last_type_segment.ident, + span = last_type_segment.ident.span() + ); + } + path_to_turbofish(trait_ident) +} diff --git a/ostd/libs/orpc-macros/src/orpc_trait.rs b/ostd/libs/orpc-macros/src/orpc_trait.rs index 950fe7871..9db907c3a 100644 --- a/ostd/libs/orpc-macros/src/orpc_trait.rs +++ b/ostd/libs/orpc-macros/src/orpc_trait.rs @@ -116,8 +116,8 @@ pub fn orpc_trait_macro_impl( _phantom: #phantom } - impl #impl_generics ::core::default::Default for #oqueues_struct_ident #type_generics #where_clause { - fn default() -> Self { + impl #impl_generics #oqueues_struct_ident #type_generics #where_clause { + #vis fn new(server_path: ::ostd::orpc::path::Path) -> Self { Self { #(#oqueue_initializers,)* _phantom: ::core::marker::PhantomData, @@ -226,6 +226,12 @@ fn process_oqueue_method( .collect(); // Create the initializer for the field. In the error case, just use `todo!` and generate an error separately. if let Some(constr) = &trait_item_method.default { + let constr = quote! { + { + let oqueue_path = server_path.append(&::ostd::path!(#ident)); + #constr + } + }; oqueue_initializers.push(quote! { #ident: #constr }); diff --git a/ostd/libs/orpc-macros/src/parsing_utils.rs b/ostd/libs/orpc-macros/src/parsing_utils.rs index f0d6b83d6..0f1d38520 100644 --- a/ostd/libs/orpc-macros/src/parsing_utils.rs +++ b/ostd/libs/orpc-macros/src/parsing_utils.rs @@ -21,7 +21,9 @@ impl ORPCMethodKind<'_> { let name = path_segment.ident.to_string(); return match name.as_str() { "Result" => Some(ORPCMethodKind::Orpc { return_type: typ }), - "OQueueRef" => Some(ORPCMethodKind::OQueue { return_type: typ }), + "OQueueRef" | "ConsumableOQueueRef" => { + Some(ORPCMethodKind::OQueue { return_type: typ }) + } _ => None, }; } @@ -84,3 +86,13 @@ pub(crate) fn generics_to_phantom(generics: &Generics) -> Type { parse_quote!(::core::marker::PhantomData (#types)>) } + +/// Converts a path into turbofish form, i.e. `Foo` becomes `Foo::`. +pub(crate) fn path_to_turbofish(mut path: Path) -> Path { + for segment in path.segments.iter_mut() { + if let syn::PathArguments::AngleBracketed(ref mut args) = segment.arguments { + args.colon2_token = Some(Default::default()); + } + } + path +} diff --git a/ostd/libs/orpc-macros/src/select.rs b/ostd/libs/orpc-macros/src/select.rs index 7f2a271a1..dbc097cc4 100644 --- a/ostd/libs/orpc-macros/src/select.rs +++ b/ostd/libs/orpc-macros/src/select.rs @@ -1,8 +1,8 @@ // SPDX-License-Identifier: MPL-2.0 -/// The implementation of the `select!` macro. +/// The implementation of the `select_legacy!` macro. /// /// TODO(#73): This syntax is probably bad and will be replaced. 
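To make the `orpc_trait` change above concrete: each OQueue method's default body is now evaluated with an `oqueue_path` binding equal to the server's path plus the method name, and the generated `*OQueues` struct is constructed with `new(server_path)` instead of `Default`. A minimal sketch of what a trait author writes (the `PageEvents` trait and `reads_oqueue` method are made up for illustration; `oqueue_path`, `OQueueRef::new(capacity, path)`, and the macro come from this patch):

```rust
use ostd::orpc::{oqueue::OQueueRef, orpc_trait};

/// Hypothetical trait, shown only to illustrate the generated `oqueue_path` binding.
#[orpc_trait]
pub trait PageEvents {
    /// The macro wraps this default body roughly as
    /// `{ let oqueue_path = server_path.append(&path!(reads_oqueue)); <body> }`,
    /// so the queue ends up registered under `<server path>.reads_oqueue`.
    fn reads_oqueue(&self) -> OQueueRef<usize> {
        OQueueRef::new(16, oqueue_path)
    }
}
```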
-use proc_macro2::Span; +use proc_macro2::{Span, TokenStream}; use quote::quote; use syn::{Block, Expr, ExprLet, Ident, Token, parse::Parse, punctuated::Punctuated, token::Comma}; @@ -55,19 +55,23 @@ impl Parse for SelectInput { } } -/// The implementations of the `select!` macro. -pub fn select_macro_impl(input: SelectInput) -> proc_macro2::TokenStream { +/// The implementations of the `select_legacy!` macro. +pub fn select_macro_impl( + input: SelectInput, + wrap_expr: impl Fn(&Expr) -> TokenStream, + wrap_blocker: impl Fn(&Expr) -> TokenStream, +) -> TokenStream { let blockers: Vec<_> = input .clauses .iter() - .map(|clause| clause.blocker()) + .map(|clause| wrap_blocker(clause.blocker())) .collect(); // Generate all the check statements which run each time a blocker wakes. let check_statements = input.clauses.iter().map(|clause| { let attrs = &clause.let_binding.attrs; let pat = &clause.let_binding.pat; - let blocker_expr = &clause.let_binding.expr; + let blocker_expr = wrap_expr(&clause.let_binding.expr); let body = &clause.body; let tmp = Ident::new("message", Span::mixed_site()); quote! { @@ -82,7 +86,7 @@ pub fn select_macro_impl(input: SelectInput) -> proc_macro2::TokenStream { let output = quote! { { - ::ostd::task::Task::current().map(|c| c.block_on(&[#(::core::convert::AsRef::as_ref(&#blockers)),*])); + ::ostd::task::Task::current().map(|c| c.block_on(&[#(#blockers),*])); #(#check_statements)* } }; diff --git a/ostd/src/mm/vm_space.rs b/ostd/src/mm/vm_space.rs index 50d02e6bc..d3c674373 100644 --- a/ostd/src/mm/vm_space.rs +++ b/ostd/src/mm/vm_space.rs @@ -28,6 +28,8 @@ use crate::{ prelude::*, task::{DisabledPreemptGuard, atomic_mode::AsAtomicModeGuard, disable_preempt}, }; +#[cfg(not(baseline_asterinas))] +use crate::{new_server, path}; /// Request for [`VmMappingPolicy`]. pub struct VmMappingRequest { @@ -120,8 +122,8 @@ impl VmSpace { cpus: AtomicCpuSet::new(CpuSet::new_empty()), // Set the default policy to be base pages only. This can updated by calling // with_mapping_policy. 
- vm_mapping_policy: VmMappingPolicyBasePagesOnly::new_with(|orpc_internal, _| { - VmMappingPolicyBasePagesOnly { orpc_internal } + vm_mapping_policy: new_server!(path!(vmspace[unique]), |_| { + VmMappingPolicyBasePagesOnly {} }), }; #[cfg(baseline_asterinas)] diff --git a/ostd/src/orpc/framework/integration_test.rs b/ostd/src/orpc/framework/integration_test.rs index 36b0c14b3..e6b5a96ad 100644 --- a/ostd/src/orpc/framework/integration_test.rs +++ b/ostd/src/orpc/framework/integration_test.rs @@ -18,6 +18,8 @@ mod test { use crate::{ assert_eq_eventually, assert_eventually, assert_matches_eventually, + assertion::sleep, + new_server, orpc::{ errors::RPCError, framework::{ @@ -25,12 +27,14 @@ mod test { shutdown::{Shutdown, ShutdownState}, spawn_thread, }, - legacy_oqueue::{ - Consumer, OQueueRef, generic_test, - locking::{LockingQueue, ObservableLockingQueue}, + oqueue::{ + ConsumableOQueue, ConsumableOQueueRef, Consumer, OQueue, OQueueBase, OQueueRef, + ObservationQuery, }, + path::Path, }, - prelude::{Arc, Box}, + path, + prelude::Arc, }; struct AdditionalAmount { @@ -41,8 +45,8 @@ mod test { #[orpc_trait] trait Counter { fn atomic_incr(&self, additional: AdditionalAmount) -> Result; - fn incr_oqueue(&self) -> OQueueRef { - LockingQueue::new(8) + fn incr_oqueue(&self) -> ConsumableOQueueRef { + ConsumableOQueueRef::new(8, oqueue_path) } } @@ -64,13 +68,13 @@ mod test { Ok(v + addend) } - fn incr_oqueue(&self) -> OQueueRef; + fn incr_oqueue(&self) -> ConsumableOQueueRef; } impl ServerAState { fn main_thread( &self, - incr_oqueue_consumer: Box>, + incr_oqueue_consumer: Consumer, ) -> Result<(), Whatever> { let mut _count = 0; loop { @@ -88,7 +92,7 @@ mod test { increment: usize, atomic_count: AtomicUsize, ) -> Result, Whatever> { - let server = Self::new_with(|orpc_internal, _| Self { + let server = Self::new_with(Path::test(), |orpc_internal, _| Self { increment, atomic_count, orpc_internal, @@ -165,7 +169,7 @@ mod test { server_ref .incr_oqueue() - .attach_producer() + .attach_value_producer() .unwrap() .produce(AdditionalAmount { n: 1, @@ -176,7 +180,7 @@ mod test { server_ref .incr_oqueue() - .attach_producer() + .attach_value_producer() .unwrap() .produce(AdditionalAmount { n: 1, @@ -185,7 +189,7 @@ mod test { server_ref .incr_oqueue() - .attach_producer() + .attach_value_producer() .unwrap() .produce(AdditionalAmount { n: 1, @@ -194,7 +198,7 @@ mod test { // This is fundamentally racy, but it's very hard to avoid because any reply from the message send above will, by // definition, be sent before the panic. 
- generic_test::sleep(Duration::from_millis(250)); + sleep(Duration::from_millis(250)); assert_matches!( server_ref.atomic_incr(AdditionalAmount { @@ -222,7 +226,8 @@ mod test { impl TestServer { fn spawn() -> Result, Whatever> { - let server = Self::new_with(|orpc_internal, _| Self { orpc_internal }); + let server = + Self::new_with(Path::test(), |orpc_internal, _| Self { orpc_internal }); Ok(server) } } @@ -259,7 +264,7 @@ mod test { impl TestServer { fn spawn() -> Result, Whatever> { - let server = Self::new_with(|orpc_internal, _| Self { + let server = Self::new_with(Path::test(), |orpc_internal, _| Self { x: Default::default(), orpc_internal, }); @@ -305,12 +310,10 @@ mod test { impl SimpleServer { fn spawn() -> Result, Whatever> { let thread_exited = Arc::new(AtomicBool::new(false)); - let server = Self::new_with(|orpc_internal, _| Self { - shutdown_state: ShutdownState::default(), + let server = new_server!(Path::test(), |_| Self { + shutdown_state: ShutdownState::new(Path::test()), thread_exited: thread_exited.clone(), - orpc_internal, }); - spawn_thread(server.clone(), { let server = server.clone(); let thread_exited = thread_exited.clone(); @@ -403,13 +406,11 @@ mod test { } impl MessageCounterServer { - fn spawn_with_queue(queue: OQueueRef) -> Result, Whatever> { - let server = Self::new_with(|orpc_internal, _| Self { + fn spawn_with_queue(queue: ConsumableOQueueRef) -> Result, Whatever> { + let server = new_server!(Path::test(), |_| Self { processed_count: AtomicUsize::new(0), - shutdown_state: ShutdownState::default(), - orpc_internal, + shutdown_state: ShutdownState::new(path!(server_type_name)), }); - spawn_thread(server.clone(), { let server = server.clone(); let consumer = queue @@ -418,7 +419,7 @@ mod test { let shutdown_consumer = server .shutdown_state .shutdown_oqueue - .attach_consumer() + .attach_strong_observer(ObservationQuery::unit()) .whatever_context("attach shutdown")?; move || { loop { @@ -427,7 +428,7 @@ mod test { if let _ = consumer.try_consume() { server.processed_count.fetch_add(1, Ordering::Relaxed); }, - if let _ = shutdown_consumer.try_consume() {} + if let _ = shutdown_consumer.try_strong_observe() {} ); crate::task::Task::yield_now(); } @@ -446,10 +447,10 @@ mod test { SERVER_DROPS.store(0, Ordering::SeqCst); - let queue: OQueueRef = LockingQueue::new(8); + let queue = ConsumableOQueueRef::new(8, path!(queue[unique])); let server1 = MessageCounterServer::spawn_with_queue(queue.clone()).unwrap(); - queue.produce(42).unwrap(); + queue.attach_value_producer().unwrap().produce(42); assert_eq_eventually!(server1.get_processed_count().unwrap(), 1); let _ = server1.shutdown(); @@ -464,7 +465,7 @@ mod test { let server2 = MessageCounterServer::spawn_with_queue(queue.clone()).unwrap(); assert_eq!(server2.get_processed_count().unwrap(), 0); - queue.produce(42).unwrap(); + queue.attach_value_producer().unwrap().produce(42); assert_eq_eventually!(server2.get_processed_count().unwrap(), 1); let _ = server2.shutdown(); @@ -503,21 +504,20 @@ mod test { impl MessageCounterServer { fn spawn_with_queue(queue: OQueueRef) -> Result, Whatever> { - let server = Self::new_with(|orpc_internal, _| Self { + let server = new_server!(Path::test(), |_| Self { processed_count: AtomicUsize::new(0), - shutdown_state: ShutdownState::default(), - orpc_internal, + shutdown_state: ShutdownState::new(path!(message_counter_server)), }); spawn_thread(server.clone(), { let server = server.clone(); let observer = queue - .attach_strong_observer() + 
.attach_strong_observer(ObservationQuery::identity()) .whatever_context("attach strong observer")?; let shutdown_consumer = server .shutdown_state .shutdown_oqueue - .attach_consumer() + .attach_strong_observer(ObservationQuery::unit()) .whatever_context("attach shutdown")?; move || { loop { @@ -526,7 +526,7 @@ mod test { if let _ = observer.try_strong_observe() { server.processed_count.fetch_add(1, Ordering::SeqCst); }, - if let _ = shutdown_consumer.try_consume() {} + if let _ = shutdown_consumer.try_strong_observe() {} ); } } @@ -544,10 +544,10 @@ mod test { SERVER_DROPS.store(0, Ordering::SeqCst); - let queue: OQueueRef = ObservableLockingQueue::new(8, 1); + let queue: OQueueRef = OQueueRef::new(8, path!(queue[unique])); let server1 = MessageCounterServer::spawn_with_queue(queue.clone()).unwrap(); - queue.produce(42).unwrap(); + queue.attach_ref_producer().unwrap().produce_ref(&42); assert_eq_eventually!(server1.get_processed_count().unwrap(), 1); let _ = server1.shutdown(); @@ -562,7 +562,7 @@ mod test { let server2 = MessageCounterServer::spawn_with_queue(queue.clone()).unwrap(); assert_eq!(server2.get_processed_count().unwrap(), 0); - queue.produce(42).unwrap(); + queue.attach_ref_producer().unwrap().produce_ref(&42); assert_eq_eventually!(server2.get_processed_count().unwrap(), 1); let _ = server2.shutdown(); diff --git a/ostd/src/orpc/framework/mod.rs b/ostd/src/orpc/framework/mod.rs index 0d031a9c2..7b65a74da 100644 --- a/ostd/src/orpc/framework/mod.rs +++ b/ostd/src/orpc/framework/mod.rs @@ -40,7 +40,10 @@ pub use threads::spawn_thread; use crate::{ cpu_local_cell, - orpc::errors::{RPCError, ServerMissingSnafu}, + orpc::{ + errors::{RPCError, ServerMissingSnafu}, + path::Path, + }, prelude::Arc, sync::Mutex, task::{Task, TaskOptions, disable_preempt, scheduler}, @@ -55,6 +58,11 @@ pub trait Server: Any + Sync + Send + 'static { /// class pointer of this server. #[doc(hidden)] fn orpc_server_base(&self) -> &ServerBase; + + /// Get the path of this server. + fn path(&self) -> &Path { + self.orpc_server_base().path() + } } static NEXT_SERVER_ID: AtomicUsize = AtomicUsize::new(1); @@ -74,6 +82,8 @@ pub struct ServerBase { /// An opaque ID for the server. This is non-zero to allow compact representations of /// `Option` in errors. id: NonZeroUsize, + /// The path of the server. + path: Path, } impl ServerBase { @@ -82,12 +92,13 @@ impl ServerBase { /// /// Create a new `ServerBase` with a cyclical reference to the server containing it. #[doc(hidden)] - pub fn new(weak_this: Weak) -> Self { + pub fn new(path: Path, weak_this: Weak) -> Self { Self { aborted: Default::default(), server_threads: Mutex::new(Default::default()), weak_this, id: NonZeroUsize::new(NEXT_SERVER_ID.fetch_add(1, Ordering::Relaxed)).unwrap(), + path, } } @@ -166,6 +177,11 @@ impl ServerBase { pub fn id(&self) -> NonZeroUsize { self.id } + + /// Get the path of the server assigned when it was created. + pub fn path(&self) -> &Path { + &self.path + } } /// Methods to access the current server. @@ -174,20 +190,30 @@ pub struct CurrentServer { } impl CurrentServer { + /// Call `f` with a reference to the current server. + pub fn with_server(f: impl Fn(Option<&Arc>) -> T) -> T { + f(Task::current().unwrap().server().borrow().as_ref()) + } + /// Get a new Arc reference to the current server. 
pub fn current_cloned() -> Option> { - Task::current().unwrap().server().borrow().clone() + Self::with_server(|s| s.cloned()) } /// Check if the current server has aborted pub fn is_aborted() -> bool { - Task::current() - .unwrap() - .server() - .borrow() - .as_ref() - .map(|s| s.orpc_server_base().is_aborted()) - .unwrap_or(false) + Self::with_server(|s| { + s.map(|s| s.orpc_server_base().is_aborted()) + .unwrap_or(false) + }) + } + + /// Get the path of the current server. + pub fn path() -> Path { + Self::with_server(|s| { + s.map(|s| s.orpc_server_base().path().clone()) + .unwrap_or_default() + }) } /// Check the if the current server has aborted and panic if it has. This should be called periodically from all @@ -358,7 +384,7 @@ mod test { fn spawn(f: F) -> Result, Whatever> { let server = Arc::::new_cyclic(|weak_this| Self { f, - base: ServerBase::new(weak_this.clone()), + base: ServerBase::new(Path::test(), weak_this.clone()), thread_exited: AtomicBool::new(false), }); Self::orpc_start_threads(&server)?; @@ -418,8 +444,8 @@ mod test { } assert_ne!( - ServerBase::new(Weak::::new()).id(), - ServerBase::new(Weak::::new()).id() + ServerBase::new(Path::test(), Weak::::new()).id(), + ServerBase::new(Path::test(), Weak::::new()).id() ); } } diff --git a/ostd/src/orpc/framework/monitor.rs b/ostd/src/orpc/framework/monitor.rs index d3f96e964..46f5e4514 100644 --- a/ostd/src/orpc/framework/monitor.rs +++ b/ostd/src/orpc/framework/monitor.rs @@ -31,7 +31,7 @@ mod tests { use orpc_macros::{orpc_monitor, orpc_server}; use crate::{ - assert_eq_eventually, + assert_eq_eventually, new_server, orpc::{ errors::RPCError, oqueue::{ @@ -77,8 +77,7 @@ mod tests { } fn spawn_server() -> Arc { - let server = TestServer::new_with(|orpc_internal, _| TestServer { - orpc_internal, + let server = new_server!(Path::test(), |_| TestServer { monitor: TestStateMonitor::new(Path::test()), }); server.monitor.start(server.clone(), TestState { x: 0 }); diff --git a/ostd/src/orpc/framework/notifier.rs b/ostd/src/orpc/framework/notifier.rs index a940a93ae..f162ba1aa 100644 --- a/ostd/src/orpc/framework/notifier.rs +++ b/ostd/src/orpc/framework/notifier.rs @@ -3,10 +3,7 @@ //! Arbitrary notifications via ORPC use orpc_macros::orpc_trait; -use crate::orpc::{ - errors::RPCError, - legacy_oqueue::{OQueueRef, ringbuffer::MPMCOQueue}, -}; +use crate::orpc::{errors::RPCError, oqueue::OQueueRef}; /// ORPC trait for a simple notifier #[orpc_trait] @@ -16,6 +13,6 @@ pub trait Notifier { /// OQueue for listening to notifications. fn notification_oqueue(&self) -> OQueueRef<()> { - MPMCOQueue::<()>::new(1, 1) + OQueueRef::new(1, oqueue_path) } } diff --git a/ostd/src/orpc/framework/shutdown.rs b/ostd/src/orpc/framework/shutdown.rs index e655ec0f8..8f225eee5 100644 --- a/ostd/src/orpc/framework/shutdown.rs +++ b/ostd/src/orpc/framework/shutdown.rs @@ -7,10 +7,14 @@ use core::sync::atomic::AtomicBool; use log::error; use orpc_macros::orpc_trait; -use crate::orpc::{ - errors::{RPCError, ServerMissingSnafu}, - framework::CurrentServer, - legacy_oqueue::{OQueueRef, locking::ObservableLockingQueue}, +use crate::{ + orpc::{ + errors::{RPCError, ServerMissingSnafu}, + framework::CurrentServer, + oqueue::{OQueue, OQueueRef}, + path::Path, + }, + path, }; /// Trait that allows a server to be shut down gracefully. 
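A short usage sketch for the `CurrentServer::with_server` and `CurrentServer::path` helpers added in framework/mod.rs above: a thread owned by a server can derive child paths from that server without holding an explicit reference to it. The `stats` component and the helper function below are invented; `CurrentServer::path`, `Path::append`, and `path!` are from this patch and the existing API:

```rust
use ostd::{
    orpc::{framework::CurrentServer, path::Path},
    path,
};

/// Derive a child path from whichever server owns the current thread.
/// Falls back to the default path when the thread is not owned by a server,
/// mirroring the `unwrap_or_default()` inside `CurrentServer::path()`.
fn stats_path_for_current_server() -> Path {
    CurrentServer::path().append(&path!(stats))
}
```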
@@ -42,7 +46,7 @@ pub trait Shutdown { /// /// fn main() { /// let server = Arc::new(Server { -/// state: ShutdownState::new(), +/// state: ShutdownState::new(path), /// }); /// /// // Simulate shutting down the server @@ -58,11 +62,12 @@ pub struct ShutdownState { pub shutdown_oqueue: OQueueRef<()>, } -impl Default for ShutdownState { - fn default() -> Self { +impl ShutdownState { + /// A new shutdown handler with its OQueue under a given path. + pub fn new(path: Path) -> Self { Self { is_shutdown: Default::default(), - shutdown_oqueue: ObservableLockingQueue::new(2, 4), + shutdown_oqueue: OQueueRef::new(2, path.append(&path!(shutdown))), } } } @@ -72,8 +77,14 @@ impl ShutdownState { pub fn shutdown(&self) { self.is_shutdown .store(true, core::sync::atomic::Ordering::Release); - if let Err(e) = self.shutdown_oqueue.produce(()) { - error!("Failed to send shutdown notification: {e}"); + + match self.shutdown_oqueue.attach_ref_producer() { + Ok(p) => { + p.produce_ref(&()); + } + Err(e) => { + error!("Failed to send shutdown notification: {e}"); + } + } } } diff --git a/ostd/src/orpc/legacy_oqueue/generic_test.rs b/ostd/src/orpc/legacy_oqueue/generic_test.rs index 2dc8941be..f4169e5ce 100644 --- a/ostd/src/orpc/legacy_oqueue/generic_test.rs +++ b/ostd/src/orpc/legacy_oqueue/generic_test.rs @@ -9,7 +9,7 @@ use core::{ time::Duration, }; -use super::{super::sync::select, *}; +use super::{super::sync::select_legacy, *}; use crate::{ arch::timer::TIMER_FREQ, orpc::legacy_oqueue::locking::LockingQueue, @@ -263,7 +263,7 @@ pub(crate) fn test_send_multi_receive_blocker>( let mut consumer2_counter = 0; while consumer1_counter < n_messages || consumer2_counter < n_messages { - select!( + select_legacy!( if let TestMessage { x } = consumer1.try_consume() { assert_eq!(x, consumer1_counter); consumer1_counter += 1; diff --git a/ostd/src/orpc/oqueue/generic_test.rs b/ostd/src/orpc/oqueue/generic_test.rs index 839e16d48..9d076c562 100644 --- a/ostd/src/orpc/oqueue/generic_test.rs +++ b/ostd/src/orpc/oqueue/generic_test.rs @@ -9,6 +9,8 @@ use core::{ time::Duration, }; +use orpc_macros::select; + use super::*; use crate::{ prelude::Vec, @@ -238,6 +240,76 @@ pub(crate) fn test_send_receive_blocker( assert_eq!(received_messages.len(), n_messages); } +/// Check that multithreading works at a basic level.
+pub(crate) fn test_send_multi_receive_blocker( + oqueue1: ConsumableOQueueRef, + oqueue2: ConsumableOQueueRef, + n_messages: usize, +) { + // Consumer which receives all the messages + let consumer1 = oqueue1.attach_consumer().unwrap(); + let consumer2 = oqueue2.attach_consumer().unwrap(); + let recv_queue = Arc::new(WaitQueue::new()); + let recv_completed = Arc::new(AtomicBool::new(false)); + let receive_thread = TaskOptions::new({ + let recv_queue = recv_queue.clone(); + let recv_completed = recv_completed.clone(); + move || { + let mut consumer1_counter = 0; + let mut consumer2_counter = 0; + + while consumer1_counter < n_messages || consumer2_counter < n_messages { + select!( + if let TestMessage { x } = consumer1.try_consume() { + assert_eq!(x, consumer1_counter); + consumer1_counter += 1; + }, + if let TestMessage { x } = consumer2.try_consume() { + assert_eq!(x, consumer2_counter); + consumer2_counter += 1; + } + ) + } + recv_completed.store(true, Ordering::Relaxed); + recv_queue.wake_all(); + } + }) + .spawn() + .unwrap(); + + let producer_queue = Arc::new(WaitQueue::new()); + // Producer thread which sends n messages + let producer_thread_completions: Vec<_> = [oqueue1, oqueue2] + .into_iter() + .enumerate() + .map(|(i, oqueue)| { + let producer = oqueue.attach_value_producer().unwrap(); + let completed = Arc::new(AtomicBool::new(false)); + TaskOptions::new({ + let completed = completed.clone(); + let producer_queue = producer_queue.clone(); + move || { + for x in 0..n_messages { + producer.produce(TestMessage { x }); + sleep(Duration::from_millis(i as u64 + 1)); + } + completed.store(true, Ordering::Relaxed); + producer_queue.wake_all(); + } + }) + .spawn() + .unwrap(); + completed + }) + .collect(); + + // Wait for all threads to finish + for completed in producer_thread_completions { + producer_queue.wait_until(|| completed.load(Ordering::Relaxed).then_some(())); + } + recv_queue.wait_until(|| recv_completed.load(Ordering::Relaxed).then_some(())); +} + /// Test produce operations with strong observation, but without any consumer attached. Especially, /// if the queue blocks when there is no consumer. pub(crate) fn test_produce_strong_observe_only(queue: ConsumableOQueueRef) { diff --git a/ostd/src/orpc/oqueue/mod.rs b/ostd/src/orpc/oqueue/mod.rs index c8cc4a5e0..0b298f3a4 100644 --- a/ostd/src/orpc/oqueue/mod.rs +++ b/ostd/src/orpc/oqueue/mod.rs @@ -1117,6 +1117,13 @@ mod test { generic_test::test_send_receive_blocker(queue, 32, 3); } + #[ktest] + fn send_multi_receive_blocker_observable() { + let oqueue1 = ConsumableOQueueRef::::new(16, Path::test()); + let oqueue2 = ConsumableOQueueRef::::new(16, Path::test()); + generic_test::test_send_multi_receive_blocker(oqueue1, oqueue2, 50); + } + #[ktest] fn generic_produce_strong_observe_only() { generic_test::test_produce_strong_observe_only(ConsumableOQueueRef::< diff --git a/ostd/src/orpc/oqueue/query.rs b/ostd/src/orpc/oqueue/query.rs index 3d6e3e463..aac851eca 100644 --- a/ostd/src/orpc/oqueue/query.rs +++ b/ostd/src/orpc/oqueue/query.rs @@ -60,6 +60,14 @@ impl ObservationQuery { } } +impl ObservationQuery { + /// A query which only observes that a message was sent without capturing any information. + pub fn unit() -> Self { + Self { + extractor: Box::new(|_| Some(())), + } + } +} #[cfg(ktest)] mod test { use super::*; diff --git a/ostd/src/orpc/path.rs b/ostd/src/orpc/path.rs index 28367c7fd..4638b79d1 100644 --- a/ostd/src/orpc/path.rs +++ b/ostd/src/orpc/path.rs @@ -276,6 +276,8 @@ macro_rules! 
path { #[allow(clippy::allow_attributes, unused)] use ::core::sync::atomic::{AtomicUsize, Ordering}; use ::alloc::vec; + #[allow(clippy::allow_attributes, unused)] + use ::alloc::string::ToString; let components = $crate::__path_parse!([] @ {$($part)*}); Path::new(components) diff --git a/ostd/src/orpc/statistics.rs b/ostd/src/orpc/statistics.rs index a546c32ea..b2c4f2c43 100644 --- a/ostd/src/orpc/statistics.rs +++ b/ostd/src/orpc/statistics.rs @@ -14,17 +14,24 @@ use super::{ shutdown::{self, ShutdownState}, spawn_thread, }, - legacy_oqueue::{OQueueRef, locking::ObservableLockingQueue}, sync::select, }; -use crate::orpc::legacy_oqueue::OQueueAttachError; +use crate::{ + new_server, + orpc::{ + oqueue::{ + OQueue as _, OQueueBase as _, OQueueError, OQueueRef, ObservationQuery, StrongObserver, + }, + path::Path, + }, +}; /// An ORPC trait exposing an OQueue of outstanding request counts. #[orpc_trait] pub trait Outstanding { /// The OQueue that publishes the number of outstanding requests (requests - replies). fn outstanding_oqueue(&self) -> OQueueRef { - ObservableLockingQueue::new(4, 8) + OQueueRef::new(8, oqueue_path) } } @@ -45,23 +52,21 @@ impl shutdown::Shutdown for OutstandingCounter { impl OutstandingCounter { /// Spawn a new `OutstandingCounter` server which observes `request_oqueue` and /// `reply_oqueue`. - pub fn spawn( - request_oqueue: OQueueRef, - reply_oqueue: OQueueRef, - ) -> Result, OQueueAttachError> { - let server = Self::new_with(|orpc_internal, _| Self { - orpc_internal, - shutdown_state: Default::default(), + pub fn spawn( + path: Path, + request_observer: StrongObserver<()>, + reply_observer: StrongObserver<()>, + ) -> Result, OQueueError> { + let server = new_server!(path.clone(), |_| Self { + shutdown_state: ShutdownState::new(path), }); spawn_thread(server.clone(), { - let request_observer = request_oqueue.attach_strong_observer()?; - let reply_observer = reply_oqueue.attach_strong_observer()?; let shutdown_observer = server .shutdown_state .shutdown_oqueue - .attach_strong_observer()?; - let outstanding_oqueue_producer = server.outstanding_oqueue().attach_producer()?; + .attach_strong_observer(ObservationQuery::identity())?; + let outstanding_oqueue_producer = server.outstanding_oqueue().attach_ref_producer()?; let server = server.clone(); move || { @@ -71,11 +76,11 @@ impl OutstandingCounter { select!( if let _ = request_observer.try_strong_observe() { outstanding += 1; - outstanding_oqueue_producer.produce(outstanding); + outstanding_oqueue_producer.produce_ref(&outstanding); }, if let _ = reply_observer.try_strong_observe() { outstanding -= 1; - outstanding_oqueue_producer.produce(outstanding); + outstanding_oqueue_producer.produce_ref(&outstanding); }, if let () = shutdown_observer.try_strong_observe() {} ); diff --git a/ostd/src/orpc/sync/mod.rs b/ostd/src/orpc/sync/mod.rs index f55d1f331..ecf444c27 100644 --- a/ostd/src/orpc/sync/mod.rs +++ b/ostd/src/orpc/sync/mod.rs @@ -1,11 +1,11 @@ // SPDX-License-Identifier: MPL-2.0 //! A trait [`Blocker`] which allows a thread to wait for a wake-up from another thread. The API is designed to allow a -//! waiter to wait on multiple blockers at the same time to support [`select!`]. +//! waiter to wait on multiple blockers at the same time to support [`select_legacy!`]. 
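To show how the reworked `OutstandingCounter::spawn` in statistics.rs above is meant to be called: the caller now attaches the two `()` observers itself (as the block layer does earlier in this patch) and supplies a path for the counter server. A sketch under assumptions: the `requests`/`replies` queues and the `io` path are invented, `OutstandingCounter` is assumed to be reachable as `ostd::orpc::statistics::OutstandingCounter`, and the attach error is assumed to convert into the `OQueueError` that `spawn` returns:

```rust
use alloc::sync::Arc;

use ostd::{
    orpc::{
        oqueue::{OQueueBase as _, OQueueError, OQueueRef, ObservationQuery},
        statistics::OutstandingCounter,
    },
    path,
};

/// Wire two existing request/reply queues into an outstanding-request counter.
fn spawn_io_counter(
    requests: &OQueueRef<u64>,
    replies: &OQueueRef<u64>,
) -> Result<Arc<OutstandingCounter>, OQueueError> {
    // `ObservationQuery::unit()` records only that a message was produced,
    // which is all the counter needs.
    OutstandingCounter::spawn(
        path!(io.outstanding_counter),
        requests.attach_strong_observer(ObservationQuery::unit())?,
        replies.attach_strong_observer(ObservationQuery::unit())?,
    )
}
```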
use alloc::vec::Vec; -pub use orpc_macros::select; +pub use orpc_macros::{select, select_legacy}; use crate::{ orpc::framework::CurrentServer, diff --git a/ostd/src/orpc_common/new_server.rs b/ostd/src/orpc_common/new_server.rs index 79bf528ff..29734e8fd 100644 --- a/ostd/src/orpc_common/new_server.rs +++ b/ostd/src/orpc_common/new_server.rs @@ -28,11 +28,15 @@ macro_rules! new_server { #[macro_export] #[cfg(not(baseline_asterinas))] macro_rules! __new_server { - // Pattern for structs with orpc_internal field ( - |$weak_self:tt| $struct_name:ident { $($field:ident $(:$value:expr)?),* $(,)? } + $path:expr, |$weak_self:tt| { $($body:tt)* } ) => { - $struct_name::new_with(|orpc_internal, $weak_self| $struct_name { + $crate::__new_server!($path, |$weak_self| $($body)*) + }; + ( + $path:expr, |$weak_self:tt| $struct_name:ident { $($field:ident $(:$value:expr)?),* $(,)? } + ) => { + $struct_name::new_with($path, |orpc_internal, $weak_self| $struct_name { orpc_internal, $($field $(:$value)?),* }) @@ -44,9 +48,13 @@ macro_rules! __new_server { #[macro_export] #[cfg(baseline_asterinas)] macro_rules! __new_server { - // Pattern for structs with orpc_internal field ( - |$weak_self:tt| $struct_name:ident { $($field:ident $(:$value:expr)?),* $(,)? } + $path:expr, |$weak_self:tt| { $($body:tt)* } + ) => { + $crate::__new_server!($path, |$weak_self| $($body)*) + }; + ( + $path:expr, |$weak_self:tt| $struct_name:ident { $($field:ident $(:$value:expr)?),* $(,)? } ) => { Arc::new_cyclic(|$weak_self| $struct_name { $($field $(:$value)?),* diff --git a/ostd/tests/early-boot-test-kernel/src/orpc_tests.rs b/ostd/tests/early-boot-test-kernel/src/orpc_tests.rs index 0a780ab65..411e50df4 100644 --- a/ostd/tests/early-boot-test-kernel/src/orpc_tests.rs +++ b/ostd/tests/early-boot-test-kernel/src/orpc_tests.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MPL-2.0 -use ostd::orpc::{errors::RPCError, orpc_impl, orpc_server, orpc_trait}; +use ostd::orpc::{errors::RPCError, orpc_impl, orpc_server, orpc_trait, path::Path}; /// The methods used for testing the ORPC framework in early-boot context. #[orpc_trait] @@ -20,6 +20,8 @@ impl TestTrait for TestServer { } pub(crate) fn test_early_boot_server() { - let server = TestServer::new_with(|orpc_internal, _| TestServer { orpc_internal }); + let server = TestServer::new_with(Path::test(), |orpc_internal, _| TestServer { + orpc_internal, + }); assert_eq!(server.f().unwrap(), 42); }
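Finally, a sketch of the construction pattern this patch converges on, using the early-boot `TestServer` defined just above: with `new_server!` the call site names a path and no longer spells out the `orpc_internal` field by hand. This assumes the function lives in the same module as `TestServer` and that `new_server!` is importable as `ostd::new_server`, as in the `hugepaged.rs` hunk earlier; the `early_boot_test` path component is made up:

```rust
use alloc::sync::Arc;

use ostd::{new_server, path};

/// Same server as in `test_early_boot_server`, built through the macro instead
/// of calling `TestServer::new_with` directly.
pub(crate) fn spawn_early_boot_server() -> Arc<TestServer> {
    // In the non-baseline configuration this expands to
    // `TestServer::new_with(path!(early_boot_test), |orpc_internal, _| ...)`,
    // which fills in the `orpc_internal` field generated by the server macro.
    new_server!(path!(early_boot_test), |_| TestServer {})
}
```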