diff --git a/kernel/src/fs/server_traits.rs b/kernel/src/fs/server_traits.rs index 118c5e33f..a6018623f 100644 --- a/kernel/src/fs/server_traits.rs +++ b/kernel/src/fs/server_traits.rs @@ -7,8 +7,9 @@ use ostd::orpc::{ legacy_oqueue::{OQueue as _, OQueueRef, Producer, reply::ReplyQueue}, orpc_trait, }; +use serde::Serialize; -use crate::{Result, fs::utils::CachePage}; +use crate::{Result, event::EventContext, fs::utils::CachePage}; /// A reference to a page in a [`PageStore`]. It contains the page index and the frame that holds /// the page data (if available). @@ -137,7 +138,8 @@ pub trait PageStore: PageIOObservable { } /// The state of a page in the cache. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[repr(u8)] pub enum CacheState { /// The page was in the cache. Hit, @@ -145,15 +147,42 @@ pub enum CacheState { Miss, /// The page was currently being read into the cache. Pending, + Prefetch, + /// The page was evicted from the cache (LRU eviction). + Evict, } -/// Information about a read request on the page cache. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Serialize)] pub struct PageCacheReadInfo { - /// The index of the page. - pub idx: usize, + pub idx: u64, /// The state of the cached page when the request was made. pub cache_state: CacheState, + pub cache_id: usize, + /// Total number of pages in the underlying store. + pub store_size: u64, + /// Number of pages currently held in the cache. + pub cache_pages: u64, + pub context: EventContext, +} + +impl PageCacheReadInfo { + /// Creates a new PageCacheReadInfo with computed timestamp and tid. + pub fn new( + idx: u64, + cache_state: CacheState, + cache_id: usize, + store_size: u64, + cache_pages: u64, + ) -> Self { + PageCacheReadInfo { + idx, + cache_state, + cache_id, + store_size, + cache_pages, + context: EventContext::new(), + } + } } #[orpc_trait] diff --git a/kernel/src/fs/utils/mod.rs b/kernel/src/fs/utils/mod.rs index e193306bd..c61ed7c51 100644 --- a/kernel/src/fs/utils/mod.rs +++ b/kernel/src/fs/utils/mod.rs @@ -41,8 +41,6 @@ mod page_cache; #[path = "page_cache_baseline.rs"] pub(super) mod page_cache; #[cfg(not(baseline_asterinas))] -mod page_cache_logger; -#[cfg(not(baseline_asterinas))] mod page_prefetch; mod random_test; mod range_lock; diff --git a/kernel/src/fs/utils/page_cache.rs b/kernel/src/fs/utils/page_cache.rs index 2e5ea2217..d2fce4a04 100644 --- a/kernel/src/fs/utils/page_cache.rs +++ b/kernel/src/fs/utils/page_cache.rs @@ -14,6 +14,7 @@ use core::{ use align_ext::AlignExt; use aster_rights::Full; use lru::LruCache; +use mariposa_data_capture::legacy::{DataCaptureFile, FileDescriptor}; use ostd::{ impl_untyped_frame_meta_for, mm::{Frame, FrameAllocOptions, UFrame, VmIo}, @@ -22,6 +23,7 @@ use ostd::{ legacy_oqueue::{Consumer, OQueueRef, Producer, reply::ReplyQueue}, orpc_impl, orpc_server, }, + path, task::Task, }; use snafu::OptionExt; @@ -33,12 +35,9 @@ use crate::{ self, AsyncReadRequest, AsyncWriteRequest, CacheState, PageCache as _, PageCacheReadInfo, PageHandle, PageIOObservable, PageStore, }, - utils::{ - page_cache_logger::PageCacheLogger, - page_prefetch::{ReadaheadPrefetcher, StridedPrefetcher}, - }, + utils::page_prefetch::{ReadaheadPrefetcher, StridedPrefetcher}, }, - kcmdline, + kcmdline, new_legacy_data_capture_file, prelude::*, vm::vmo::{Pager, Vmo, VmoFlags, VmoOptions, get_page_idx_range}, }; @@ -88,10 +87,10 @@ fn get_prefetch_policy() -> PrefetchPolicy { } /// Retrieves whether cache hits and misses should be logged based on the kernel command-line argument -/// "page_cache.log_hits_misses". The options are: `true` or `false`. -fn get_log_hits_misses() -> bool { +/// "page_cache.capture_accesses". The options are: `true` or `false`. +fn get_capture_accesses() -> bool { kcmdline::get_kernel_cmd_line() - .and_then(|cl| cl.get_module_arg_by_name("page_cache", "log_hits_misses")) + .and_then(|cl| cl.get_module_arg_by_name("page_cache", "capture_accesses")) .unwrap_or(false) } @@ -136,6 +135,12 @@ impl PageCache { Ok(()) } + /// Returns an identifier for this cache instance, matching the `cache_id` field in + /// `PageCacheReadInfo` events. + pub fn cache_id(&self) -> usize { + Arc::as_ptr(&self.manager) as usize + } + /// Returns the Vmo object. // TODO: The capability is too high, restrict it to eliminate the possibility of misuse. // For example, the `resize` api should be forbidden. @@ -510,7 +515,6 @@ impl PageCacheManagerInner { } impl PageCacheManager { - #[track_caller] pub fn spawn(backend: Weak, policy: PrefetchPolicy) -> Result> { let policy = if Task::current().is_none() { PrefetchPolicy::None @@ -550,8 +554,25 @@ impl PageCacheManager { let mut inner = server.inner.lock(); let inner = inner.deref_mut(); + let cache_pages = inner.pages.len() as u64; + // If the page is not in the cache, issue a request. if inner.pages.get(&idx).is_none() { + if inner.page_cache_read_info_producer.is_none() { + inner.page_cache_read_info_producer = + Some(server.page_cache_read_info_oqueue().attach_producer()?); + } + inner + .page_cache_read_info_producer + .as_ref() + .unwrap() + .produce(PageCacheReadInfo::new( + idx as u64, + CacheState::Prefetch, + server.as_ref() as *const _ as usize, + server.backend()?.npages()? as u64, + cache_pages, + )); inner.outstanding_requests.request_async( &mut inner.pages, &server.backend()?, @@ -564,8 +585,27 @@ impl PageCacheManager { }); // TODO(arthurp, #120): This is never shutdown even if the cache is. - if get_log_hits_misses() { - PageCacheLogger::spawn(server.page_cache_read_info_oqueue())?; + if get_capture_accesses() { + static PAGE_CACHE_LOG_FILE: Mutex>>> = + Mutex::new(None); + + let file = { + let mut file_guard = PAGE_CACHE_LOG_FILE.lock(); + if file_guard.is_none() { + *file_guard = Some(new_legacy_data_capture_file(FileDescriptor { + length: 200 * 1024 * 1024, + path: path!(page_cache.read_info), + })); + } + file_guard.as_ref().unwrap().clone() + }; + + file.register_observer(mariposa_data_capture::legacy::ObserverRegistration { + observer: server + .page_cache_read_info_oqueue() + .attach_strong_observer()?, + })?; + file.start()?; } if policy != PrefetchPolicy::Builtin && policy != PrefetchPolicy::None { @@ -596,29 +636,16 @@ impl PageCacheManager { let page_idx_range = get_page_idx_range(&range); let mut consumers = Vec::new(); - // TODO(arthurp): This locks the entire cache. That's probably a performance problem. - { - let mut inner = self.inner.lock(); - let pages = &mut inner.pages; - let backend = self.backend()?; - let backend_npages = backend.npages()?; - for idx in page_idx_range.start..page_idx_range.end { - if let Some(page) = pages.peek(&idx) { - if page.load_state() == PageState::Dirty && idx < backend_npages { - let (reply_handle, reply_consumer) = ReplyQueue::new_pair()?; - backend.write_page_async(AsyncWriteRequest { - handle: PageHandle { - idx, - frame: page.clone(), - }, - reply_handle: Some(reply_handle), - })?; - consumers.push(reply_consumer); - } - } + let mut inner = self.inner.lock(); + let pages = &mut inner.pages; + let backend = self.backend()?; + for idx in page_idx_range.start..page_idx_range.end { + if let Some(reply_consumer) = flush_page(pages, &backend, idx)? { + consumers.push(reply_consumer); } } + // TODO(arthurp): This waits for the flush with the lock held. for consumer in consumers { let PageHandle { idx: _, frame } = consumer.consume(); frame.store_state(PageState::UpToDate); @@ -635,8 +662,8 @@ impl PageCacheManager { let frame = { let backend = self.backend()?; - let mut inner = self.inner.lock(); - let inner = inner.deref_mut(); + let mut inner_guard = self.inner.lock(); + let inner = inner_guard.deref_mut(); // Lazily initialize page_cache_read_info_producer if inner.page_cache_read_info_producer.is_none() { @@ -654,15 +681,20 @@ impl PageCacheManager { // 1. The requested page is ready for read in page cache. // 2. The requested page is currently being read (generally due to a prefetch). // 3. The requested page is on disk, need a sync read operation here. + let store_size = backend.npages()? as u64; + let cache_pages = inner.pages.len() as u64; let frame = if let Some(page) = inner.pages.get(&idx) { // Cond 1 & 2. if let PageState::Uninit = page.load_state() { // Cond 2: We should wait for the previous readahead. // If there is no previous readahead, an error must have occurred somewhere. - page_cache_read_info_producer.produce(PageCacheReadInfo { - idx, - cache_state: CacheState::Pending, - }); + page_cache_read_info_producer.produce(PageCacheReadInfo::new( + idx as u64, + CacheState::Pending, + self as *const _ as usize, + store_size, + cache_pages, + )); assert!(inner.outstanding_requests.has_requests()); inner .outstanding_requests @@ -670,18 +702,24 @@ impl PageCacheManager { inner.pages.get(&idx).context(UNREACHABLE_SNAFU)?.clone() } else { // Cond 1. - page_cache_read_info_producer.produce(PageCacheReadInfo { - idx, - cache_state: CacheState::Hit, - }); + page_cache_read_info_producer.produce(PageCacheReadInfo::new( + idx as u64, + CacheState::Hit, + self as *const _ as usize, + store_size, + cache_pages, + )); page.clone() } } else { // Cond 3. - page_cache_read_info_producer.produce(PageCacheReadInfo { - idx, - cache_state: CacheState::Miss, - }); + page_cache_read_info_producer.produce(PageCacheReadInfo::new( + idx as u64, + CacheState::Miss, + self as *const _ as usize, + store_size, + cache_pages, + )); // Conducts the sync read operation. let page = if idx < backend.npages()? { let page = CachePage::alloc_uninit()?; @@ -711,6 +749,33 @@ impl PageCacheManager { } } +/// Start the flush of a page to storage. +/// +/// This places the reply consumer into `consumers`. The caller should wait on it appropriately. +fn flush_page( + pages: &mut LruCache>, + backend: &Arc, + idx: usize, +) -> Result>>> { + let backend_npages = backend.npages()?; + if let Some(page) = pages.peek(&idx) + && page.load_state() == PageState::Dirty + { + assert!(idx < backend_npages); + let (reply_handle, reply_consumer) = ReplyQueue::new_pair()?; + backend.write_page_async(AsyncWriteRequest { + handle: PageHandle { + idx, + frame: page.clone(), + }, + reply_handle: Some(reply_handle), + })?; + Ok(Some(reply_consumer)) + } else { + Ok(None) + } +} + impl Debug for PageCacheManager { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { f.debug_struct("PageCacheManager") diff --git a/kernel/src/fs/utils/page_cache_logger.rs b/kernel/src/fs/utils/page_cache_logger.rs deleted file mode 100644 index bc88cea2d..000000000 --- a/kernel/src/fs/utils/page_cache_logger.rs +++ /dev/null @@ -1,62 +0,0 @@ -// SPDX-License-Identifier: MPL-2.0 - -use alloc::sync::Arc; - -use aster_logger::println; -use ostd::orpc::{ - errors::RPCError, - framework::{shutdown, spawn_thread}, - legacy_oqueue::{OQueueAttachError, OQueueRef}, - orpc_impl, orpc_server, - sync::select_legacy, -}; - -use crate::fs::server_traits::PageCacheReadInfo; - -/// A server to monitor and log the cache activity of a [`crate::fs::server_traits::PageCache`]. -#[orpc_server(shutdown::Shutdown)] -pub struct PageCacheLogger { - shutdown_state: shutdown::ShutdownState, -} - -#[orpc_impl] -impl shutdown::Shutdown for PageCacheLogger { - fn shutdown(&self) -> Result<(), RPCError> { - self.shutdown_state.shutdown(); - Ok(()) - } -} - -impl PageCacheLogger { - pub fn spawn( - page_cache_read_info_oqueue: OQueueRef, - ) -> Result, OQueueAttachError> { - let server = Self::new_with(|orpc_internal, _| Self { - orpc_internal, - shutdown_state: Default::default(), - }); - - spawn_thread(server.clone(), { - let read_obs = page_cache_read_info_oqueue.attach_strong_observer()?; - let shutdown_obs = server - .shutdown_state - .shutdown_oqueue - .attach_strong_observer()?; - let server = server.clone(); - - move || { - loop { - server.shutdown_state.check()?; - select_legacy!( - if let info = read_obs.try_strong_observe() { - println!("{:?}", info); - }, - if let () = shutdown_obs.try_strong_observe() {} - ); - } - } - }); - - Ok(server) - } -}