Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions kernel/comps/block/src/bio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use aster_time::read_monotonic_time;
use bitvec::array::BitArray;
use int_to_c_enum::TryFromInt;
#[cfg(not(baseline_asterinas))]
use ostd::orpc::legacy_oqueue::{OQueueAttachError, Producer};
use ostd::orpc::oqueue::{OQueueError, ValueProducer};
use ostd::{
Error,
mm::{
Expand All @@ -25,7 +25,7 @@ use crate::{BLOCK_SIZE, SECTOR_SIZE, prelude::*, request_queue::BioRequestSingle
/// Trace data for block device I/O completion.
///
/// This struct captures performance metrics when a block I/O request completes.
#[derive(Clone)]
#[derive(Clone, Copy, Default)]
pub struct BlockDeviceCompletionStats {
/// The latency of the I/O request (time from submission to completion).
pub latency: Duration,
Expand Down Expand Up @@ -202,13 +202,13 @@ pub enum BioEnqueueError {
TooBig,
/// OQueue attachment failures
#[cfg(not(baseline_asterinas))]
OQueueAttachError(OQueueAttachError),
OQueueError(OQueueError),
}

#[cfg(not(baseline_asterinas))]
impl From<OQueueAttachError> for BioEnqueueError {
fn from(err: OQueueAttachError) -> Self {
Self::OQueueAttachError(err)
impl From<OQueueError> for BioEnqueueError {
fn from(err: OQueueError) -> Self {
Self::OQueueError(err)
}
}

Expand Down Expand Up @@ -325,7 +325,7 @@ pub struct SubmittedBio {
bio_inner: Arc<BioInner>,

#[cfg(not(baseline_asterinas))]
reply_handle: Option<Box<dyn Producer<BlockDeviceCompletionStats>>>,
reply_handle: Option<ValueProducer<BlockDeviceCompletionStats>>,

submission_time: Option<Duration>,

Expand Down Expand Up @@ -406,7 +406,7 @@ impl SubmittedBio {
#[cfg(not(baseline_asterinas))]
pub fn prepare_enqueue(
&mut self,
reply_handle: Box<dyn Producer<BlockDeviceCompletionStats>>,
reply_handle: ValueProducer<BlockDeviceCompletionStats>,
bio_request_single_queue: Arc<BioRequestSingleQueue>,
) {
self.reply_handle = Some(reply_handle);
Expand Down
4 changes: 3 additions & 1 deletion kernel/comps/block/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub mod request_queue;
pub mod test_utils;

use component::{ComponentInitError, init_component};
use ostd::sync::SpinLock;
use ostd::{orpc::path::Path, sync::SpinLock};
use spin::Once;

use self::{
Expand All @@ -58,6 +58,8 @@ pub trait BlockDevice: Send + Sync + Any + Debug {

/// Returns the metadata of the block device.
fn metadata(&self) -> BlockDeviceMeta;

fn path(&self) -> Path;
}

/// Metadata for a block device.
Expand Down
10 changes: 9 additions & 1 deletion kernel/comps/block/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use alloc::{boxed::Box, vec};

use ostd::{mm::UntypedMem, sync::Mutex};
use ostd::{mm::UntypedMem, orpc::path::Path, path, sync::Mutex};

use crate::{
BlockDevice, BlockDeviceMeta, SECTOR_SIZE,
Expand All @@ -24,6 +24,10 @@ impl BlockDevice for FakeBlockDevice {
fn metadata(&self) -> crate::BlockDeviceMeta {
todo!()
}

fn path(&self) -> Path {
Path::test()
}
}

/// A block device backed by memory.
Expand Down Expand Up @@ -77,4 +81,8 @@ impl BlockDevice for MemoryDisk {
nr_sectors: self.data.lock().len() / SECTOR_SIZE,
}
}

fn path(&self) -> Path {
path!(memory_disk)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ pub struct DataCaptureDeviceServer {
}

impl DataCaptureDeviceServer {
pub fn new(block_device: Arc<dyn BlockDevice>) -> Arc<DataCaptureDeviceServer> {
new_server!(|_| DataCaptureDeviceServer {
pub fn new(path: Path, block_device: Arc<dyn BlockDevice>) -> Arc<DataCaptureDeviceServer> {
new_server!(path, |_| DataCaptureDeviceServer {
block_device,
next_block_offset: AtomicUsize::new(0),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl DataCaptureFileBuilder {
.call_in_context(move || -> Result<Arc<DataCaptureFileServer<T>>, RPCError> {
let command_oqueue =
ConsumableOQueueRef::new(8, self.path.append(&path!(commands)));
let server = new_server!(|_| DataCaptureFileServer {
let server = new_server!(self.path, |_| DataCaptureFileServer {
command_producer: command_oqueue
.attach_value_producer()
.expect("single purpose OQueue failed."),
Expand Down
7 changes: 5 additions & 2 deletions kernel/comps/mariposa_data_capture/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ mod tests {
use aster_block::test_utils::MemoryDisk;
use ostd::{
assertion::sleep,
orpc::oqueue::{OQueue, OQueueBase, OQueueRef, ObservationQuery},
orpc::{
oqueue::{OQueue, OQueueBase, OQueueRef, ObservationQuery},
path::Path,
},
path,
prelude::*,
};
Expand All @@ -71,7 +74,7 @@ mod tests {
fn test_capture_server() {
// Create memory disk with space for 4 blocks
let block_device = Arc::new(MemoryDisk::new(4096 * 4));
let device = DataCaptureDeviceServer::new(block_device.clone());
let device = DataCaptureDeviceServer::new(Path::test(), block_device.clone());

let path = path!(test_capture);
let builder = device
Expand Down
13 changes: 12 additions & 1 deletion kernel/comps/mlsdisk/src/layers/5-disk/mlsdisk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use core::{
sync::atomic::{AtomicBool, Ordering},
};

use ostd::{ignore_err, mm::VmIo};
use ostd::{ignore_err, mm::VmIo, orpc::path::Path, path};
use ostd_pod::Pod;

use super::{
Expand Down Expand Up @@ -65,6 +65,8 @@ struct DiskInner<D: BlockSet> {
root_key: Key,
/// Whether `MlsDisk` is dropped.
is_dropped: AtomicBool,
/// The path of this device.
path: Path,
/// Scope lock for control write and sync operation.
write_sync_region: RwLock<()>,
}
Expand Down Expand Up @@ -163,6 +165,10 @@ impl<D: BlockSet + 'static> aster_block::BlockDevice for MlsDisk<D> {
nr_sectors: (BLOCK_SIZE / SECTOR_SIZE) * self.total_blocks(),
}
}

fn path(&self) -> ostd::orpc::path::Path {
self.inner.path.clone()
}
}

impl<D: BlockSet + 'static> MlsDisk<D> {
Expand Down Expand Up @@ -211,6 +217,9 @@ impl<D: BlockSet + 'static> MlsDisk<D> {
self.inner.user_data_disk.nblocks()
}

// TODO(arthurp): The paths of the disks are not based on the underlying block devices like they
// should be.

/// Creates a new `MlsDisk` on the given disk, with the root encryption key.
pub fn create(
disk: D,
Expand Down Expand Up @@ -254,6 +263,7 @@ impl<D: BlockSet + 'static> MlsDisk<D> {
root_key,
is_dropped: AtomicBool::new(false),
write_sync_region: RwLock::new(()),
path: path!(block_device.create_mls[unique]),
}),
};

Expand Down Expand Up @@ -306,6 +316,7 @@ impl<D: BlockSet + 'static> MlsDisk<D> {
root_key,
is_dropped: AtomicBool::new(false),
write_sync_region: RwLock::new(()),
path: path!(block_device.open_mls[unique]),
}),
};

Expand Down
5 changes: 5 additions & 0 deletions kernel/comps/mlsdisk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ mod test {
};
use ostd::{
mm::{FrameAllocOptions, Segment, VmIo},
path,
prelude::*,
};

Expand Down Expand Up @@ -185,6 +186,10 @@ mod test {
nr_sectors: self.blocks.size() / SECTOR_SIZE,
}
}

fn path(&self) -> ostd::orpc::path::Path {
path!(memory_disk)
}
}

fn create_rawdisk(nblocks: usize) -> RawDisk {
Expand Down
17 changes: 14 additions & 3 deletions kernel/comps/raid/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use aster_block::{
request_queue::{BioRequest, BioRequestSingleQueue},
};
#[cfg(not(baseline_asterinas))]
use ostd::orpc::orpc_server;
use ostd::{
orpc::{orpc_server, path::Path},
path,
};

#[cfg(not(baseline_asterinas))]
use crate::server_traits::SelectionPolicy;
Expand Down Expand Up @@ -63,6 +66,8 @@ pub struct Raid1Device {
/// Basic capacity limits for the logical device (min across members).
metadata: BlockDeviceMeta,

path: Path,

/// The policy to select the read member.
selection_policy: Arc<dyn SelectionPolicy>,
}
Expand Down Expand Up @@ -112,6 +117,8 @@ impl Raid1Device {
members: Vec<Arc<dyn BlockDevice>>,
selection_policy: Arc<dyn SelectionPolicy>,
) -> Result<(), Raid1DeviceError> {
use ostd::new_server;

if members.len() < 2 {
return Err(Raid1DeviceError::NotEnoughMembers);
}
Expand All @@ -122,12 +129,12 @@ impl Raid1Device {
let queue =
BioRequestSingleQueue::with_max_nr_segments_per_bio(metadata.max_nr_segments_per_bio);

let device = Self::new_with(|orpc_internal, _weak_self| Raid1Device {
orpc_internal,
let device = new_server!(path!(block.raid1.{name}), |_| Raid1Device {
members,
queue,
metadata,
selection_policy,
path: path!(block_device.raid1.{name})
});

aster_block::register_device(name.to_owned(), device.clone());
Expand Down Expand Up @@ -365,4 +372,8 @@ impl BlockDevice for Raid1Device {
fn metadata(&self) -> BlockDeviceMeta {
self.metadata
}

fn path(&self) -> ostd::orpc::path::Path {
self.path.clone()
}
}
30 changes: 22 additions & 8 deletions kernel/comps/raid/src/selection_policies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@
use alloc::{sync::Arc, vec::Vec};
use core::sync::atomic::{AtomicUsize, Ordering};

use aster_block::BlockDevice;
use ostd::{Error, orpc::orpc_server};
use aster_block::{BlockDevice, bio::BlockDeviceCompletionStats};
use ostd::{
Error,
orpc::{
oqueue::{OQueueBase as _, ObservationQuery},
orpc_server,
path::Path,
},
};

use crate::server_traits::{ObservableBlockDevice, SelectionPolicy};

Expand All @@ -18,8 +25,8 @@ pub struct RoundRobinPolicy {
}

impl RoundRobinPolicy {
pub fn new(members: Vec<Arc<dyn BlockDevice>>) -> Result<Arc<Self>, Error> {
let server = Self::new_with(|orpc_internal, _| Self {
pub fn new(path: Path, members: Vec<Arc<dyn BlockDevice>>) -> Result<Arc<Self>, Error> {
let server = Self::new_with(path, |orpc_internal, _| Self {
orpc_internal,
read_cursor: AtomicUsize::new(0),
members,
Expand Down Expand Up @@ -49,8 +56,11 @@ pub struct LinnOSPolicy {
}

impl LinnOSPolicy {
pub fn new(members: Vec<Arc<dyn ObservableBlockDevice>>) -> Result<Arc<Self>, Error> {
let server = Self::new_with(|orpc_internal, _| Self {
pub fn new(
path: Path,
members: Vec<Arc<dyn ObservableBlockDevice>>,
) -> Result<Arc<Self>, Error> {
let server = Self::new_with(path, |orpc_internal, _| Self {
orpc_internal,
read_cursor: AtomicUsize::new(0),
members,
Expand All @@ -70,15 +80,19 @@ impl SelectionPolicy for LinnOSPolicy {
.map(|device| {
device
.bio_completion_oqueue()
.attach_weak_observer()
.attach_weak_observer(4, ObservationQuery::identity())
.expect("Failed to attach weak observer to bio_completion_oqueue")
})
.collect();

loop {
let idx = self.read_cursor.fetch_add(1, Ordering::Relaxed);
let observer = &trace_observers[idx % trace_observers.len()];
let completion_trace = observer.weak_observe_recent(4);
let completion_trace: Vec<BlockDeviceCompletionStats> = observer
.weak_observe_recent(4)?
.iter()
.map(|v| v.unwrap_or_default())
.collect();

// Inference using the ML model
let x = self.model[0] * completion_trace[0].latency.as_nanos() as f32
Expand Down
Loading
Loading