diff --git a/CHANGELOG.md b/CHANGELOG.md index 9869ac1..a8ab2fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Dynamic segment splitting (PRD-v2 P0.17, task 17): when a parallel segment finishes before its peers, the engine now re-evaluates the still-running segments, picks the slowest one whose remaining range exceeds `dynamic_split_min_remaining_mb` (default 4 MiB) and shrinks it in place — a fresh worker takes the upper half so the tail of the download accelerates instead of stalling on a single slow connection. Backend ships a domain-pure `Segment::split(at_byte, new_id)` validation method (state must be `Downloading`, split point strictly inside the unfetched range, caller-provided id must differ from the original — IDs are allocated by the engine's monotonic `next_segment_id` counter, never invented inside the domain), a new `DomainEvent::SegmentSplit { download_id, original_segment_id, new_segment_id, split_at }` forwarded as the `segment-split` Tauri event and logged in the per-download log store, two new `AppConfig` / `ConfigPatch` / `SettingsDto` fields `dynamic_split_enabled` (default `true`) and `dynamic_split_min_remaining_mb` (default `4`) wired through the toml config store, the Tauri IPC `SettingsDto`/`ConfigPatchDto` (so the frontend can both read and write them) and the new `application::services::engine_config_bridge` subscriber so live `settings_update` calls reconfigure already-running engines without a restart. `SegmentedDownloadEngine` stores `dynamic_split_enabled` / `dynamic_split_min_remaining_bytes` in `Arc` / `Arc` and exposes a `set_dynamic_split(enabled, min_remaining_mb)` setter consumed by the bridge. After a split, the engine updates the original slot's `initial_end` to `split_at` immediately on successful `end_tx.send`, so a subsequent `pick_split_target` evaluation cannot expand the worker's range past the shrunk boundary and `persist_split_meta` records the post-split topology rather than the stale one (closes coderabbit P1 + greptile P1 race). Each segment task now returns `(slot_idx, Result)`; on success the engine flips a `completed: bool` flag on the slot — `pick_split_target` skips completed slots so they cannot be re-picked, and `persist_split_meta` keeps the entry with `completed: true` and a full-range `downloaded_bytes` so a crash right after a split never loses the record of byte ranges already on disk. `pick_split_target` also gates on a 500 ms / non-zero-progress sample window: a fresh split child cannot be picked again until it has actually produced a throughput sample, preventing cascading fragmentation of the newest range. The segment worker accepts the upper bound through a `tokio::sync::watch::Receiver` instead of a frozen `u64`, re-reads it before each chunk fetch and again after every successful network read so a mid-flight shrink clamps the next write to the new boundary; per-segment progress is exposed via an `Arc` so the engine can pick the slowest candidate by throughput (`downloaded / elapsed`). After every split, the engine atomically rewrites `.vortex-meta` with the updated segment topology so resume after a crash mid-split sees a consistent state. (task 17, PR #111 review) - "Report broken plugin" action (PRD-v2 P0.16, task 16): plugins listed in *Plugins → Plugin Store* now expose a *Report broken plugin* item in their kebab menu. Clicking it opens the user's default browser at a pre-filled GitHub issue on the plugin's repository, with diagnostic metadata (plugin name + version, Vortex version, OS, optional URL under test, last 50 log lines) inlined into the issue body. Backend adds a `repository_url` field to `domain::model::plugin::PluginInfo` (parsed from the new `[plugin].repository` key in `plugin.toml`), a `domain::ports::driven::UrlOpener` port plus its platform-native `SystemUrlOpener` adapter (`xdg-open` / `open` / `cmd start`, `http(s)://` only by validation), the std-only `domain::model::plugin::build_report_broken_url` URL builder (RFC 3986 unreserved-set percent encoder, last 50 log lines, GitHub-only repository hosts, accepts `.git` suffix, rejects malformed URLs with `DomainError::ValidationError`), and a `ReportBrokenPluginCommand` handler that returns `AppError::Validation` when a manifest carries no `repository_url`. New Tauri IPC `plugin_report_broken(pluginName, logLines?, testedUrl?) → string` returns the issue URL so the UI can fall back to clipboard copy if the launcher fails. i18n (en/fr): `plugins.action.reportBroken`, `plugins.toast.reportBrokenSuccess`, `plugins.toast.reportBrokenError`. (task 16) - Dynamic plugin configuration UI (PRD-v2 P0.15, task 15): plugins declaring a `[config]` block in their `plugin.toml` now expose their schema at runtime. Backend adds `ConfigField` / `ConfigFieldType` / `PluginConfigSchema` to `domain/model/plugin.rs` (typed validation, enum options, `min`/`max` bounds, regex via a std-only matcher — no external import in the domain), a `PluginConfigStore` port (`get_values` / `set_value` / `list_all` / `delete_all`) implemented by `SqlitePluginConfigRepo` backed by the new `plugin_configs (plugin_name, key, value)` table (migration `m20260425_000005_create_plugin_configs`, composite primary key). The manifest parser (`adapters/driven/plugin/manifest.rs`) now extracts `type`, `default`, `options`, `description`, `min`, `max`, `regex` on top of the existing defaults, and rejects defaults that fail their own field validation. CQRS gains `UpdatePluginConfigCommand` (validates against the schema, applies the runtime first then persists, rolls back on failure) and `GetPluginConfigQuery` (returns the schema plus persisted values, dropping any persisted entry that no longer matches the current schema and falling back to manifest defaults). `PluginLoader` is extended with `get_manifest()` and `set_runtime_config()`; `ExtismPluginLoader` implements both by reading from `PluginRegistry` and writing to `SharedHostResources::plugin_configs`, so `get_config(key)` calls from the WASM plugin observe the new value without a reload. At startup, `lib.rs` replays persisted configs onto the in-memory map before plugins are loaded. Frontend adds two components: `PluginConfigField.tsx` (dispatcher renderer: `string` → text input, `boolean` → shadcn switch, `integer`/`float` → numeric input with bounds, `url` → url input, `enum` (and `string` with options) → shadcn select; `aria-describedby` on the control points to the error message) and `PluginConfigDialog.tsx` (loads the schema via `useQuery`, validates each field on the UI side (rejects empty floats, validates JSON arrays) before sending, persists changed values sequentially, guards the schema-reset effect while a save is in flight to avoid clobbering the draft, invalidates the query on success). `PluginsView` queries `plugin_config_get` for each installed plugin (keyed off the unfiltered installed list to avoid churn while typing in search) to decide whether the *Configure* button (Settings icon, next to the *More* menu) should render: a plugin without `[config]` exposes no button. New IPC commands `plugin_config_get(name) → PluginConfigView` and `plugin_config_update(name, key, value)`. i18n (en/fr): `plugins.action.configure`, `plugins.config.{title,description,loading,error,noFields,toast.{saveSuccess,validationFailed}}`. (task 15) - History retention with automatic daily purge (PRD-v2 P0.14, task 14): new `history_retention_days` setting (default 30, presets 7 / 30 / 90 / 365 / `0 = unlimited`) exposed in the *General* Settings tab as a `Select` dropdown wired to `settings_update`. Backend ships a `Clock` domain port (`SystemClock` adapter under `adapters/driven/scheduler/`) and a `HistoryPurgeWorker` daemon spawned during Tauri setup that hard-deletes `history` rows where `completed_at < now - retention_days * 86_400`. The worker persists its last run as a Unix-epoch timestamp inside `/.history_purge_state` (sentinel filename `HISTORY_PURGE_STATE_FILE`). On startup, the daemon reads the sentinel and either runs immediately (missing/stale) or sleeps for `SECS_PER_DAY - elapsed` so the first post-launch purge stays anchored to the previous successful run instead of drifting up to ~47h after a restart; the recurring loop then ticks every 24h via `tokio::time::interval` with `MissedTickBehavior::Skip`. `retention_days <= 0` is a no-op that does not write the sentinel, so the next run re-fires the moment the user re-enables retention; corrupt sentinels are treated as "never ran" so a stuck file never blocks the scheduler. The worker shares the same `Arc` and `Arc` the IPC layer already mutates, so a settings change is observed without restart. Domain helper `normalize_history_retention_days` clamps negatives back to `0` and is now applied at every write boundary — `apply_patch` (so a crafted `settings_update` payload cannot persist a negative) and `From for AppConfig` (so a hand-edited `config.toml` is normalized at load) — plus the worker itself for defense-in-depth. (task 14) diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 2729d08..9ae17c2 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -4,6 +4,7 @@ version = "0.2.0" description = "A desktop download manager" authors = ["mpiton"] edition = "2024" +rust-version = "1.95" license = "GPL-3.0-only" [lib] diff --git a/src-tauri/src/adapters/driven/config/toml_config_store.rs b/src-tauri/src/adapters/driven/config/toml_config_store.rs index fcea4bf..4489f05 100644 --- a/src-tauri/src/adapters/driven/config/toml_config_store.rs +++ b/src-tauri/src/adapters/driven/config/toml_config_store.rs @@ -156,6 +156,8 @@ struct ConfigDto { retry_delay_seconds: u32, verify_checksums: bool, pre_allocate_space: bool, + dynamic_split_enabled: bool, + dynamic_split_min_remaining_mb: u64, // History history_retention_days: i64, @@ -211,6 +213,8 @@ impl From for ConfigDto { retry_delay_seconds: c.retry_delay_seconds, verify_checksums: c.verify_checksums, pre_allocate_space: c.pre_allocate_space, + dynamic_split_enabled: c.dynamic_split_enabled, + dynamic_split_min_remaining_mb: c.dynamic_split_min_remaining_mb, history_retention_days: c.history_retention_days, proxy_type: c.proxy_type, proxy_url: c.proxy_url, @@ -251,6 +255,8 @@ impl From for AppConfig { retry_delay_seconds: d.retry_delay_seconds, verify_checksums: d.verify_checksums, pre_allocate_space: d.pre_allocate_space, + dynamic_split_enabled: d.dynamic_split_enabled, + dynamic_split_min_remaining_mb: d.dynamic_split_min_remaining_mb, history_retention_days: normalize_history_retention_days(d.history_retention_days), proxy_type: d.proxy_type, proxy_url: d.proxy_url, diff --git a/src-tauri/src/adapters/driven/event/tauri_bridge.rs b/src-tauri/src/adapters/driven/event/tauri_bridge.rs index 41dfdf4..9ade8ec 100644 --- a/src-tauri/src/adapters/driven/event/tauri_bridge.rs +++ b/src-tauri/src/adapters/driven/event/tauri_bridge.rs @@ -50,6 +50,7 @@ fn event_name(event: &DomainEvent) -> &'static str { DomainEvent::SegmentStarted { .. } => "segment-started", DomainEvent::SegmentCompleted { .. } => "segment-completed", DomainEvent::SegmentFailed { .. } => "segment-failed", + DomainEvent::SegmentSplit { .. } => "segment-split", DomainEvent::PluginLoaded { .. } => "plugin-loaded", DomainEvent::PluginUnloaded { .. } => "plugin-unloaded", DomainEvent::PackageCreated { .. } => "package-created", @@ -116,6 +117,19 @@ fn event_payload(event: &DomainEvent) -> serde_json::Value { } => { json!({ "downloadId": download_id.0, "segmentId": segment_id, "error": error }) } + DomainEvent::SegmentSplit { + download_id, + original_segment_id, + new_segment_id, + split_at, + } => { + json!({ + "downloadId": download_id.0, + "originalSegmentId": original_segment_id, + "newSegmentId": new_segment_id, + "splitAt": split_at, + }) + } DomainEvent::PluginLoaded { name, version } => { json!({ "name": name, "version": version }) diff --git a/src-tauri/src/adapters/driven/logging/download_log_bridge.rs b/src-tauri/src/adapters/driven/logging/download_log_bridge.rs index b0115de..7393983 100644 --- a/src-tauri/src/adapters/driven/logging/download_log_bridge.rs +++ b/src-tauri/src/adapters/driven/logging/download_log_bridge.rs @@ -84,6 +84,19 @@ fn record_download_event(store: &DownloadLogStore, event: &DomainEvent) { format!("[ERROR] Segment {segment_id} failed: {error}"), ); } + DomainEvent::SegmentSplit { + download_id, + original_segment_id, + new_segment_id, + split_at, + } => { + store.push( + download_id.0, + format!( + "[INFO] Segment {original_segment_id} split at byte {split_at}; new segment {new_segment_id} took the upper half" + ), + ); + } DomainEvent::ChecksumVerified { id, algorithm, .. } => { store.push(id.0, format!("[INFO] {algorithm} checksum verified")); } diff --git a/src-tauri/src/adapters/driven/network/download_engine.rs b/src-tauri/src/adapters/driven/network/download_engine.rs index 43e5999..f91fc12 100644 --- a/src-tauri/src/adapters/driven/network/download_engine.rs +++ b/src-tauri/src/adapters/driven/network/download_engine.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::atomic::AtomicU64; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use tokio::sync::watch; @@ -10,6 +10,7 @@ use tokio_util::sync::CancellationToken; use crate::domain::error::DomainError; use crate::domain::event::DomainEvent; use crate::domain::model::download::{Download, DownloadId}; +use crate::domain::model::meta::{DownloadMeta, SegmentMeta}; use crate::domain::ports::driven::{DownloadEngine, EventBus, FileStorage}; use super::format_error_chain; @@ -20,12 +21,155 @@ struct ActiveDownload { pause_sender: watch::Sender, } +/// Minimum age and downloaded bytes a segment must have before it is +/// eligible for split. Without this gate a fresh split child (downloaded == 0, +/// elapsed ≈ 0) would compute as 0 B/s, become the guaranteed "slowest" +/// candidate, and be re-split immediately on the next completion event — +/// cascading fragmentation of the newest range without any real slow-tail +/// signal. +const MIN_SPLIT_SAMPLE_DURATION: std::time::Duration = std::time::Duration::from_millis(500); + +/// Runtime state of one in-flight segment, tracked by the engine so it can +/// shrink the segment's range and observe its throughput for dynamic split. +struct SegmentRuntimeState { + end_tx: watch::Sender, + progress: Arc, + started_at: std::time::Instant, + start_byte: u64, + initial_end: u64, + /// Set by the coordinator when the worker for this slot returns `Ok(_)`. + /// Completed slots stay in `active_segments` (instead of being cleared) + /// so `persist_split_meta` records their byte range with `completed: true` + /// — otherwise a crash right after a split would leave the resume meta + /// without any record that those bytes are already on disk. + completed: bool, +} + +/// Pick the slowest active segment whose remaining range is large enough +/// to benefit from a split. Returns the slot index and the byte at which +/// to split (midpoint of the remaining range). +fn pick_split_target( + segments: &[SegmentRuntimeState], + min_remaining_bytes: u64, +) -> Option<(usize, u64)> { + let mut slowest: Option<(usize, f64, u64)> = None; + for (idx, state) in segments.iter().enumerate() { + if state.completed { + continue; + } + if state.initial_end == u64::MAX { + continue; // unbounded segments cannot be split + } + let downloaded = state.progress.load(Ordering::Relaxed); + if downloaded == 0 { + continue; // no throughput sample yet + } + let elapsed = state.started_at.elapsed(); + if elapsed < MIN_SPLIT_SAMPLE_DURATION { + continue; // worker hasn't run long enough to produce a meaningful bps + } + let current_offset = state.start_byte.saturating_add(downloaded); + if current_offset >= state.initial_end { + continue; // already at end — completion event will fire shortly + } + let remaining = state.initial_end - current_offset; + if remaining < min_remaining_bytes.max(1) { + continue; + } + let split_at = current_offset.saturating_add(remaining / 2); + if split_at <= current_offset || split_at >= state.initial_end { + continue; + } + let bps = downloaded as f64 / elapsed.as_secs_f64().max(1e-3); + match slowest { + None => slowest = Some((idx, bps, split_at)), + Some((_, prev_bps, _)) if bps < prev_bps => { + slowest = Some((idx, bps, split_at)); + } + _ => {} + } + } + slowest.map(|(idx, _, split_at)| (idx, split_at)) +} + +/// Atomically rewrite `.vortex-meta` after a dynamic split so resume after a +/// crash sees the updated segment topology. A failure here only logs — the +/// in-memory split is still valid for the live download. +async fn persist_split_meta( + file_storage: &Arc, + dest_path: &Path, + download_id: DownloadId, + url: &str, + total_size: u64, + active_segments: &[SegmentRuntimeState], +) { + // Snapshot every slot — including completed ones — so a crash right + // after a split does not lose the record of byte ranges already on + // disk. Completed segments report their full range as downloaded so + // resume does not re-fetch them. + let segments_meta: Vec = active_segments + .iter() + .enumerate() + .map(|(i, st)| { + let downloaded = if st.completed { + st.initial_end.saturating_sub(st.start_byte) + } else { + st.progress.load(Ordering::Relaxed) + }; + SegmentMeta { + id: i as u32, + start_byte: st.start_byte, + end_byte: st.initial_end, + downloaded_bytes: downloaded, + completed: st.completed, + } + }) + .collect(); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + let file_name = dest_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("") + .to_string(); + let snapshot = DownloadMeta { + download_id, + url: url.to_string(), + file_name, + total_bytes: Some(total_size), + segments: segments_meta, + checksum_expected: None, + created_at: now, + updated_at: now, + }; + let storage = file_storage.clone(); + let path = dest_path.to_path_buf(); + let join = tokio::task::spawn_blocking(move || storage.write_meta(&path, &snapshot)).await; + match join { + Ok(Ok(())) => {} + Ok(Err(e)) => tracing::warn!( + download_id = download_id.0, + error = %e, + "persist meta after split failed (download still proceeds)" + ), + Err(e) => tracing::warn!( + download_id = download_id.0, + error = %e, + "persist meta after split task panicked" + ), + } +} + pub struct SegmentedDownloadEngine { client: reqwest::Client, file_storage: Arc, event_bus: Arc, default_segments: u32, min_segment_bytes: u64, + dynamic_split_enabled: Arc, + dynamic_split_min_remaining_bytes: Arc, active_downloads: Arc>>, } @@ -42,6 +186,8 @@ impl SegmentedDownloadEngine { event_bus, default_segments: default_segments.max(1), min_segment_bytes: 64 * 1024, + dynamic_split_enabled: Arc::new(AtomicBool::new(true)), + dynamic_split_min_remaining_bytes: Arc::new(AtomicU64::new(4 * 1024 * 1024)), active_downloads: Arc::new(Mutex::new(HashMap::new())), } } @@ -51,6 +197,36 @@ impl SegmentedDownloadEngine { self } + /// Configure runtime re-splitting of slow segments. PRD §7.1. + /// `min_remaining_mb == 0` disables the size gate entirely; the engine + /// then only refuses to split if the candidate has 0 bytes left. + pub fn with_dynamic_split(self, enabled: bool, min_remaining_mb: u64) -> Self { + self.set_dynamic_split(enabled, min_remaining_mb); + self + } + + /// Update dynamic-split runtime parameters live. Used by the engine + /// config bridge so settings changes from the UI take effect on + /// already-running and newly-started downloads without restart. + pub fn set_dynamic_split(&self, enabled: bool, min_remaining_mb: u64) { + self.dynamic_split_enabled.store(enabled, Ordering::Relaxed); + self.dynamic_split_min_remaining_bytes.store( + min_remaining_mb.saturating_mul(1024 * 1024), + Ordering::Relaxed, + ); + } + + /// Read back the current dynamic-split parameters as `(enabled, min_remaining_bytes)`. + /// Lets the bridge tests prove that a `SettingsUpdated` event actually + /// reaches the engine; also useful for diagnostics on a running download. + pub fn dynamic_split_state(&self) -> (bool, u64) { + ( + self.dynamic_split_enabled.load(Ordering::Relaxed), + self.dynamic_split_min_remaining_bytes + .load(Ordering::Relaxed), + ) + } + async fn probe_remote_metadata( client: &reqwest::Client, url: &str, @@ -133,6 +309,8 @@ impl DownloadEngine for SegmentedDownloadEngine { let event_bus = self.event_bus.clone(); let active_downloads = self.active_downloads.clone(); let min_segment_bytes = self.min_segment_bytes; + let dynamic_split_enabled = self.dynamic_split_enabled.clone(); + let dynamic_split_min_remaining_bytes = self.dynamic_split_min_remaining_bytes.clone(); tokio::spawn(async move { let (total_size, supports_range) = @@ -273,9 +451,20 @@ impl DownloadEngine for SegmentedDownloadEngine { event_bus.publish(DomainEvent::DownloadStarted { id: download_id }); let shared_downloaded = Arc::new(AtomicU64::new(0)); - let mut join_set = JoinSet::new(); + let mut join_set: JoinSet<(usize, Result)> = JoinSet::new(); + let mut active_segments: Vec = Vec::with_capacity(segments.len()); for (index, (start, end)) in segments.iter().enumerate() { - join_set.spawn(download_segment(SegmentParams { + let (end_tx, end_rx) = watch::channel(*end); + let progress = Arc::new(AtomicU64::new(0)); + active_segments.push(SegmentRuntimeState { + end_tx, + progress: progress.clone(), + started_at: std::time::Instant::now(), + start_byte: *start, + initial_end: *end, + completed: false, + }); + let params = SegmentParams { client: client.clone(), file_storage: file_storage.clone(), event_bus: event_bus.clone(), @@ -283,39 +472,132 @@ impl DownloadEngine for SegmentedDownloadEngine { segment_index: index as u32, url: url.clone(), start_byte: *start, - end_byte: *end, + end_byte_rx: end_rx, already_downloaded: 0, total_file_size: total_size, dest_path: dest_path.clone(), pause_rx: pause_rx.clone(), cancel_token: cancel_token.clone(), shared_downloaded: shared_downloaded.clone(), - })); + segment_progress: progress, + }; + let slot_idx = index; + join_set.spawn(async move { (slot_idx, download_segment(params).await) }); } let mut failed = false; let mut error_msg = String::new(); + let mut next_segment_id: u32 = segments.len() as u32; while let Some(result) = join_set.join_next().await { match result { - Ok(Ok(_bytes)) => {} - Ok(Err(e)) => match e { - SegmentError::Cancelled => { - cancel_token.cancel(); + Ok((slot_idx, Ok(_bytes))) => { + // Mark the slot completed instead of removing it — the + // persist_split_meta call below must record completed + // ranges so a crash mid-split does not lose the fact + // that those bytes are already on disk. + if slot_idx < active_segments.len() { + active_segments[slot_idx].completed = true; } - _ => { - if failed { + + if dynamic_split_enabled.load(Ordering::Relaxed) + && !cancel_token.is_cancelled() + && let Some((idx, split_at)) = pick_split_target( + &active_segments, + dynamic_split_min_remaining_bytes.load(Ordering::Relaxed), + ) + { + let new_id = next_segment_id; + next_segment_id += 1; + // Capture state and update initial_end on success so a + // subsequent pick_split_target on the same slot — or a + // crash recovery via persist_split_meta — observes the + // shrunk range, not the pre-split end. + let initial_end = active_segments[idx].initial_end; + let signal_sent = active_segments[idx].end_tx.send(split_at).is_ok(); + if signal_sent { + active_segments[idx].initial_end = split_at; + } else { tracing::warn!( download_id = download_id.0, - previous_error = %error_msg, - "additional segment failure (overwriting previous error)" + original_segment_id = idx as u32, + "split skipped: target worker no longer listening" ); + continue; + } + event_bus.publish(DomainEvent::SegmentSplit { + download_id, + original_segment_id: idx as u32, + new_segment_id: new_id, + split_at, + }); + + let new_progress = Arc::new(AtomicU64::new(0)); + let (new_end_tx, new_end_rx) = watch::channel(initial_end); + let new_slot_idx = active_segments.len(); + let params = SegmentParams { + client: client.clone(), + file_storage: file_storage.clone(), + event_bus: event_bus.clone(), + download_id, + segment_index: new_id, + url: url.clone(), + start_byte: split_at, + end_byte_rx: new_end_rx, + already_downloaded: 0, + total_file_size: total_size, + dest_path: dest_path.clone(), + pause_rx: pause_rx.clone(), + cancel_token: cancel_token.clone(), + shared_downloaded: shared_downloaded.clone(), + segment_progress: new_progress.clone(), + }; + join_set.spawn(async move { + (new_slot_idx, download_segment(params).await) + }); + active_segments.push(SegmentRuntimeState { + end_tx: new_end_tx, + progress: new_progress, + started_at: std::time::Instant::now(), + start_byte: split_at, + initial_end, + completed: false, + }); + + persist_split_meta( + &file_storage, + &dest_path, + download_id, + &url, + total_size, + &active_segments, + ) + .await; + } + } + Ok((_slot_idx, Err(e))) => { + // Errored slots stay in active_segments without + // `completed = true`; pick_split_target ignores them + // because the worker is gone (end_tx send fails) and + // the cancel below tears the whole download down. + match e { + SegmentError::Cancelled => { + cancel_token.cancel(); + } + _ => { + if failed { + tracing::warn!( + download_id = download_id.0, + previous_error = %error_msg, + "additional segment failure (overwriting previous error)" + ); + } + error_msg = format!("{e:?}"); + failed = true; + cancel_token.cancel(); } - error_msg = format!("{e:?}"); - failed = true; - cancel_token.cancel(); } - }, + } Err(e) => { error_msg = format!("segment task panicked: {e}"); failed = true; @@ -761,6 +1043,198 @@ mod tests { ); } + #[tokio::test] + async fn test_dynamic_split_skipped_when_remaining_too_small() { + // 2 KiB total, 4 segments, min_remaining 4 MiB → split must NOT trigger. + let server = MockServer::start().await; + let body = vec![b'a'; 2048]; + + Mock::given(method("HEAD")) + .and(path("/small")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-length", "2048") + .insert_header("accept-ranges", "bytes"), + ) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path("/small")) + .respond_with(ResponseTemplate::new(206).set_body_bytes(body)) + .mount(&server) + .await; + + let storage = Arc::new(MockFileStorage::new()); + let bus = Arc::new(CollectingEventBus::new()); + let engine = SegmentedDownloadEngine::new(reqwest::Client::new(), storage, bus.clone(), 4) + .with_min_segment_bytes(256) + .with_dynamic_split(true, 4); // 4 MiB threshold blocks 2 KiB file + + let url = format!("{}/small", server.uri()); + let download = make_download(70, &url); + engine.start(&download).unwrap(); + + let found = bus + .wait_for_event_async( + |e| matches!(e, DomainEvent::DownloadCompleted { id } if id.0 == 70), + Duration::from_secs(5), + ) + .await; + assert!(found, "download did not complete"); + + let events = bus.collected(); + assert!( + !events + .iter() + .any(|e| matches!(e, DomainEvent::SegmentSplit { .. })), + "no split should fire when remaining < threshold; got {events:?}" + ); + } + + #[tokio::test] + async fn test_dynamic_split_disabled_via_config_does_not_split() { + let server = MockServer::start().await; + let body = vec![b'x'; 64 * 1024]; + + Mock::given(method("HEAD")) + .and(path("/disabled")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-length", "65536") + .insert_header("accept-ranges", "bytes"), + ) + .mount(&server) + .await; + Mock::given(method("GET")) + .and(path("/disabled")) + .respond_with(ResponseTemplate::new(206).set_body_bytes(body)) + .mount(&server) + .await; + + let storage = Arc::new(MockFileStorage::new()); + let bus = Arc::new(CollectingEventBus::new()); + let engine = SegmentedDownloadEngine::new(reqwest::Client::new(), storage, bus.clone(), 4) + .with_min_segment_bytes(1024) + .with_dynamic_split(false, 0); + + let url = format!("{}/disabled", server.uri()); + let download = make_download(71, &url); + engine.start(&download).unwrap(); + + let found = bus + .wait_for_event_async( + |e| matches!(e, DomainEvent::DownloadCompleted { id } if id.0 == 71), + Duration::from_secs(5), + ) + .await; + assert!(found); + let events = bus.collected(); + assert!( + !events + .iter() + .any(|e| matches!(e, DomainEvent::SegmentSplit { .. })), + "split must not fire when disabled" + ); + } + + #[test] + fn test_pick_split_target_prefers_slowest_above_threshold() { + let make = |start: u64, end: u64, downloaded: u64, age_ms: u64| SegmentRuntimeState { + end_tx: watch::channel(end).0, + progress: Arc::new(AtomicU64::new(downloaded)), + started_at: std::time::Instant::now() - std::time::Duration::from_millis(age_ms), + start_byte: start, + initial_end: end, + completed: false, + }; + let segs = [ + // fast: 1 MiB downloaded in 1500 ms → ~700 KiB/s + make(0, 16 * 1024 * 1024, 1024 * 1024, 1500), + // slow: 100 KiB in 1000 ms → ~100 KiB/s, plenty of remaining + make(16 * 1024 * 1024, 32 * 1024 * 1024, 100 * 1024, 1000), + // tiny remaining → must be filtered + make(32 * 1024 * 1024, 32 * 1024 * 1024 + 1024, 512, 600), + ]; + let pick = pick_split_target(&segs, 4 * 1024 * 1024); + assert_eq!( + pick.map(|(i, _)| i), + Some(1), + "expected slot 1 (slowest with enough remaining), got {pick:?}" + ); + let (_, split_at) = pick.unwrap(); + assert!( + split_at > 16 * 1024 * 1024 + 100 * 1024, + "split must be above current offset" + ); + assert!( + split_at < 32 * 1024 * 1024, + "split must be below initial_end" + ); + } + + #[test] + fn test_pick_split_target_returns_none_when_all_below_threshold() { + let make = |start: u64, end: u64, downloaded: u64| SegmentRuntimeState { + end_tx: watch::channel(end).0, + progress: Arc::new(AtomicU64::new(downloaded)), + started_at: std::time::Instant::now() - std::time::Duration::from_millis(800), + start_byte: start, + initial_end: end, + completed: false, + }; + let segs = [make(0, 1024, 100), make(1024, 2048, 1), make(2048, 3072, 1)]; + let pick = pick_split_target(&segs, 4 * 1024 * 1024); + assert!(pick.is_none(), "got {pick:?}"); + } + + #[test] + fn test_pick_split_target_skips_fresh_segments() { + // Brand-new split children should not be candidates: no throughput + // sample yet (downloaded == 0) and elapsed below MIN_SPLIT_SAMPLE_DURATION. + // A genuinely slow neighbor (1000 ms / 100 KiB) sits next to them. + let mk = |start: u64, end: u64, downloaded: u64, age_ms: u64| SegmentRuntimeState { + end_tx: watch::channel(end).0, + progress: Arc::new(AtomicU64::new(downloaded)), + started_at: std::time::Instant::now() - std::time::Duration::from_millis(age_ms), + start_byte: start, + initial_end: end, + completed: false, + }; + let segs = [ + // fresh child: 0 bytes, 50 ms — must be skipped despite being "slowest" + mk(0, 16 * 1024 * 1024, 0, 50), + // slightly older but still no sample — must be skipped + mk(16 * 1024 * 1024, 32 * 1024 * 1024, 0, 200), + // genuinely slow but mature + mk(32 * 1024 * 1024, 48 * 1024 * 1024, 100 * 1024, 1000), + ]; + let pick = pick_split_target(&segs, 4 * 1024 * 1024); + assert_eq!(pick.map(|(i, _)| i), Some(2), "got {pick:?}"); + } + + #[test] + fn test_pick_split_target_skips_completed_segments() { + // A completed slot must never be picked even if its throughput was the + // slowest before completion. + let mk = |start: u64, end: u64, downloaded: u64, completed: bool| SegmentRuntimeState { + end_tx: watch::channel(end).0, + progress: Arc::new(AtomicU64::new(downloaded)), + started_at: std::time::Instant::now() - std::time::Duration::from_millis(1000), + start_byte: start, + initial_end: end, + completed, + }; + let segs = [ + // completed slow segment — must be ignored + mk(0, 16 * 1024 * 1024, 16 * 1024 * 1024, true), + // live, slower in absolute terms but only it is eligible + mk(16 * 1024 * 1024, 32 * 1024 * 1024, 100 * 1024, false), + ]; + let pick = pick_split_target(&segs, 4 * 1024 * 1024); + assert_eq!(pick.map(|(i, _)| i), Some(1)); + } + #[tokio::test] async fn test_pause_unknown_id_returns_not_found() { let storage = Arc::new(MockFileStorage::new()); diff --git a/src-tauri/src/adapters/driven/network/segment_worker.rs b/src-tauri/src/adapters/driven/network/segment_worker.rs index 621a663..77a66c2 100644 --- a/src-tauri/src/adapters/driven/network/segment_worker.rs +++ b/src-tauri/src/adapters/driven/network/segment_worker.rs @@ -21,6 +21,14 @@ pub(crate) enum SegmentError { PauseChannelClosed, } +/// Read the watched end_byte. Returns `None` for the unbounded sentinel +/// (`u64::MAX`, used when the server didn't advertise a length and we +/// cannot send a Range header). +fn bounded(end_rx: &watch::Receiver) -> Option { + let v = *end_rx.borrow(); + if v == u64::MAX { None } else { Some(v) } +} + /// Parameters for a single segment download. pub(crate) struct SegmentParams { pub client: reqwest::Client, @@ -30,8 +38,10 @@ pub(crate) struct SegmentParams { pub segment_index: u32, pub url: String, pub start_byte: u64, - /// Exclusive upper bound of this segment's byte range. - pub end_byte: u64, + /// Watchable exclusive upper bound. May be reduced mid-flight by the + /// engine to support PRD §7.1 dynamic splitting. `u64::MAX` means + /// "unbounded — no Range header" and must not be reduced after start. + pub end_byte_rx: watch::Receiver, pub already_downloaded: u64, /// Total size of the entire file (used in progress events). pub total_file_size: u64, @@ -40,6 +50,9 @@ pub(crate) struct SegmentParams { pub cancel_token: CancellationToken, /// Shared atomic counter for aggregate progress across all segments. pub shared_downloaded: Arc, + /// Per-segment downloaded counter, observable by the engine to estimate + /// throughput when picking a split target. + pub segment_progress: Arc, } /// Downloads a single byte range and writes it to disk. @@ -55,24 +68,26 @@ pub(crate) async fn download_segment(params: SegmentParams) -> Result= end_byte { + if effective_start >= initial_end { event_bus.publish(DomainEvent::SegmentCompleted { download_id, segment_id: segment_index, @@ -82,8 +97,8 @@ pub(crate) async fn download_segment(params: SegmentParams) -> Result Result= current_end + { + break; + } + // Check pause state — if paused, wait with cancellation support if *pause_rx.borrow() { loop { @@ -191,14 +214,21 @@ pub(crate) async fn download_segment(params: SegmentParams) -> Result end_byte { - let allowed = end_byte.saturating_sub(offset) as usize; - data.truncate(allowed); - chunk_len = allowed as u64; - if chunk_len == 0 { + // Re-read end_byte AFTER chunk fetch so an engine-driven mid-flight + // shrink that landed during the network read is honored before we + // write past the new boundary. + if let Some(live_end) = bounded(&end_byte_rx) { + if offset >= live_end { break; } + if offset + chunk_len > live_end { + let allowed = live_end.saturating_sub(offset) as usize; + data.truncate(allowed); + chunk_len = allowed as u64; + if chunk_len == 0 { + break; + } + } } tokio::task::spawn_blocking(move || storage.write_segment(&path, offset, &data)) @@ -216,6 +246,7 @@ pub(crate) async fn download_segment(params: SegmentParams) -> Result Result for SettingsDto { retry_delay_seconds: c.retry_delay_seconds, verify_checksums: c.verify_checksums, pre_allocate_space: c.pre_allocate_space, + dynamic_split_enabled: c.dynamic_split_enabled, + dynamic_split_min_remaining_mb: c.dynamic_split_min_remaining_mb, history_retention_days: c.history_retention_days, proxy_type: c.proxy_type, proxy_url: c.proxy_url, @@ -907,6 +911,8 @@ pub struct ConfigPatchDto { pub retry_delay_seconds: Option, pub verify_checksums: Option, pub pre_allocate_space: Option, + pub dynamic_split_enabled: Option, + pub dynamic_split_min_remaining_mb: Option, // History pub history_retention_days: Option, @@ -955,6 +961,8 @@ impl From for ConfigPatch { retry_delay_seconds: d.retry_delay_seconds, verify_checksums: d.verify_checksums, pre_allocate_space: d.pre_allocate_space, + dynamic_split_enabled: d.dynamic_split_enabled, + dynamic_split_min_remaining_mb: d.dynamic_split_min_remaining_mb, history_retention_days: d.history_retention_days, proxy_type: d.proxy_type, proxy_url: d.proxy_url, diff --git a/src-tauri/src/application/services/engine_config_bridge.rs b/src-tauri/src/application/services/engine_config_bridge.rs new file mode 100644 index 0000000..5472a4f --- /dev/null +++ b/src-tauri/src/application/services/engine_config_bridge.rs @@ -0,0 +1,194 @@ +//! Bridges `SettingsUpdated` events to live engine knobs. +//! +//! The download engine caches `dynamic_split_*` parameters in atomic +//! fields so settings changes from the UI take effect on already-running +//! and newly-started downloads without restart. Mirrors the pattern used +//! by [`super::queue_config_bridge`] for `max_concurrent_downloads`. + +use std::sync::Arc; + +use crate::adapters::driven::network::SegmentedDownloadEngine; +use crate::domain::event::DomainEvent; +use crate::domain::ports::driven::{ConfigStore, EventBus}; + +/// Subscribe the engine to configuration updates. +/// +/// On every [`DomainEvent::SettingsUpdated`], reads the current +/// `dynamic_split_*` values and forwards them to +/// [`SegmentedDownloadEngine::set_dynamic_split`]. Read errors are +/// logged and swallowed so one bad read does not poison the +/// subscription. +pub fn subscribe_engine_to_config( + event_bus: &dyn EventBus, + config_store: Arc, + engine: Arc, +) { + event_bus.subscribe(Box::new(move |event| { + if !matches!(event, DomainEvent::SettingsUpdated) { + return; + } + match config_store.get_config() { + Ok(config) => { + engine.set_dynamic_split( + config.dynamic_split_enabled, + config.dynamic_split_min_remaining_mb, + ); + } + Err(err) => { + tracing::error!(%err, "engine_config_bridge: failed to read config"); + } + } + })); +} + +#[cfg(test)] +mod tests { + use std::path::Path; + use std::sync::Mutex; + + use super::*; + use crate::domain::error::DomainError; + use crate::domain::model::config::{AppConfig, ConfigPatch, apply_patch}; + use crate::domain::model::download::DownloadId; + use crate::domain::model::meta::DownloadMeta; + use crate::domain::ports::driven::FileStorage; + + struct StubConfigStore { + config: Mutex, + } + + impl ConfigStore for StubConfigStore { + fn get_config(&self) -> Result { + Ok(self.config.lock().unwrap().clone()) + } + + fn update_config(&self, patch: ConfigPatch) -> Result { + let mut cfg = self.config.lock().unwrap(); + apply_patch(&mut cfg, &patch); + Ok(cfg.clone()) + } + } + + type Handler = Box; + + struct SyncEventBus { + handlers: Mutex>, + } + + impl SyncEventBus { + fn new() -> Self { + Self { + handlers: Mutex::new(Vec::new()), + } + } + } + + impl EventBus for SyncEventBus { + fn publish(&self, event: DomainEvent) { + let handlers = self.handlers.lock().unwrap(); + for handler in handlers.iter() { + handler(&event); + } + } + + fn subscribe(&self, handler: Handler) { + self.handlers.lock().unwrap().push(handler); + } + } + + struct NoopStorage; + impl FileStorage for NoopStorage { + fn create_file(&self, _path: &Path, _size: u64) -> Result<(), DomainError> { + Ok(()) + } + fn write_segment( + &self, + _path: &Path, + _offset: u64, + _data: &[u8], + ) -> Result<(), DomainError> { + Ok(()) + } + fn read_meta(&self, _path: &Path) -> Result, DomainError> { + Ok(None) + } + fn write_meta(&self, _path: &Path, _meta: &DownloadMeta) -> Result<(), DomainError> { + Ok(()) + } + fn delete_meta(&self, _path: &Path) -> Result<(), DomainError> { + Ok(()) + } + } + + fn make_engine() -> Arc { + Arc::new(SegmentedDownloadEngine::new( + reqwest::Client::new(), + Arc::new(NoopStorage), + Arc::new(SyncEventBus::new()), + 4, + )) + } + + const MIB: u64 = 1024 * 1024; + + #[tokio::test] + async fn test_settings_updated_propagates_dynamic_split_changes() { + let cfg = AppConfig { + dynamic_split_enabled: false, + dynamic_split_min_remaining_mb: 16, + ..AppConfig::default() + }; + let config_store: Arc = Arc::new(StubConfigStore { + config: Mutex::new(cfg), + }); + let bus = SyncEventBus::new(); + let engine = make_engine(); + + // Seed the engine with values that differ from the persisted config so + // a successful bridge call has something to flip. + engine.set_dynamic_split(true, 4); + assert_eq!(engine.dynamic_split_state(), (true, 4 * MIB)); + + subscribe_engine_to_config(&bus, Arc::clone(&config_store), Arc::clone(&engine)); + + // Publishing must propagate the persisted (false, 16 MiB) into the engine. + bus.publish(DomainEvent::SettingsUpdated); + assert_eq!(engine.dynamic_split_state(), (false, 16 * MIB)); + + // A subsequent patch + publish must flip both knobs again. + config_store + .update_config(ConfigPatch { + dynamic_split_enabled: Some(true), + dynamic_split_min_remaining_mb: Some(8), + ..Default::default() + }) + .unwrap(); + bus.publish(DomainEvent::SettingsUpdated); + assert_eq!(engine.dynamic_split_state(), (true, 8 * MIB)); + } + + #[tokio::test] + async fn test_non_settings_events_are_ignored() { + // Persisted config differs from the engine state so a stray bridge + // call would be observable. + let cfg = AppConfig { + dynamic_split_enabled: false, + dynamic_split_min_remaining_mb: 32, + ..AppConfig::default() + }; + let config_store: Arc = Arc::new(StubConfigStore { + config: Mutex::new(cfg), + }); + let bus = SyncEventBus::new(); + let engine = make_engine(); + engine.set_dynamic_split(true, 4); + let before = engine.dynamic_split_state(); + assert_eq!(before, (true, 4 * MIB)); + + subscribe_engine_to_config(&bus, Arc::clone(&config_store), Arc::clone(&engine)); + + // Non-Settings events must NOT touch the engine. + bus.publish(DomainEvent::DownloadStarted { id: DownloadId(1) }); + assert_eq!(engine.dynamic_split_state(), before); + } +} diff --git a/src-tauri/src/application/services/mod.rs b/src-tauri/src/application/services/mod.rs index 061537b..a3ef2d7 100644 --- a/src-tauri/src/application/services/mod.rs +++ b/src-tauri/src/application/services/mod.rs @@ -1,8 +1,10 @@ pub mod checksum_validator; +pub mod engine_config_bridge; pub mod queue_config_bridge; pub mod queue_manager; pub mod startup_recovery; pub use checksum_validator::{ChecksumOutcome, ChecksumValidatorService}; +pub use engine_config_bridge::subscribe_engine_to_config; pub use queue_config_bridge::subscribe_queue_to_config; pub use queue_manager::QueueManager; diff --git a/src-tauri/src/domain/event.rs b/src-tauri/src/domain/event.rs index aa1f430..08814db 100644 --- a/src-tauri/src/domain/event.rs +++ b/src-tauri/src/domain/event.rs @@ -88,6 +88,16 @@ pub enum DomainEvent { segment_id: u32, error: String, }, + /// A still-running segment was split in two by the dynamic-split + /// scheduler so the remaining range can be parallelised. The original + /// segment now ends at `split_at`; a fresh segment with `new_segment_id` + /// covers `[split_at, original_end)`. + SegmentSplit { + download_id: DownloadId, + original_segment_id: u32, + new_segment_id: u32, + split_at: u64, + }, // Plugins PluginLoaded { diff --git a/src-tauri/src/domain/model/config.rs b/src-tauri/src/domain/model/config.rs index 4fecd6d..053055b 100644 --- a/src-tauri/src/domain/model/config.rs +++ b/src-tauri/src/domain/model/config.rs @@ -30,6 +30,13 @@ pub struct AppConfig { pub retry_delay_seconds: u32, pub verify_checksums: bool, pub pre_allocate_space: bool, + /// Enable runtime re-split of slow segments when a faster segment + /// finishes. PRD §7.1 (répartition dynamique). + pub dynamic_split_enabled: bool, + /// Minimum remaining bytes (in MiB) required before a segment is + /// eligible for re-split. Below this threshold, the parallelism gain + /// is dwarfed by HTTP request and rebalance overhead. + pub dynamic_split_min_remaining_mb: u64, // ── History ────────────────────────────────────────────────────── /// Number of days history entries are retained before automatic @@ -96,6 +103,8 @@ impl Default for AppConfig { retry_delay_seconds: 10, verify_checksums: true, pre_allocate_space: true, + dynamic_split_enabled: true, + dynamic_split_min_remaining_mb: 4, // History history_retention_days: 30, @@ -156,6 +165,8 @@ pub struct ConfigPatch { pub retry_delay_seconds: Option, pub verify_checksums: Option, pub pre_allocate_space: Option, + pub dynamic_split_enabled: Option, + pub dynamic_split_min_remaining_mb: Option, // History pub history_retention_days: Option, @@ -267,6 +278,12 @@ pub fn apply_patch(config: &mut AppConfig, patch: &ConfigPatch) { if let Some(v) = patch.pre_allocate_space { config.pre_allocate_space = v; } + if let Some(v) = patch.dynamic_split_enabled { + config.dynamic_split_enabled = v; + } + if let Some(v) = patch.dynamic_split_min_remaining_mb { + config.dynamic_split_min_remaining_mb = v; + } // History if let Some(v) = patch.history_retention_days { @@ -371,6 +388,29 @@ mod tests { assert!(config.api_key.is_empty()); } + #[test] + fn test_default_dynamic_split_enabled_and_min_remaining() { + let c = AppConfig::default(); + assert!( + c.dynamic_split_enabled, + "PRD §7.1: dynamic split on by default" + ); + assert_eq!(c.dynamic_split_min_remaining_mb, 4); + } + + #[test] + fn test_apply_patch_updates_dynamic_split_fields() { + let mut config = AppConfig::default(); + let patch = ConfigPatch { + dynamic_split_enabled: Some(false), + dynamic_split_min_remaining_mb: Some(16), + ..Default::default() + }; + apply_patch(&mut config, &patch); + assert!(!config.dynamic_split_enabled); + assert_eq!(config.dynamic_split_min_remaining_mb, 16); + } + #[test] fn test_normalize_max_concurrent_clamps_zero_to_min() { assert_eq!( diff --git a/src-tauri/src/domain/model/segment.rs b/src-tauri/src/domain/model/segment.rs index 14684b4..fb38e84 100644 --- a/src-tauri/src/domain/model/segment.rs +++ b/src-tauri/src/domain/model/segment.rs @@ -129,6 +129,52 @@ impl Segment { self.downloaded_bytes = bytes; } + /// Split a downloading segment in two: shrink self to `[start, at_byte)` + /// and return a new pending segment covering `[at_byte, original_end)`. + /// + /// `new_id` is supplied by the caller because segment IDs are allocated + /// by the engine's monotonic counter — the domain method must not invent + /// IDs that could collide with engine-assigned ones. + /// + /// Used by the runtime engine to re-balance a slow segment when a faster + /// peer finishes (PRD §7.1 dynamic split). + pub fn split(&mut self, at_byte: u64, new_id: u32) -> Result { + if self.state != SegmentState::Downloading { + return Err(DomainError::ValidationError(format!( + "cannot split segment in state {:?}", + self.state + ))); + } + if new_id == self.id { + return Err(DomainError::ValidationError(format!( + "split id {new_id} collides with original segment id" + ))); + } + let current_offset = self.start_byte + self.downloaded_bytes; + if at_byte <= current_offset { + return Err(DomainError::ValidationError(format!( + "split point {at_byte} must be strictly above current offset {current_offset}" + ))); + } + if at_byte >= self.end_byte { + return Err(DomainError::ValidationError(format!( + "split point {at_byte} must be strictly below end_byte {}", + self.end_byte + ))); + } + let upper = Segment { + id: new_id, + download_id: self.download_id, + start_byte: at_byte, + end_byte: self.end_byte, + downloaded_bytes: 0, + state: SegmentState::Pending, + retry_count: 0, + }; + self.end_byte = at_byte; + Ok(upper) + } + // --- Getters --- pub fn id(&self) -> u32 { @@ -301,6 +347,74 @@ mod tests { ); } + #[test] + fn test_segment_split_returns_upper_half_and_shrinks_self() { + let mut s = Segment::new(1, DownloadId(10), 0, 1000); + s.start().unwrap(); + s.update_progress(200); + let upper = s.split(600, 42).unwrap(); + // self keeps lower half + assert_eq!(s.start_byte(), 0); + assert_eq!(s.end_byte(), 600); + assert_eq!(s.downloaded_bytes(), 200); + assert_eq!(s.state(), SegmentState::Downloading); + // upper covers [600, 1000), pending, with caller-provided id + assert_eq!(upper.start_byte(), 600); + assert_eq!(upper.end_byte(), 1000); + assert_eq!(upper.downloaded_bytes(), 0); + assert_eq!(upper.state(), SegmentState::Pending); + assert_eq!(upper.download_id(), DownloadId(10)); + assert_eq!(upper.id(), 42); + } + + #[test] + fn test_segment_split_rejects_at_or_below_current_offset() { + let mut s = Segment::new(1, DownloadId(10), 0, 1000); + s.start().unwrap(); + s.update_progress(200); + assert!(matches!( + s.split(200, 42), + Err(DomainError::ValidationError(_)) + )); + assert!(matches!( + s.split(100, 42), + Err(DomainError::ValidationError(_)) + )); + } + + #[test] + fn test_segment_split_rejects_at_or_above_end_byte() { + let mut s = Segment::new(1, DownloadId(10), 0, 1000); + s.start().unwrap(); + assert!(matches!( + s.split(1000, 42), + Err(DomainError::ValidationError(_)) + )); + assert!(matches!( + s.split(1500, 42), + Err(DomainError::ValidationError(_)) + )); + } + + #[test] + fn test_segment_split_rejects_when_not_downloading() { + let mut s = Segment::new(1, DownloadId(10), 0, 1000); + assert!(matches!( + s.split(500, 42), + Err(DomainError::ValidationError(_)) + )); + } + + #[test] + fn test_segment_split_rejects_id_collision_with_self() { + let mut s = Segment::new(7, DownloadId(10), 0, 1000); + s.start().unwrap(); + assert!(matches!( + s.split(500, 7), + Err(DomainError::ValidationError(_)) + )); + } + #[test] fn test_segment_validates_byte_range() { // equal bytes is valid (zero-length segment) diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index f5a80c8..7c74fcb 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -211,12 +211,29 @@ pub fn run() { let plugin_loader: Arc = plugin_loader_impl.clone(); // ── Download engine ───────────────────────────────────── - let download_engine: Arc = Arc::new(SegmentedDownloadEngine::new( - reqwest_client, - file_storage.clone(), - event_bus.clone(), - 4, - )); + let initial_engine_config = config_store + .get_config() + .unwrap_or_else(|_| crate::domain::model::config::AppConfig::default()); + let segmented_engine = Arc::new( + SegmentedDownloadEngine::new( + reqwest_client, + file_storage.clone(), + event_bus.clone(), + 4, + ) + .with_dynamic_split( + initial_engine_config.dynamic_split_enabled, + initial_engine_config.dynamic_split_min_remaining_mb, + ), + ); + // Keep settings → engine bridge alive so UI changes to + // dynamic_split_* propagate without a restart. + application::services::subscribe_engine_to_config( + event_bus.as_ref(), + config_store.clone(), + segmented_engine.clone(), + ); + let download_engine: Arc = segmented_engine; // ── Startup recovery ──────────────────────────────────── // Orphaned downloads (Downloading/Waiting/Checking/Extracting diff --git a/src/components/__tests__/ClipboardIndicator.test.tsx b/src/components/__tests__/ClipboardIndicator.test.tsx index 67eec5e..a296032 100644 --- a/src/components/__tests__/ClipboardIndicator.test.tsx +++ b/src/components/__tests__/ClipboardIndicator.test.tsx @@ -30,6 +30,8 @@ const baseConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: false, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: "none", proxyUrl: null, userAgent: "Vortex/1.0", diff --git a/src/hooks/__tests__/useAppEffects.test.ts b/src/hooks/__tests__/useAppEffects.test.ts index 67198ae..bb2965a 100644 --- a/src/hooks/__tests__/useAppEffects.test.ts +++ b/src/hooks/__tests__/useAppEffects.test.ts @@ -27,6 +27,8 @@ const baseConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: false, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: 'none', proxyUrl: null, userAgent: 'Vortex/1.0', diff --git a/src/layouts/__tests__/AppLayout.test.tsx b/src/layouts/__tests__/AppLayout.test.tsx index 4877248..d7055b8 100644 --- a/src/layouts/__tests__/AppLayout.test.tsx +++ b/src/layouts/__tests__/AppLayout.test.tsx @@ -30,6 +30,8 @@ const baseConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: false, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: "none", proxyUrl: null, userAgent: "Vortex/1.0", diff --git a/src/stores/__tests__/settingsStore.test.ts b/src/stores/__tests__/settingsStore.test.ts index 56ca684..882423f 100644 --- a/src/stores/__tests__/settingsStore.test.ts +++ b/src/stores/__tests__/settingsStore.test.ts @@ -28,6 +28,8 @@ const baseConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: false, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: 'none', proxyUrl: null, userAgent: 'Vortex/1.0', diff --git a/src/types/settings.ts b/src/types/settings.ts index ae74e76..9c6e790 100644 --- a/src/types/settings.ts +++ b/src/types/settings.ts @@ -28,6 +28,8 @@ export interface AppConfig { retryDelaySeconds: number; verifyChecksums: boolean; preAllocateSpace: boolean; + dynamicSplitEnabled: boolean; + dynamicSplitMinRemainingMb: number; // History historyRetentionDays: number; diff --git a/src/views/LinkGrabberView/__tests__/LinkGrabberView.test.tsx b/src/views/LinkGrabberView/__tests__/LinkGrabberView.test.tsx index 6247e39..57202e5 100644 --- a/src/views/LinkGrabberView/__tests__/LinkGrabberView.test.tsx +++ b/src/views/LinkGrabberView/__tests__/LinkGrabberView.test.tsx @@ -28,6 +28,8 @@ const baseConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: false, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: "none", proxyUrl: null, userAgent: "Vortex/1.0", diff --git a/src/views/SettingsView/__tests__/Sections.test.tsx b/src/views/SettingsView/__tests__/Sections.test.tsx index dcc612e..c73577b 100644 --- a/src/views/SettingsView/__tests__/Sections.test.tsx +++ b/src/views/SettingsView/__tests__/Sections.test.tsx @@ -44,6 +44,8 @@ const mockConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: true, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, historyRetentionDays: 30, proxyType: "none", proxyUrl: null, diff --git a/src/views/SettingsView/__tests__/SettingsView.test.tsx b/src/views/SettingsView/__tests__/SettingsView.test.tsx index d98b17f..23d8144 100644 --- a/src/views/SettingsView/__tests__/SettingsView.test.tsx +++ b/src/views/SettingsView/__tests__/SettingsView.test.tsx @@ -33,6 +33,8 @@ const mockConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: true, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: "none", proxyUrl: null, userAgent: "Vortex/1.0",