From 6e01f330aba8f50275230908b31d94c4f2a16acb Mon Sep 17 00:00:00 2001 From: Xmchx <1394466835@qq.com> Date: Tue, 17 Jun 2025 17:35:44 +0800 Subject: [PATCH 1/3] feat: impl time-wheel based timer Signed-off-by: Xmchx <1394466835@qq.com> --- fastimer-driver/src/lib.rs | 2 + fastimer-driver/src/wheel.rs | 210 +++++++++++++++++++++++++++ fastimer-driver/tests/integration.rs | 33 +++++ 3 files changed, 245 insertions(+) create mode 100644 fastimer-driver/src/wheel.rs diff --git a/fastimer-driver/src/lib.rs b/fastimer-driver/src/lib.rs index 95131dd..1d5815f 100644 --- a/fastimer-driver/src/lib.rs +++ b/fastimer-driver/src/lib.rs @@ -36,6 +36,8 @@ use parking::Unparker; mod heap; pub use heap::*; +mod wheel; +pub use wheel::*; #[derive(Debug)] struct TimeEntry { diff --git a/fastimer-driver/src/wheel.rs b/fastimer-driver/src/wheel.rs new file mode 100644 index 0000000..f2c41ca --- /dev/null +++ b/fastimer-driver/src/wheel.rs @@ -0,0 +1,210 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; +use std::ops::ControlFlow; +use std::sync::Arc; +use std::sync::atomic; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU64; +use std::time::Duration; +use std::time::Instant; + +use crossbeam_queue::SegQueue; +use parking::Parker; +use parking::Unparker; + +use crate::TimeContext; +use crate::TimeDriverShutdown; +use crate::TimeEntry; + +/// Returns a new time driver, its time context and the shutdown handle. +pub fn time_wheel_driver() -> (TimeWheelTimeDriver, TimeContext, TimeDriverShutdown) { + let (parker, unparker) = parking::pair(); + let wheel = TimeWheel::new(); + let inbounds = Arc::new(SegQueue::new()); + let shutdown = Arc::new(AtomicBool::new(false)); + + let driver = TimeWheelTimeDriver { + parker, + unparker, + wheel, + inbounds, + shutdown, + last_tick: Instant::now(), + }; + + let context = TimeContext { + unparker: driver.unparker.clone(), + inbounds: driver.inbounds.clone(), + }; + + let shutdown = TimeDriverShutdown { + unparker: driver.unparker.clone(), + shutdown: driver.shutdown.clone(), + }; + + (driver, context, shutdown) +} + +const WHEEL_BITS: u64 = 6; +const WHEEL_SIZE: u64 = 1 << WHEEL_BITS; +const WHEEL_MASK: u64 = WHEEL_SIZE - 1; +const WHEEL_LEVELS: usize = 4; + +type Slot = VecDeque; + +#[derive(Debug)] + +/// The time wheel structure for hierarchical timer management +struct TimeWheel { + /// Multi-level time wheel, each level has 64 slots + wheels: [Vec; WHEEL_LEVELS], + /// The current tick count + current_tick: AtomicU64, + /// Duration of each tick in milliseconds + tick_duration: Duration, +} + +impl TimeWheel { + fn new() -> Self { + let wheels = [(); WHEEL_LEVELS].map(|_| (0..WHEEL_SIZE).map(|_| VecDeque::new()).collect()); + + Self { + wheels, + current_tick: AtomicU64::new(0), + tick_duration: Duration::from_millis(1), + } + } + + /// Calculate which wheel level and slot a timer should go into. + fn calculate_slot(&self, when: Instant) -> Option<(usize, usize)> { + let now = Instant::now(); + if when <= now { + return None; + } + + let delta = when.duration_since(now); + let ticks = delta.as_millis() as u64; + let current_tick = self.current_tick.load(atomic::Ordering::Acquire); + let target_tick = current_tick + ticks; + + // Determine which wheel level to place the timer in. + for level in 0..WHEEL_LEVELS { + let shift = WHEEL_BITS * level as u64; + let mask = WHEEL_MASK << shift; + + if (target_tick & mask) != (current_tick & mask) { + let slot_index = ((target_tick >> shift) & WHEEL_MASK) as usize; + return Some((level, slot_index)); + } + } + + // If the delay exceeds all wheel levels, place in the last slot of the highest level. + Some((WHEEL_LEVELS - 1, (WHEEL_SIZE - 1) as usize)) + } + + /// Add a timer entry into the time wheel. + fn add_timer(&mut self, entry: TimeEntry) { + if let Some((level, slot)) = self.calculate_slot(entry.when) { + self.wheels[level][slot].push_back(entry); + } else { + // If already expired, wake immediately. + entry.waker.wake(); + } + } + + /// Advance the time wheel by one tick and return all expired timers. + fn advance_tick(&mut self) -> Vec { + let mut expired = Vec::new(); + let current_tick = self.current_tick.fetch_add(1, atomic::Ordering::AcqRel); + + // Check the current slot in the first-level wheel. + let slot_index = (current_tick & WHEEL_MASK) as usize; + expired.extend(self.wheels[0][slot_index].drain(..)); + + // Check if timers from higher-level wheels need to be cascaded down. + for level in 1..WHEEL_LEVELS { + let shift = WHEEL_BITS * level as u64; + if (current_tick & ((1 << shift) - 1)) == 0 { + let slot_index = ((current_tick >> shift) & WHEEL_MASK) as usize; + let timers: Vec<_> = self.wheels[level][slot_index].drain(..).collect(); + + for timer in timers { + self.add_timer(timer); + } + } + } + + expired + } +} + +/// A time-wheel based time driver that drives registered timers. +#[derive(Debug)] +pub struct TimeWheelTimeDriver { + parker: Parker, + unparker: Unparker, + wheel: TimeWheel, + inbounds: Arc>, + shutdown: Arc, + last_tick: Instant, +} + +impl TimeWheelTimeDriver { + /// Drives the timers and returns `true` if the driver has been shut down. + pub fn turn(&mut self) -> ControlFlow<()> { + if self.shutdown.load(atomic::Ordering::Acquire) { + return ControlFlow::Break(()); + } + + let now = Instant::now(); + + while let Some(entry) = self.inbounds.pop() { + self.wheel.add_timer(entry); + } + + let elapsed = now.duration_since(self.last_tick); + let ticks_to_advance = (elapsed.as_millis() as u64).max(1); + + let mut all_expired = Vec::new(); + for _ in 0..ticks_to_advance { + let expired = self.wheel.advance_tick(); + all_expired.extend(expired); + } + + for entry in all_expired { + if entry.when <= now { + entry.waker.wake(); + } else { + self.wheel.add_timer(entry); + } + } + + self.last_tick = now; + + let next_tick_time = self.last_tick + self.wheel.tick_duration; + let sleep_duration = next_tick_time.saturating_duration_since(Instant::now()); + + if sleep_duration > Duration::ZERO { + self.parker.park_timeout(sleep_duration); + } + + if self.shutdown.load(atomic::Ordering::Acquire) { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + } + } +} diff --git a/fastimer-driver/tests/integration.rs b/fastimer-driver/tests/integration.rs index 0b40cbf..2365b69 100644 --- a/fastimer-driver/tests/integration.rs +++ b/fastimer-driver/tests/integration.rs @@ -17,6 +17,7 @@ use std::time::Instant; use fastimer::make_instant_from_now; use fastimer_driver::binary_heap_driver; +use fastimer_driver::time_wheel_driver; #[track_caller] fn assert_duration_eq(actual: Duration, expected: Duration) { @@ -25,6 +26,38 @@ fn assert_duration_eq(actual: Duration, expected: Duration) { } } +#[test] +fn test_time_wheel_driver() { + let (mut driver, context, shutdown) = time_wheel_driver(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + loop { + if driver.turn().is_break() { + tx.send(()).unwrap(); + break; + } + } + }); + + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { + let now = Instant::now(); + + context.delay(Duration::from_secs(2)).await; + assert_duration_eq(now.elapsed(), Duration::from_secs(2)); + + let now = Instant::now(); + let future = make_instant_from_now(Duration::from_secs(3)); + let f1 = context.delay_until(future); + let f2 = context.delay_until(future); + tokio::join!(f1, f2); + assert_duration_eq(now.elapsed(), Duration::from_secs(3)); + + shutdown.shutdown(); + }); + rx.recv_timeout(Duration::from_secs(1)).unwrap(); +} + #[test] fn test_binary_heap_driver() { let (mut driver, context, shutdown) = binary_heap_driver(); From 7621098c90d737536cfc7027e2140f50f3ee413b Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 7 Oct 2025 00:06:34 +0800 Subject: [PATCH 2/3] merge main Signed-off-by: tison --- Cargo.lock | 38 +++ Cargo.toml | 11 +- fastimer-core/README.md | 2 +- fastimer-timewheel/Cargo.toml | 38 +++ fastimer-timewheel/README.md | 29 +++ fastimer-timewheel/src/lib.rs | 312 ++++++++++++++++++++++++ fastimer-timewheel/tests/integration.rs | 60 +++++ 7 files changed, 488 insertions(+), 2 deletions(-) create mode 100644 fastimer-timewheel/Cargo.toml create mode 100644 fastimer-timewheel/README.md create mode 100644 fastimer-timewheel/src/lib.rs create mode 100644 fastimer-timewheel/tests/integration.rs diff --git a/Cargo.lock b/Cargo.lock index 4697a02..80c53b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -72,6 +72,12 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.4.0" @@ -166,6 +172,21 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "env_home" version = "0.1.0" @@ -209,6 +230,17 @@ dependencies = [ name = "fastimer-core" version = "0.1.0" +[[package]] +name = "fastimer-timewheel" +version = "0.1.0" +dependencies = [ + "atomic-waker", + "crossbeam-queue", + "fastimer-core", + "parking", + "tokio", +] + [[package]] name = "fastimer-tokio" version = "0.10.0" @@ -427,6 +459,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" diff --git a/Cargo.toml b/Cargo.toml index d8aa134..e5d2a71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,13 @@ # limitations under the License. [workspace] -members = ["fastimer", "fastimer-core", "fastimer-tokio", "xtask"] +members = [ + "fastimer", + "fastimer-core", + "fastimer-timewheel", + "fastimer-tokio", + "xtask", +] resolver = "2" [workspace.package] @@ -30,9 +36,12 @@ fastimer-core = { version = "0.1.0", path = "fastimer-core" } fastimer-tokio = { version = "0.10.0", path = "fastimer-tokio" } # Crates.io dependencies +atomic-waker = { version = "1.1.2" } clap = { version = "4.5.20", features = ["derive"] } +crossbeam-queue = { version = "0.3.12" } log = { version = "0.4.28" } logforth = { version = "0.28.1" } +parking = { version = "2.2.1" } pin-project = { version = "1.1.10" } tokio = { version = "1.47.1" } which = { version = "8.0.0" } diff --git a/fastimer-core/README.md b/fastimer-core/README.md index b575a1e..d34851d 100644 --- a/fastimer-core/README.md +++ b/fastimer-core/README.md @@ -12,7 +12,7 @@ ## Overview -This crate provides core APIs. +This crate provides core APIs for fastimer. ## Documentation diff --git a/fastimer-timewheel/Cargo.toml b/fastimer-timewheel/Cargo.toml new file mode 100644 index 0000000..4182465 --- /dev/null +++ b/fastimer-timewheel/Cargo.toml @@ -0,0 +1,38 @@ +# Copyright 2024 FastLabs Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[package] +name = "fastimer-timewheel" +version = "0.1.0" + +description = "Timewheel based implementation for fastimer" +readme = "README.md" + +edition.workspace = true +homepage.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +atomic-waker = { workspace = true } +crossbeam-queue = { workspace = true } +fastimer-core = { workspace = true } +parking = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["full"] } + +[lints] +workspace = true diff --git a/fastimer-timewheel/README.md b/fastimer-timewheel/README.md new file mode 100644 index 0000000..e726d59 --- /dev/null +++ b/fastimer-timewheel/README.md @@ -0,0 +1,29 @@ +# Fastimer Timewheel Implementation + +[![Crates.io][crates-badge]][crates-url] +[![Documentation][docs-badge]][docs-url] +[![MSRV 1.85][msrv-badge]](https://www.whatrustisit.com) + +[crates-badge]: https://img.shields.io/crates/v/fastimer-timewheel.svg +[crates-url]: https://crates.io/crates/fastimer-timewheel +[docs-badge]: https://docs.rs/fastimer-timewheel/badge.svg +[msrv-badge]: https://img.shields.io/badge/MSRV-1.85-green?logo=rust +[docs-url]: https://docs.rs/fastimer-timewheel + +## Overview + +This crate provides timewheel based driver implementation. + +## Documentation + +Read the online documents at https://docs.rs/fastimer-timewheel. + +## Minimum Supported Rust Version (MSRV) + +This crate is built against the latest stable release, and its minimum supported rustc version is 1.85.0. + +The policy is that the minimum Rust version required to use this crate can be increased in minor version updates. For example, if Fastimer 1.0 requires Rust 1.20.0, then Fastimer 1.0.z for all values of z will also require Rust 1.20.0 or newer. However, Fastimer 1.y for y > 0 may require a newer minimum version of Rust. + +## License + +This project is licensed under [Apache License, Version 2.0](https://www.apache.org/licenses/LICENSE-2.0). diff --git a/fastimer-timewheel/src/lib.rs b/fastimer-timewheel/src/lib.rs new file mode 100644 index 0000000..44d2578 --- /dev/null +++ b/fastimer-timewheel/src/lib.rs @@ -0,0 +1,312 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A time driver based on a hierarchical time wheel. + +use std::cmp; +use std::collections::VecDeque; +use std::ops::ControlFlow; +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU64; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::Instant; + +use atomic_waker::AtomicWaker; +use crossbeam_queue::SegQueue; +use fastimer_core::make_instant_from_now; +use parking::Parker; +use parking::Unparker; + +#[derive(Debug)] +struct TimeEntry { + when: Instant, + waker: Arc, +} + +impl PartialEq for TimeEntry { + fn eq(&self, other: &Self) -> bool { + self.when == other.when + } +} + +impl Eq for TimeEntry {} + +impl PartialOrd for TimeEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimeEntry { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.when.cmp(&other.when) + } +} + +/// Future returned by [`delay`] and [`delay_until`]. +/// +/// [`delay`]: TimeContext::delay +/// [`delay_until`]: TimeContext::delay_until +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct Delay { + when: Instant, + waker: Arc, +} + +impl Future for Delay { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if Instant::now() >= self.when { + self.waker.take(); + Poll::Ready(()) + } else { + self.waker.register(cx.waker()); + Poll::Pending + } + } +} + +impl Drop for Delay { + fn drop(&mut self) { + self.waker.take(); + } +} + +/// A time context for creating [`Delay`]s. +#[derive(Debug, Clone)] +pub struct TimeContext { + unparker: Unparker, + inbounds: Arc>, +} + +impl TimeContext { + /// Returns a future that completes after the specified duration. + pub fn delay(&self, dur: Duration) -> Delay { + self.delay_until(make_instant_from_now(dur)) + } + + /// Returns a future that completes at the specified instant. + pub fn delay_until(&self, when: Instant) -> Delay { + let waker = Arc::new(AtomicWaker::new()); + let delay = Delay { + when, + waker: waker.clone(), + }; + self.inbounds.push(TimeEntry { when, waker }); + self.unparker.unpark(); + delay + } +} + +/// A handle to shut down the time driver. +#[derive(Debug, Clone)] +pub struct TimeDriverShutdown { + unparker: Unparker, + shutdown: Arc, +} + +impl TimeDriverShutdown { + /// Shuts down the time driver. + pub fn shutdown(&self) { + self.shutdown.store(true, atomic::Ordering::Release); + self.unparker.unpark(); + } +} + +/// Returns a new time driver, its time context and the shutdown handle. +pub fn time_wheel_driver() -> (TimeWheelTimeDriver, TimeContext, TimeDriverShutdown) { + let (parker, unparker) = parking::pair(); + let wheel = TimeWheel::new(); + let inbounds = Arc::new(SegQueue::new()); + let shutdown = Arc::new(AtomicBool::new(false)); + + let driver = TimeWheelTimeDriver { + parker, + unparker, + wheel, + inbounds, + shutdown, + last_tick: Instant::now(), + }; + + let context = TimeContext { + unparker: driver.unparker.clone(), + inbounds: driver.inbounds.clone(), + }; + + let shutdown = TimeDriverShutdown { + unparker: driver.unparker.clone(), + shutdown: driver.shutdown.clone(), + }; + + (driver, context, shutdown) +} + +const WHEEL_BITS: u64 = 6; +const WHEEL_SIZE: u64 = 1 << WHEEL_BITS; +const WHEEL_MASK: u64 = WHEEL_SIZE - 1; +const WHEEL_LEVELS: usize = 4; + +type Slot = VecDeque; + +#[derive(Debug)] + +/// The time wheel structure for hierarchical timer management +struct TimeWheel { + /// Multi-level time wheel, each level has 64 slots + wheels: [Vec; WHEEL_LEVELS], + /// The current tick count + current_tick: AtomicU64, + /// Duration of each tick in milliseconds + tick_duration: Duration, +} + +impl TimeWheel { + fn new() -> Self { + let wheels = [(); WHEEL_LEVELS].map(|_| (0..WHEEL_SIZE).map(|_| VecDeque::new()).collect()); + + Self { + wheels, + current_tick: AtomicU64::new(0), + tick_duration: Duration::from_millis(1), + } + } + + /// Calculate which wheel level and slot a timer should go into. + fn calculate_slot(&self, when: Instant) -> Option<(usize, usize)> { + let now = Instant::now(); + if when <= now { + return None; + } + + let delta = when.duration_since(now); + let ticks = delta.as_millis() as u64; + let current_tick = self.current_tick.load(atomic::Ordering::Acquire); + let target_tick = current_tick + ticks; + + // Determine which wheel level to place the timer in. + for level in 0..WHEEL_LEVELS { + let shift = WHEEL_BITS * level as u64; + let mask = WHEEL_MASK << shift; + + if (target_tick & mask) != (current_tick & mask) { + let slot_index = ((target_tick >> shift) & WHEEL_MASK) as usize; + return Some((level, slot_index)); + } + } + + // If the delay exceeds all wheel levels, place in the last slot of the highest level. + Some((WHEEL_LEVELS - 1, (WHEEL_SIZE - 1) as usize)) + } + + /// Add a timer entry into the time wheel. + fn add_timer(&mut self, entry: TimeEntry) { + if let Some((level, slot)) = self.calculate_slot(entry.when) { + self.wheels[level][slot].push_back(entry); + } else { + // If already expired, wake immediately. + entry.waker.wake(); + } + } + + /// Advance the time wheel by one tick and return all expired timers. + fn advance_tick(&mut self) -> Vec { + let mut expired = Vec::new(); + let current_tick = self.current_tick.fetch_add(1, atomic::Ordering::AcqRel); + + // Check the current slot in the first-level wheel. + let slot_index = (current_tick & WHEEL_MASK) as usize; + expired.extend(self.wheels[0][slot_index].drain(..)); + + // Check if timers from higher-level wheels need to be cascaded down. + for level in 1..WHEEL_LEVELS { + let shift = WHEEL_BITS * level as u64; + if (current_tick & ((1 << shift) - 1)) == 0 { + let slot_index = ((current_tick >> shift) & WHEEL_MASK) as usize; + let timers: Vec<_> = self.wheels[level][slot_index].drain(..).collect(); + + for timer in timers { + self.add_timer(timer); + } + } + } + + expired + } +} + +/// A time-wheel based time driver that drives registered timers. +#[derive(Debug)] +pub struct TimeWheelTimeDriver { + parker: Parker, + unparker: Unparker, + wheel: TimeWheel, + inbounds: Arc>, + shutdown: Arc, + last_tick: Instant, +} + +impl TimeWheelTimeDriver { + /// Drives the timers and returns `true` if the driver has been shut down. + pub fn turn(&mut self) -> ControlFlow<()> { + if self.shutdown.load(atomic::Ordering::Acquire) { + return ControlFlow::Break(()); + } + + let now = Instant::now(); + + while let Some(entry) = self.inbounds.pop() { + self.wheel.add_timer(entry); + } + + let elapsed = now.duration_since(self.last_tick); + let ticks_to_advance = (elapsed.as_millis() as u64).max(1); + + let mut all_expired = Vec::new(); + for _ in 0..ticks_to_advance { + let expired = self.wheel.advance_tick(); + all_expired.extend(expired); + } + + for entry in all_expired { + if entry.when <= now { + entry.waker.wake(); + } else { + self.wheel.add_timer(entry); + } + } + + self.last_tick = now; + + let next_tick_time = self.last_tick + self.wheel.tick_duration; + let sleep_duration = next_tick_time.saturating_duration_since(Instant::now()); + + if sleep_duration > Duration::ZERO { + self.parker.park_timeout(sleep_duration); + } + + if self.shutdown.load(atomic::Ordering::Acquire) { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + } + } +} diff --git a/fastimer-timewheel/tests/integration.rs b/fastimer-timewheel/tests/integration.rs new file mode 100644 index 0000000..021cdfc --- /dev/null +++ b/fastimer-timewheel/tests/integration.rs @@ -0,0 +1,60 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Integration tests for the time wheel driver. + +use std::time::Duration; +use std::time::Instant; + +use fastimer_core::make_instant_from_now; +use fastimer_timewheel::time_wheel_driver; + +#[track_caller] +fn assert_duration_eq(actual: Duration, expected: Duration) { + if expected.abs_diff(actual) > Duration::from_millis(250) { + panic!("expected: {expected:?}, actual: {actual:?}"); + } +} + +#[test] +fn test_time_wheel_driver() { + let (mut driver, context, shutdown) = time_wheel_driver(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + loop { + if driver.turn().is_break() { + tx.send(()).unwrap(); + break; + } + } + }); + + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { + let now = Instant::now(); + + context.delay(Duration::from_secs(2)).await; + assert_duration_eq(now.elapsed(), Duration::from_secs(2)); + + let now = Instant::now(); + let future = make_instant_from_now(Duration::from_secs(3)); + let f1 = context.delay_until(future); + let f2 = context.delay_until(future); + tokio::join!(f1, f2); + assert_duration_eq(now.elapsed(), Duration::from_secs(3)); + + shutdown.shutdown(); + }); + rx.recv_timeout(Duration::from_secs(1)).unwrap(); +} From 1e6570b021559e8f5c5dfa4276fcd3a56a9a6297 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 7 Oct 2025 00:10:13 +0800 Subject: [PATCH 3/3] fixup Signed-off-by: tison --- fastimer-timewheel/src/lib.rs | 13 ++++--------- fastimer-timewheel/tests/integration.rs | 1 + 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/fastimer-timewheel/src/lib.rs b/fastimer-timewheel/src/lib.rs index 44d2578..42e8f53 100644 --- a/fastimer-timewheel/src/lib.rs +++ b/fastimer-timewheel/src/lib.rs @@ -29,7 +29,7 @@ use std::time::Instant; use atomic_waker::AtomicWaker; use crossbeam_queue::SegQueue; -use fastimer_core::make_instant_from_now; +use fastimer_core::MakeDelay; use parking::Parker; use parking::Unparker; @@ -97,14 +97,10 @@ pub struct TimeContext { inbounds: Arc>, } -impl TimeContext { - /// Returns a future that completes after the specified duration. - pub fn delay(&self, dur: Duration) -> Delay { - self.delay_until(make_instant_from_now(dur)) - } +impl MakeDelay for TimeContext { + type Delay = Delay; - /// Returns a future that completes at the specified instant. - pub fn delay_until(&self, when: Instant) -> Delay { + fn delay_until(&self, when: Instant) -> Self::Delay { let waker = Arc::new(AtomicWaker::new()); let delay = Delay { when, @@ -168,7 +164,6 @@ const WHEEL_LEVELS: usize = 4; type Slot = VecDeque; #[derive(Debug)] - /// The time wheel structure for hierarchical timer management struct TimeWheel { /// Multi-level time wheel, each level has 64 slots diff --git a/fastimer-timewheel/tests/integration.rs b/fastimer-timewheel/tests/integration.rs index 021cdfc..dc7a17e 100644 --- a/fastimer-timewheel/tests/integration.rs +++ b/fastimer-timewheel/tests/integration.rs @@ -17,6 +17,7 @@ use std::time::Duration; use std::time::Instant; +use fastimer_core::MakeDelay; use fastimer_core::make_instant_from_now; use fastimer_timewheel::time_wheel_driver;