From ed63defde6943aaf0ac9a33b5a85d28fb4349e1d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 15 Apr 2026 00:24:47 +0000 Subject: [PATCH 1/3] =?UTF-8?q?Add=20AsyncSeq.mapAsyncParallelThrottled=20?= =?UTF-8?q?=E2=80=94=20ordered,=20bounded-concurrency=20parallel=20map?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the parallel map API matrix: - mapAsyncParallel (ordered, unbounded) - mapAsyncUnorderedParallel (unordered, unbounded) - mapAsyncUnorderedParallelThrottled (unordered, throttled) - mapAsyncParallelThrottled (ordered, throttled) ← NEW Implementation mirrors mapAsyncParallel but adds a SemaphoreSlim to cap the number of in-flight operations at 'parallelism'. Results are emitted in source order. 3 new tests: order preservation, exception propagation, throttle enforcement. 414/414 tests pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- RELEASE_NOTES.md | 4 ++ src/FSharp.Control.AsyncSeq/AsyncSeq.fs | 22 +++++++++ src/FSharp.Control.AsyncSeq/AsyncSeq.fsi | 9 ++++ .../AsyncSeqTests.fs | 47 +++++++++++++++++++ version.props | 2 +- 5 files changed, 83 insertions(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index e50bd09f..e128d87e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,7 @@ +### 4.14.0 + +* Added `AsyncSeq.mapAsyncParallelThrottled` — ordered, bounded-concurrency parallel map. Like `mapAsyncParallel` but limits the number of in-flight operations to `parallelism`, preventing unbounded resource use on large or infinite sequences. + ### 4.13.0 * CI: Upgrade Fable from 4.25.0 to 5.0.0-rc.7 and .NET SDK from 8.0.19 to 10.0.100 to fix a CI hang where the Fable build step ran for 6+ hours with .NET 10. Fable 5 + .NET 10 compiles in ~20 seconds. diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 4c9e5b18..946e9bf9 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -1159,6 +1159,28 @@ module AsyncSeq = | Choice1Of2 value -> return value | Choice2Of2 ex -> return raise ex }) } + + let mapAsyncParallelThrottled (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq { + use mb = MailboxProcessor.Start (fun _ -> async.Return()) + use sm = new SemaphoreSlim(parallelism) + let! err = + s + |> iterAsync (fun a -> async { + do! sm.WaitAsync () |> Async.awaitTaskUnitCancellationAsError + let! b = Async.StartChild (async { + try + let! result = f a + sm.Release() |> ignore + return result + with ex -> + sm.Release() |> ignore + return raise ex }) + mb.Post (Some b) }) + |> Async.map (fun _ -> mb.Post None) + |> Async.StartChildAsTask + yield! + replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive)) + |> mapAsync id } #endif let chooseAsync f (source:AsyncSeq<'T>) = diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index a8ecd49b..98a83981 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -887,6 +887,15 @@ module AsyncSeq = /// in the order they complete (unordered), without preserving the original order. val mapAsyncUnorderedParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U> + /// Builds a new asynchronous sequence whose elements are generated by + /// applying the specified function to all elements of the input sequence, + /// with at most parallelism mapping operations running concurrently. + /// + /// The function is applied to elements in order and results are emitted in order, + /// but in parallel, with at most parallelism operations running concurrently. + /// This is the throttled counterpart to mapAsyncParallel. + val mapAsyncParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U> + /// Applies a key-generating function to each element and returns an async sequence containing unique keys /// and async sequences containing elements corresponding to the key. /// diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index 1f7a34c3..bbd08426 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -1985,6 +1985,53 @@ let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should throttle`` () = Assert.AreEqual(50, result.Length) +[] +let ``AsyncSeq.mapAsyncParallelThrottled should maintain order`` () = + let ls = List.init 100 id + let result = + ls + |> AsyncSeq.ofList + |> AsyncSeq.mapAsyncParallelThrottled 5 (fun i -> async { + do! Async.Sleep (100 - i) + return i * 2 }) + |> AsyncSeq.toListAsync + |> Async.RunSynchronously + Assert.AreEqual(ls |> List.map ((*) 2), result) + +[] +let ``AsyncSeq.mapAsyncParallelThrottled should propagate exception`` () = + let result = + AsyncSeq.init 50L id + |> AsyncSeq.mapAsyncParallelThrottled 5 (fun i -> async { + if i = 25L then return failwith "test error" + return i }) + |> AsyncSeq.toListAsync + |> Async.Catch + |> Async.RunSynchronously + match result with + | Choice2Of2 _ -> () + | Choice1Of2 _ -> Assert.Fail("Expected exception") + +[] +let ``AsyncSeq.mapAsyncParallelThrottled should throttle`` () = + let count = ref 0 + let parallelism = 5 + + let result = + AsyncSeq.init 50L id + |> AsyncSeq.mapAsyncParallelThrottled parallelism (fun i -> async { + let c = Interlocked.Increment count + if c > parallelism then + return failwith (sprintf "concurrency exceeded: %d > %d" c parallelism) + do! Async.Sleep 5 + Interlocked.Decrement count |> ignore + return i * 2L }) + |> AsyncSeq.toListAsync + |> Async.RunSynchronously + + Assert.AreEqual(50, result.Length) + Assert.AreEqual([ 0L..49L ] |> List.map ((*) 2L), result) + //[] //let ``AsyncSeq.mapParallelAsyncBounded should maintain order`` () = // let ls = List.init 500 id diff --git a/version.props b/version.props index 7f9a2112..30ed08f9 100644 --- a/version.props +++ b/version.props @@ -1,5 +1,5 @@ - 4.12.0 + 4.14.0 From 53d086709a5fe118e4bd39d597392b26f32030ff Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 15 Apr 2026 00:24:50 +0000 Subject: [PATCH 2/3] ci: trigger checks From 146bbf80816eb287892b9655accbff978e81b1ce Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 17 Apr 2026 07:58:49 +0000 Subject: [PATCH 3/3] fix: bump Microsoft.Bcl.AsyncInterfaces to 10.0.6 to resolve Fable CI NU1605 downgrade warning System.Threading.Channels 10.0.6 (resolved via Version="*") requires Microsoft.Bcl.AsyncInterfaces >= 10.0.6, but the project pinned it at 10.0.5. Fable's MSBuildCrackerResolver treats NU1605 (package downgrade) as a fatal error, causing the Fable CI job to fail. Bumping to 10.0.6 resolves the conflict. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj b/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj index 00853d8e..90597602 100644 --- a/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj +++ b/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj @@ -24,7 +24,7 @@ - +