From ed63defde6943aaf0ac9a33b5a85d28fb4349e1d Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
<41898282+github-actions[bot]@users.noreply.github.com>
Date: Wed, 15 Apr 2026 00:24:47 +0000
Subject: [PATCH 1/3] =?UTF-8?q?Add=20AsyncSeq.mapAsyncParallelThrottled=20?=
=?UTF-8?q?=E2=80=94=20ordered,=20bounded-concurrency=20parallel=20map?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Completes the parallel map API matrix:
- mapAsyncParallel (ordered, unbounded)
- mapAsyncUnorderedParallel (unordered, unbounded)
- mapAsyncUnorderedParallelThrottled (unordered, throttled)
- mapAsyncParallelThrottled (ordered, throttled) ← NEW
Implementation mirrors mapAsyncParallel but adds a SemaphoreSlim
to cap the number of in-flight operations at 'parallelism'. Results
are emitted in source order.
3 new tests: order preservation, exception propagation, throttle enforcement.
414/414 tests pass.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
RELEASE_NOTES.md | 4 ++
src/FSharp.Control.AsyncSeq/AsyncSeq.fs | 22 +++++++++
src/FSharp.Control.AsyncSeq/AsyncSeq.fsi | 9 ++++
.../AsyncSeqTests.fs | 47 +++++++++++++++++++
version.props | 2 +-
5 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index e50bd09f..e128d87e 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,3 +1,7 @@
+### 4.14.0
+
+* Added `AsyncSeq.mapAsyncParallelThrottled` — ordered, bounded-concurrency parallel map. Like `mapAsyncParallel` but limits the number of in-flight operations to `parallelism`, preventing unbounded resource use on large or infinite sequences.
+
### 4.13.0
* CI: Upgrade Fable from 4.25.0 to 5.0.0-rc.7 and .NET SDK from 8.0.19 to 10.0.100 to fix a CI hang where the Fable build step ran for 6+ hours with .NET 10. Fable 5 + .NET 10 compiles in ~20 seconds.
diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs
index 4c9e5b18..946e9bf9 100644
--- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs
+++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs
@@ -1159,6 +1159,28 @@ module AsyncSeq =
| Choice1Of2 value -> return value
| Choice2Of2 ex -> return raise ex })
}
+
+ let mapAsyncParallelThrottled (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
+ use mb = MailboxProcessor.Start (fun _ -> async.Return())
+ use sm = new SemaphoreSlim(parallelism)
+ let! err =
+ s
+ |> iterAsync (fun a -> async {
+ do! sm.WaitAsync () |> Async.awaitTaskUnitCancellationAsError
+ let! b = Async.StartChild (async {
+ try
+ let! result = f a
+ sm.Release() |> ignore
+ return result
+ with ex ->
+ sm.Release() |> ignore
+ return raise ex })
+ mb.Post (Some b) })
+ |> Async.map (fun _ -> mb.Post None)
+ |> Async.StartChildAsTask
+ yield!
+ replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
+ |> mapAsync id }
#endif
let chooseAsync f (source:AsyncSeq<'T>) =
diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
index a8ecd49b..98a83981 100644
--- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
+++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
@@ -887,6 +887,15 @@ module AsyncSeq =
/// in the order they complete (unordered), without preserving the original order.
val mapAsyncUnorderedParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
+ /// Builds a new asynchronous sequence whose elements are generated by
+ /// applying the specified function to all elements of the input sequence,
+ /// with at most parallelism mapping operations running concurrently.
+ ///
+ /// The function is applied to elements in order and results are emitted in order,
+ /// but in parallel, with at most parallelism operations running concurrently.
+ /// This is the throttled counterpart to mapAsyncParallel.
+ val mapAsyncParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
+
/// Applies a key-generating function to each element and returns an async sequence containing unique keys
/// and async sequences containing elements corresponding to the key.
///
diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
index 1f7a34c3..bbd08426 100644
--- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
+++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
@@ -1985,6 +1985,53 @@ let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should throttle`` () =
Assert.AreEqual(50, result.Length)
+[]
+let ``AsyncSeq.mapAsyncParallelThrottled should maintain order`` () =
+ let ls = List.init 100 id
+ let result =
+ ls
+ |> AsyncSeq.ofList
+ |> AsyncSeq.mapAsyncParallelThrottled 5 (fun i -> async {
+ do! Async.Sleep (100 - i)
+ return i * 2 })
+ |> AsyncSeq.toListAsync
+ |> Async.RunSynchronously
+ Assert.AreEqual(ls |> List.map ((*) 2), result)
+
+[]
+let ``AsyncSeq.mapAsyncParallelThrottled should propagate exception`` () =
+ let result =
+ AsyncSeq.init 50L id
+ |> AsyncSeq.mapAsyncParallelThrottled 5 (fun i -> async {
+ if i = 25L then return failwith "test error"
+ return i })
+ |> AsyncSeq.toListAsync
+ |> Async.Catch
+ |> Async.RunSynchronously
+ match result with
+ | Choice2Of2 _ -> ()
+ | Choice1Of2 _ -> Assert.Fail("Expected exception")
+
+[]
+let ``AsyncSeq.mapAsyncParallelThrottled should throttle`` () =
+ let count = ref 0
+ let parallelism = 5
+
+ let result =
+ AsyncSeq.init 50L id
+ |> AsyncSeq.mapAsyncParallelThrottled parallelism (fun i -> async {
+ let c = Interlocked.Increment count
+ if c > parallelism then
+ return failwith (sprintf "concurrency exceeded: %d > %d" c parallelism)
+ do! Async.Sleep 5
+ Interlocked.Decrement count |> ignore
+ return i * 2L })
+ |> AsyncSeq.toListAsync
+ |> Async.RunSynchronously
+
+ Assert.AreEqual(50, result.Length)
+ Assert.AreEqual([ 0L..49L ] |> List.map ((*) 2L), result)
+
//[]
//let ``AsyncSeq.mapParallelAsyncBounded should maintain order`` () =
// let ls = List.init 500 id
diff --git a/version.props b/version.props
index 7f9a2112..30ed08f9 100644
--- a/version.props
+++ b/version.props
@@ -1,5 +1,5 @@
- 4.12.0
+ 4.14.0
From 53d086709a5fe118e4bd39d597392b26f32030ff Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
Date: Wed, 15 Apr 2026 00:24:50 +0000
Subject: [PATCH 2/3] ci: trigger checks
From 146bbf80816eb287892b9655accbff978e81b1ce Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
<41898282+github-actions[bot]@users.noreply.github.com>
Date: Fri, 17 Apr 2026 07:58:49 +0000
Subject: [PATCH 3/3] fix: bump Microsoft.Bcl.AsyncInterfaces to 10.0.6 to
resolve Fable CI NU1605 downgrade warning
System.Threading.Channels 10.0.6 (resolved via Version="*") requires
Microsoft.Bcl.AsyncInterfaces >= 10.0.6, but the project pinned it at 10.0.5.
Fable's MSBuildCrackerResolver treats NU1605 (package downgrade) as a fatal
error, causing the Fable CI job to fail. Bumping to 10.0.6 resolves the conflict.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj b/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
index 00853d8e..90597602 100644
--- a/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
+++ b/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
@@ -24,7 +24,7 @@
-
+