From 366be547092a717dcf96d3ab26666548c280ac55 Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 28 May 2026 12:03:52 +0800 Subject: [PATCH] Add shutdown watch handle --- README.md | 2 +- mea/src/lib.rs | 8 ++++++-- mea/src/shutdown/mod.rs | 38 ++++++++++++++++++++++++++++++++++++++ mea/src/shutdown/tests.rs | 23 +++++++++++++++++++++++ 4 files changed, 68 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index bb27b48..b1eab3b 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ Mea (Make Easy Async) is a runtime-agnostic library providing essential synchron * [**OnceMap**](https://docs.rs/mea/*/mea/once/struct.OnceMap.html): A hash map that runs computation only once for each key and stores the result. * [**RwLock**](https://docs.rs/mea/*/mea/rwlock/struct.RwLock.html): A reader-writer lock that allows multiple readers or a single writer at a time. * [**Semaphore**](https://docs.rs/mea/*/mea/semaphore/struct.Semaphore.html): A synchronization primitive that controls access to a shared resource. -* [**ShutdownSend & ShutdownRecv**](https://docs.rs/mea/*/mea/shutdown/): A composite synchronization primitive for managing shutdown signals. +* [**ShutdownSend, ShutdownRecv & ShutdownWatch**](https://docs.rs/mea/*/mea/shutdown/): A composite synchronization primitive for managing shutdown signals. * [**WaitGroup**](https://docs.rs/mea/*/mea/waitgroup/struct.WaitGroup.html): A synchronization primitive that allows waiting for multiple tasks to complete. * [**atomicbox**](https://docs.rs/mea/*/mea/atomicbox/): A safe, owning version of AtomicPtr for heap-allocated data. * [**broadcast**](https://docs.rs/mea/*/mea/broadcast/): A multi-producer, multi-consumer broadcast channel. diff --git a/mea/src/lib.rs b/mea/src/lib.rs index 969c5d4..7a4f1a6 100644 --- a/mea/src/lib.rs +++ b/mea/src/lib.rs @@ -32,8 +32,8 @@ //! * [`OnceMap`]: A hash map that runs computation only once for each key and stores the result. //! * [`RwLock`]: A reader-writer lock that allows multiple readers or a single writer at a time //! * [`Semaphore`]: A synchronization primitive that controls access to a shared resource -//! * [`ShutdownSend`] & [`ShutdownRecv`]: A composite synchronization primitive for managing -//! shutdown signals +//! * [`ShutdownSend`], [`ShutdownRecv`] & [`ShutdownWatch`]: A composite synchronization primitive +//! for managing shutdown signals //! * [`WaitGroup`]: A synchronization primitive that allows waiting for multiple tasks to complete //! * [`atomicbox`]: A safe, owning version of `AtomicPtr` for heap-allocated data. //! * [`broadcast`]: A multi-producer, multi-consumer broadcast channel. @@ -67,6 +67,7 @@ //! [`Semaphore`]: semaphore::Semaphore //! [`ShutdownSend`]: shutdown::ShutdownSend //! [`ShutdownRecv`]: shutdown::ShutdownRecv +//! [`ShutdownWatch`]: shutdown::ShutdownWatch //! [`WaitGroup`]: waitgroup::WaitGroup mod internal; @@ -114,6 +115,7 @@ mod tests { use crate::semaphore::Semaphore; use crate::shutdown::ShutdownRecv; use crate::shutdown::ShutdownSend; + use crate::shutdown::ShutdownWatch; use crate::singleflight; use crate::waitgroup::Wait; use crate::waitgroup::WaitGroup; @@ -131,6 +133,7 @@ mod tests { do_assert_send_and_sync::(); do_assert_send_and_sync::(); do_assert_send_and_sync::(); + do_assert_send_and_sync::(); do_assert_send_and_sync::(); do_assert_send_and_sync::>(); do_assert_send_and_sync::>(); @@ -170,6 +173,7 @@ mod tests { do_assert_unpin::(); do_assert_unpin::(); do_assert_unpin::(); + do_assert_unpin::(); do_assert_unpin::(); do_assert_unpin::(); do_assert_unpin::>(); diff --git a/mea/src/shutdown/mod.rs b/mea/src/shutdown/mod.rs index a77eb85..0e02b90 100644 --- a/mea/src/shutdown/mod.rs +++ b/mea/src/shutdown/mod.rs @@ -19,6 +19,8 @@ //! * [`ShutdownSend`] can send a shutdown signal, and can wait for all the tasks to finish. //! * [`ShutdownRecv`] can wait for the shutdown signal, and should be dropped when the task is //! done, which will notify the sender on all the tasks finished. +//! * [`ShutdownWatch`] can wait for the shutdown signal without blocking +//! [`ShutdownSend::await_shutdown`]. //! //! Internally, the shutdown signal is implemented using a countdown latch, and the task completion //! is tracked using a wait group. [`ShutdownSend`] is cloneable, allowing multiple sources to send @@ -107,6 +109,42 @@ pub struct ShutdownRecv { } impl ShutdownRecv { + /// Returns a handle for watching the shutdown signal. + /// + /// The returned handle does not block [`ShutdownSend::await_shutdown`]. + pub fn watch(&self) -> ShutdownWatch { + ShutdownWatch { + latch: self.latch.clone(), + } + } + + /// Returns whether the shutdown signal has been received. + pub fn is_shutdown_now(&self) -> bool { + self.latch.try_wait().is_ok() + } + + /// Returns a future that resolves when the shutdown signal is received. + pub async fn is_shutdown(&self) { + self.latch.wait().await; + } + + /// Returns an owned future that resolves when the shutdown signal is received. + /// + /// The returned future has no lifetime constraints. + pub fn is_shutdown_owned(&self) -> impl Future + 'static { + self.latch.clone().wait_owned() + } +} + +/// A handle for watching shutdown signals without participating in shutdown completion. +/// +/// See the [module level documentation](self) for more. +#[derive(Debug, Clone)] +pub struct ShutdownWatch { + latch: Arc, +} + +impl ShutdownWatch { /// Returns whether the shutdown signal has been received. pub fn is_shutdown_now(&self) -> bool { self.latch.try_wait().is_ok() diff --git a/mea/src/shutdown/tests.rs b/mea/src/shutdown/tests.rs index f41e8a5..3e47a57 100644 --- a/mea/src/shutdown/tests.rs +++ b/mea/src/shutdown/tests.rs @@ -82,3 +82,26 @@ fn test_is_shutdown_owned_not_capture_self() { tx.shutdown(); pollster::block_on(tx.await_shutdown()); } + +#[test] +fn test_watch_does_not_block_shutdown() { + let (tx, rx) = new_pair(); + let watch = rx.watch(); + drop(rx); + + tx.shutdown(); + assert!(watch.is_shutdown_now()); + pollster::block_on(tx.await_shutdown()); +} + +#[test] +fn test_watch_is_shutdown() { + let (tx, rx) = new_pair(); + let watch = rx.watch(); + let handle = test_runtime().spawn(async move { watch.is_shutdown().await }); + drop(rx); + + tx.shutdown(); + pollster::block_on(tx.await_shutdown()); + pollster::block_on(handle).unwrap(); +}