diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index e50bd09f..e128d87e 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,3 +1,7 @@
+### 4.14.0
+
+* Added `AsyncSeq.mapAsyncParallelThrottled` — ordered, bounded-concurrency parallel map. Like `mapAsyncParallel` but limits the number of in-flight operations to `parallelism`, preventing unbounded resource use on large or infinite sequences.
+
### 4.13.0
* CI: Upgrade Fable from 4.25.0 to 5.0.0-rc.7 and .NET SDK from 8.0.19 to 10.0.100 to fix a CI hang where the Fable build step ran for 6+ hours with .NET 10. Fable 5 + .NET 10 compiles in ~20 seconds.
diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs
index 4c9e5b18..946e9bf9 100644
--- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs
+++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs
@@ -1159,6 +1159,28 @@ module AsyncSeq =
| Choice1Of2 value -> return value
| Choice2Of2 ex -> return raise ex })
}
+
+ let mapAsyncParallelThrottled (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
+ use mb = MailboxProcessor.Start (fun _ -> async.Return())
+ use sm = new SemaphoreSlim(parallelism)
+ let! err =
+ s
+ |> iterAsync (fun a -> async {
+ do! sm.WaitAsync () |> Async.awaitTaskUnitCancellationAsError
+ let! b = Async.StartChild (async {
+ try
+ let! result = f a
+ sm.Release() |> ignore
+ return result
+ with ex ->
+ sm.Release() |> ignore
+ return raise ex })
+ mb.Post (Some b) })
+ |> Async.map (fun _ -> mb.Post None)
+ |> Async.StartChildAsTask
+ yield!
+ replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
+ |> mapAsync id }
#endif
let chooseAsync f (source:AsyncSeq<'T>) =
diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
index a8ecd49b..98a83981 100644
--- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
+++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
@@ -887,6 +887,15 @@ module AsyncSeq =
/// in the order they complete (unordered), without preserving the original order.
val mapAsyncUnorderedParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
+ /// Builds a new asynchronous sequence whose elements are generated by
+ /// applying the specified function to all elements of the input sequence,
+ /// with at most parallelism mapping operations running concurrently.
+ ///
+ /// The function is applied to elements in order and results are emitted in order,
+ /// but in parallel, with at most parallelism operations running concurrently.
+ /// This is the throttled counterpart to mapAsyncParallel.
+ val mapAsyncParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
+
/// Applies a key-generating function to each element and returns an async sequence containing unique keys
/// and async sequences containing elements corresponding to the key.
///
diff --git a/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj b/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
index 00853d8e..90597602 100644
--- a/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
+++ b/src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
@@ -24,7 +24,7 @@
-
+
diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
index 1f7a34c3..bbd08426 100644
--- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
+++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
@@ -1985,6 +1985,53 @@ let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should throttle`` () =
Assert.AreEqual(50, result.Length)
+[]
+let ``AsyncSeq.mapAsyncParallelThrottled should maintain order`` () =
+ let ls = List.init 100 id
+ let result =
+ ls
+ |> AsyncSeq.ofList
+ |> AsyncSeq.mapAsyncParallelThrottled 5 (fun i -> async {
+ do! Async.Sleep (100 - i)
+ return i * 2 })
+ |> AsyncSeq.toListAsync
+ |> Async.RunSynchronously
+ Assert.AreEqual(ls |> List.map ((*) 2), result)
+
+[]
+let ``AsyncSeq.mapAsyncParallelThrottled should propagate exception`` () =
+ let result =
+ AsyncSeq.init 50L id
+ |> AsyncSeq.mapAsyncParallelThrottled 5 (fun i -> async {
+ if i = 25L then return failwith "test error"
+ return i })
+ |> AsyncSeq.toListAsync
+ |> Async.Catch
+ |> Async.RunSynchronously
+ match result with
+ | Choice2Of2 _ -> ()
+ | Choice1Of2 _ -> Assert.Fail("Expected exception")
+
+[]
+let ``AsyncSeq.mapAsyncParallelThrottled should throttle`` () =
+ let count = ref 0
+ let parallelism = 5
+
+ let result =
+ AsyncSeq.init 50L id
+ |> AsyncSeq.mapAsyncParallelThrottled parallelism (fun i -> async {
+ let c = Interlocked.Increment count
+ if c > parallelism then
+ return failwith (sprintf "concurrency exceeded: %d > %d" c parallelism)
+ do! Async.Sleep 5
+ Interlocked.Decrement count |> ignore
+ return i * 2L })
+ |> AsyncSeq.toListAsync
+ |> Async.RunSynchronously
+
+ Assert.AreEqual(50, result.Length)
+ Assert.AreEqual([ 0L..49L ] |> List.map ((*) 2L), result)
+
//[]
//let ``AsyncSeq.mapParallelAsyncBounded should maintain order`` () =
// let ls = List.init 500 id
diff --git a/version.props b/version.props
index 7f9a2112..30ed08f9 100644
--- a/version.props
+++ b/version.props
@@ -1,5 +1,5 @@
- 4.12.0
+ 4.14.0