Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions kernel/src/fs/server_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use ostd::orpc::{
legacy_oqueue::{OQueue as _, OQueueRef, Producer, reply::ReplyQueue},
orpc_trait,
};
use serde::Serialize;

use crate::{Result, fs::utils::CachePage};
use crate::{Result, event::EventContext, fs::utils::CachePage};

/// A reference to a page in a [`PageStore`]. It contains the page index and the frame that holds
/// the page data (if available).
Expand Down Expand Up @@ -137,23 +138,51 @@ pub trait PageStore: PageIOObservable {
}

/// The state of a page in the cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[repr(u8)]
pub enum CacheState {
/// The page was in the cache.
Hit,
/// The page was not in the cache.
Miss,
/// The page was currently being read into the cache.
Pending,
Prefetch,
/// The page was evicted from the cache (LRU eviction).
Evict,
}

/// Information about a read request on the page cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Serialize)]
pub struct PageCacheReadInfo {
/// The index of the page.
pub idx: usize,
pub idx: u64,
/// The state of the cached page when the request was made.
pub cache_state: CacheState,
pub cache_id: usize,
/// Total number of pages in the underlying store.
pub store_size: u64,
/// Number of pages currently held in the cache.
pub cache_pages: u64,
pub context: EventContext,
}

impl PageCacheReadInfo {
    /// Creates a new [`PageCacheReadInfo`], capturing the current event context
    /// (timestamp and tid) at construction time via [`EventContext::new`].
    pub fn new(
        idx: u64,
        cache_state: CacheState,
        cache_id: usize,
        store_size: u64,
        cache_pages: u64,
    ) -> Self {
        let context = EventContext::new();
        Self {
            idx,
            cache_state,
            cache_id,
            store_size,
            cache_pages,
            context,
        }
    }
}

#[orpc_trait]
Expand Down
2 changes: 0 additions & 2 deletions kernel/src/fs/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ mod page_cache;
#[path = "page_cache_baseline.rs"]
pub(super) mod page_cache;
#[cfg(not(baseline_asterinas))]
mod page_cache_logger;
#[cfg(not(baseline_asterinas))]
mod page_prefetch;
mod random_test;
mod range_lock;
Expand Down
155 changes: 110 additions & 45 deletions kernel/src/fs/utils/page_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use core::{
use align_ext::AlignExt;
use aster_rights::Full;
use lru::LruCache;
use mariposa_data_capture::legacy::{DataCaptureFile, FileDescriptor};
use ostd::{
impl_untyped_frame_meta_for,
mm::{Frame, FrameAllocOptions, UFrame, VmIo},
Expand All @@ -22,6 +23,7 @@ use ostd::{
legacy_oqueue::{Consumer, OQueueRef, Producer, reply::ReplyQueue},
orpc_impl, orpc_server,
},
path,
task::Task,
};
use snafu::OptionExt;
Expand All @@ -33,12 +35,9 @@ use crate::{
self, AsyncReadRequest, AsyncWriteRequest, CacheState, PageCache as _,
PageCacheReadInfo, PageHandle, PageIOObservable, PageStore,
},
utils::{
page_cache_logger::PageCacheLogger,
page_prefetch::{ReadaheadPrefetcher, StridedPrefetcher},
},
utils::page_prefetch::{ReadaheadPrefetcher, StridedPrefetcher},
},
kcmdline,
kcmdline, new_legacy_data_capture_file,
prelude::*,
vm::vmo::{Pager, Vmo, VmoFlags, VmoOptions, get_page_idx_range},
};
Expand Down Expand Up @@ -88,10 +87,10 @@ fn get_prefetch_policy() -> PrefetchPolicy {
}

/// Retrieves whether cache hits and misses should be logged based on the kernel command-line argument
/// "page_cache.log_hits_misses". The options are: `true` or `false`.
fn get_log_hits_misses() -> bool {
/// "page_cache.capture_accesses". The options are: `true` or `false`.
fn get_capture_accesses() -> bool {
kcmdline::get_kernel_cmd_line()
.and_then(|cl| cl.get_module_arg_by_name("page_cache", "log_hits_misses"))
.and_then(|cl| cl.get_module_arg_by_name("page_cache", "capture_accesses"))
.unwrap_or(false)
}

Expand Down Expand Up @@ -136,6 +135,12 @@ impl PageCache {
Ok(())
}

/// Returns an identifier for this cache instance.
///
/// The value matches the `cache_id` field carried by `PageCacheReadInfo` events, so
/// captured events can be correlated back to a specific cache.
pub fn cache_id(&self) -> usize {
    let manager_ptr = Arc::as_ptr(&self.manager);
    manager_ptr as usize
}

/// Returns the Vmo object.
// TODO: The capability is too high, restrict it to eliminate the possibility of misuse.
// For example, the `resize` api should be forbidden.
Expand Down Expand Up @@ -510,7 +515,6 @@ impl PageCacheManagerInner {
}

impl PageCacheManager {
#[track_caller]
pub fn spawn(backend: Weak<dyn PageStore>, policy: PrefetchPolicy) -> Result<Arc<Self>> {
let policy = if Task::current().is_none() {
PrefetchPolicy::None
Expand Down Expand Up @@ -550,8 +554,25 @@ impl PageCacheManager {
let mut inner = server.inner.lock();
let inner = inner.deref_mut();

let cache_pages = inner.pages.len() as u64;

// If the page is not in the cache, issue a request.
if inner.pages.get(&idx).is_none() {
if inner.page_cache_read_info_producer.is_none() {
inner.page_cache_read_info_producer =
Some(server.page_cache_read_info_oqueue().attach_producer()?);
}
inner
.page_cache_read_info_producer
.as_ref()
.unwrap()
.produce(PageCacheReadInfo::new(
idx as u64,
CacheState::Prefetch,
server.as_ref() as *const _ as usize,
server.backend()?.npages()? as u64,
cache_pages,
));
inner.outstanding_requests.request_async(
&mut inner.pages,
&server.backend()?,
Expand All @@ -564,8 +585,27 @@ impl PageCacheManager {
});

// TODO(arthurp, #120): This is never shutdown even if the cache is.
if get_log_hits_misses() {
PageCacheLogger::spawn(server.page_cache_read_info_oqueue())?;
if get_capture_accesses() {
static PAGE_CACHE_LOG_FILE: Mutex<Option<Arc<dyn DataCaptureFile<PageCacheReadInfo>>>> =
Mutex::new(None);

let file = {
let mut file_guard = PAGE_CACHE_LOG_FILE.lock();
if file_guard.is_none() {
*file_guard = Some(new_legacy_data_capture_file(FileDescriptor {
length: 200 * 1024 * 1024,
path: path!(page_cache.read_info),
}));
}
file_guard.as_ref().unwrap().clone()
};

file.register_observer(mariposa_data_capture::legacy::ObserverRegistration {
observer: server
.page_cache_read_info_oqueue()
.attach_strong_observer()?,
})?;
file.start()?;
}

if policy != PrefetchPolicy::Builtin && policy != PrefetchPolicy::None {
Expand Down Expand Up @@ -596,29 +636,16 @@ impl PageCacheManager {
let page_idx_range = get_page_idx_range(&range);

let mut consumers = Vec::new();
// TODO(arthurp): This locks the entire cache. That's probably a performance problem.
{
let mut inner = self.inner.lock();
let pages = &mut inner.pages;
let backend = self.backend()?;
let backend_npages = backend.npages()?;
for idx in page_idx_range.start..page_idx_range.end {
if let Some(page) = pages.peek(&idx) {
if page.load_state() == PageState::Dirty && idx < backend_npages {
let (reply_handle, reply_consumer) = ReplyQueue::new_pair()?;
backend.write_page_async(AsyncWriteRequest {
handle: PageHandle {
idx,
frame: page.clone(),
},
reply_handle: Some(reply_handle),
})?;
consumers.push(reply_consumer);
}
}
let mut inner = self.inner.lock();
let pages = &mut inner.pages;
let backend = self.backend()?;
for idx in page_idx_range.start..page_idx_range.end {
if let Some(reply_consumer) = flush_page(pages, &backend, idx)? {
consumers.push(reply_consumer);
}
}

// TODO(arthurp): This waits for the flush with the lock held.
for consumer in consumers {
let PageHandle { idx: _, frame } = consumer.consume();
frame.store_state(PageState::UpToDate);
Expand All @@ -635,8 +662,8 @@ impl PageCacheManager {

let frame = {
let backend = self.backend()?;
let mut inner = self.inner.lock();
let inner = inner.deref_mut();
let mut inner_guard = self.inner.lock();
let inner = inner_guard.deref_mut();

// Lazily initialize page_cache_read_info_producer
if inner.page_cache_read_info_producer.is_none() {
Expand All @@ -654,34 +681,45 @@ impl PageCacheManager {
// 1. The requested page is ready for read in page cache.
// 2. The requested page is currently being read (generally due to a prefetch).
// 3. The requested page is on disk, need a sync read operation here.
let store_size = backend.npages()? as u64;
let cache_pages = inner.pages.len() as u64;
let frame = if let Some(page) = inner.pages.get(&idx) {
// Cond 1 & 2.
if let PageState::Uninit = page.load_state() {
// Cond 2: We should wait for the previous readahead.
// If there is no previous readahead, an error must have occurred somewhere.
page_cache_read_info_producer.produce(PageCacheReadInfo {
idx,
cache_state: CacheState::Pending,
});
page_cache_read_info_producer.produce(PageCacheReadInfo::new(
idx as u64,
CacheState::Pending,
self as *const _ as usize,
store_size,
cache_pages,
));
assert!(inner.outstanding_requests.has_requests());
inner
.outstanding_requests
.wait_for_requests(&mut inner.pages);
inner.pages.get(&idx).context(UNREACHABLE_SNAFU)?.clone()
} else {
// Cond 1.
page_cache_read_info_producer.produce(PageCacheReadInfo {
idx,
cache_state: CacheState::Hit,
});
page_cache_read_info_producer.produce(PageCacheReadInfo::new(
idx as u64,
CacheState::Hit,
self as *const _ as usize,
store_size,
cache_pages,
));
page.clone()
}
} else {
// Cond 3.
page_cache_read_info_producer.produce(PageCacheReadInfo {
idx,
cache_state: CacheState::Miss,
});
page_cache_read_info_producer.produce(PageCacheReadInfo::new(
idx as u64,
CacheState::Miss,
self as *const _ as usize,
store_size,
cache_pages,
));
// Conducts the sync read operation.
let page = if idx < backend.npages()? {
let page = CachePage::alloc_uninit()?;
Expand Down Expand Up @@ -711,6 +749,33 @@ impl PageCacheManager {
}
}

/// Start the flush of a page to storage.
///
/// Returns the reply consumer for the in-flight write when the page at `idx` is cached
/// and dirty, and `None` otherwise. The caller should wait on the returned consumer
/// appropriately.
fn flush_page(
    pages: &mut LruCache<usize, Frame<CachePageMeta>>,
    backend: &Arc<dyn PageStore>,
    idx: usize,
) -> Result<Option<Box<dyn Consumer<PageHandle>>>> {
    let backend_npages = backend.npages()?;

    // Only pages that are present in the cache and dirty need flushing.
    let Some(page) = pages.peek(&idx) else {
        return Ok(None);
    };
    if page.load_state() != PageState::Dirty {
        return Ok(None);
    }

    // A dirty page must lie within the backing store's bounds.
    assert!(idx < backend_npages);

    let (reply_handle, reply_consumer) = ReplyQueue::new_pair()?;
    let request = AsyncWriteRequest {
        handle: PageHandle {
            idx,
            frame: page.clone(),
        },
        reply_handle: Some(reply_handle),
    };
    backend.write_page_async(request)?;
    Ok(Some(reply_consumer))
}

impl Debug for PageCacheManager {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
f.debug_struct("PageCacheManager")
Expand Down
Loading
Loading