Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Mea (Make Easy Async) is a runtime-agnostic library providing essential synchron
* [**OnceMap**](https://docs.rs/mea/*/mea/once/struct.OnceMap.html): A hash map that runs computation only once for each key and stores the result.
* [**RwLock**](https://docs.rs/mea/*/mea/rwlock/struct.RwLock.html): A reader-writer lock that allows multiple readers or a single writer at a time.
* [**Semaphore**](https://docs.rs/mea/*/mea/semaphore/struct.Semaphore.html): A synchronization primitive that controls access to a shared resource.
* [**ShutdownSend & ShutdownRecv**](https://docs.rs/mea/*/mea/shutdown/): A composite synchronization primitive for managing shutdown signals.
* [**ShutdownSend, ShutdownRecv & ShutdownWatch**](https://docs.rs/mea/*/mea/shutdown/): A composite synchronization primitive for managing shutdown signals.
* [**WaitGroup**](https://docs.rs/mea/*/mea/waitgroup/struct.WaitGroup.html): A synchronization primitive that allows waiting for multiple tasks to complete.
* [**atomicbox**](https://docs.rs/mea/*/mea/atomicbox/): A safe, owning version of AtomicPtr for heap-allocated data.
* [**broadcast**](https://docs.rs/mea/*/mea/broadcast/): A multi-producer, multi-consumer broadcast channel.
Expand Down
8 changes: 6 additions & 2 deletions mea/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
//! * [`OnceMap`]: A hash map that runs computation only once for each key and stores the result.
//! * [`RwLock`]: A reader-writer lock that allows multiple readers or a single writer at a time
//! * [`Semaphore`]: A synchronization primitive that controls access to a shared resource
//! * [`ShutdownSend`] & [`ShutdownRecv`]: A composite synchronization primitive for managing
//! shutdown signals
//! * [`ShutdownSend`], [`ShutdownRecv`] & [`ShutdownWatch`]: A composite synchronization primitive
//! for managing shutdown signals
//! * [`WaitGroup`]: A synchronization primitive that allows waiting for multiple tasks to complete
//! * [`atomicbox`]: A safe, owning version of `AtomicPtr` for heap-allocated data.
//! * [`broadcast`]: A multi-producer, multi-consumer broadcast channel.
Expand Down Expand Up @@ -67,6 +67,7 @@
//! [`Semaphore`]: semaphore::Semaphore
//! [`ShutdownSend`]: shutdown::ShutdownSend
//! [`ShutdownRecv`]: shutdown::ShutdownRecv
//! [`ShutdownWatch`]: shutdown::ShutdownWatch
//! [`WaitGroup`]: waitgroup::WaitGroup

mod internal;
Expand Down Expand Up @@ -114,6 +115,7 @@ mod tests {
use crate::semaphore::Semaphore;
use crate::shutdown::ShutdownRecv;
use crate::shutdown::ShutdownSend;
use crate::shutdown::ShutdownWatch;
use crate::singleflight;
use crate::waitgroup::Wait;
use crate::waitgroup::WaitGroup;
Expand All @@ -131,6 +133,7 @@ mod tests {
do_assert_send_and_sync::<Semaphore>();
do_assert_send_and_sync::<ShutdownSend>();
do_assert_send_and_sync::<ShutdownRecv>();
do_assert_send_and_sync::<ShutdownWatch>();
do_assert_send_and_sync::<WaitGroup>();
do_assert_send_and_sync::<Mutex<i64>>();
do_assert_send_and_sync::<MutexGuard<'_, i64>>();
Expand Down Expand Up @@ -170,6 +173,7 @@ mod tests {
do_assert_unpin::<Semaphore>();
do_assert_unpin::<ShutdownSend>();
do_assert_unpin::<ShutdownRecv>();
do_assert_unpin::<ShutdownWatch>();
do_assert_unpin::<WaitGroup>();
do_assert_unpin::<Wait>();
do_assert_unpin::<Mutex<i64>>();
Expand Down
38 changes: 38 additions & 0 deletions mea/src/shutdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
//! * [`ShutdownSend`] can send a shutdown signal, and can wait for all the tasks to finish.
//! * [`ShutdownRecv`] can wait for the shutdown signal, and should be dropped when the task is
//! done, which will notify the sender on all the tasks finished.
//! * [`ShutdownWatch`] can wait for the shutdown signal without blocking
//! [`ShutdownSend::await_shutdown`].
//!
//! Internally, the shutdown signal is implemented using a countdown latch, and the task completion
//! is tracked using a wait group. [`ShutdownSend`] is cloneable, allowing multiple sources to send
Expand Down Expand Up @@ -107,6 +109,42 @@ pub struct ShutdownRecv {
}

impl ShutdownRecv {
/// Returns a handle for watching the shutdown signal.
///
/// The returned handle does not block [`ShutdownSend::await_shutdown`].
pub fn watch(&self) -> ShutdownWatch {
ShutdownWatch {
latch: self.latch.clone(),
}
}

/// Returns whether the shutdown signal has been received.
pub fn is_shutdown_now(&self) -> bool {
self.latch.try_wait().is_ok()
}

/// Returns a future that resolves when the shutdown signal is received.
pub async fn is_shutdown(&self) {
self.latch.wait().await;
}

/// Returns an owned future that resolves when the shutdown signal is received.
///
/// The returned future has no lifetime constraints.
pub fn is_shutdown_owned(&self) -> impl Future<Output = ()> + 'static {
self.latch.clone().wait_owned()
}
}

/// A handle for watching shutdown signals without participating in shutdown completion.
///
/// See the [module level documentation](self) for more.
#[derive(Debug, Clone)]
pub struct ShutdownWatch {
latch: Arc<Latch>,
}

impl ShutdownWatch {
/// Returns whether the shutdown signal has been received.
pub fn is_shutdown_now(&self) -> bool {
self.latch.try_wait().is_ok()
Expand Down
23 changes: 23 additions & 0 deletions mea/src/shutdown/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,26 @@ fn test_is_shutdown_owned_not_capture_self() {
tx.shutdown();
pollster::block_on(tx.await_shutdown());
}

#[test]
fn test_watch_does_not_block_shutdown() {
let (tx, rx) = new_pair();
let watch = rx.watch();
drop(rx);

tx.shutdown();
assert!(watch.is_shutdown_now());
pollster::block_on(tx.await_shutdown());
}

#[test]
fn test_watch_is_shutdown() {
let (tx, rx) = new_pair();
let watch = rx.watch();
let handle = test_runtime().spawn(async move { watch.is_shutdown().await });
drop(rx);

tx.shutdown();
pollster::block_on(tx.await_shutdown());
pollster::block_on(handle).unwrap();
}