Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 4.14.0

* Added `AsyncSeq.mapAsyncParallelThrottled` β€” ordered, bounded-concurrency parallel map. Like `mapAsyncParallel` but limits the number of in-flight operations to `parallelism`, preventing unbounded resource use on large or infinite sequences.

### 4.13.0

* CI: Upgrade Fable from 4.25.0 to 5.0.0-rc.7 and .NET SDK from 8.0.19 to 10.0.100 to fix a CI hang where the Fable build step ran for 6+ hours with .NET 10. Fable 5 + .NET 10 compiles in ~20 seconds.
Expand Down
22 changes: 22 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,28 @@
| Choice1Of2 value -> return value
| Choice2Of2 ex -> return raise ex })
}

let mapAsyncParallelThrottled (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
use mb = MailboxProcessor.Start (fun _ -> async.Return())
use sm = new SemaphoreSlim(parallelism)
let! err =
s
|> iterAsync (fun a -> async {
do! sm.WaitAsync () |> Async.awaitTaskUnitCancellationAsError
let! b = Async.StartChild (async {
try
let! result = f a
sm.Release() |> ignore
return result
with ex ->
sm.Release() |> ignore
return raise ex })
mb.Post (Some b) })
|> Async.map (fun _ -> mb.Post None)
|> Async.StartChildAsTask
yield!
replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
|> mapAsync id }
#endif

let chooseAsync f (source:AsyncSeq<'T>) =
Expand Down Expand Up @@ -2703,7 +2725,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2728 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2728 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
9 changes: 9 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,15 @@ module AsyncSeq =
/// in the order they complete (unordered), without preserving the original order.
val mapAsyncUnorderedParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>

/// Builds a new asynchronous sequence whose elements are generated by
/// applying the specified function to all elements of the input sequence,
/// with at most <c>parallelism</c> mapping operations running concurrently.
///
/// The function is applied to elements in order and results are emitted in order,
/// but in parallel, with at most <c>parallelism</c> operations running concurrently.
/// This is the throttled counterpart to <c>mapAsyncParallel</c>.
val mapAsyncParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>

/// Applies a key-generating function to each element and returns an async sequence containing unique keys
/// and async sequences containing elements corresponding to the key.
///
Expand Down
2 changes: 1 addition & 1 deletion src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Update="FSharp.Core" Version="4.7.2" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.5" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.6" />
<PackageReference Include="System.Threading.Channels" Version="*" />
<Content Include="*.fsproj; **\*.fs; **\*.fsi;" PackagePath="fable\" />
</ItemGroup>
Expand Down
47 changes: 47 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,53 @@

Assert.AreEqual(50, result.Length)

[<Test>]
let ``AsyncSeq.mapAsyncParallelThrottled should maintain order`` () =
let ls = List.init 100 id
let result =
ls
|> AsyncSeq.ofList
|> AsyncSeq.mapAsyncParallelThrottled 5 (fun i -> async {
do! Async.Sleep (100 - i)
return i * 2 })
|> AsyncSeq.toListAsync
|> Async.RunSynchronously
Assert.AreEqual(ls |> List.map ((*) 2), result)

[<Test>]
let ``AsyncSeq.mapAsyncParallelThrottled should propagate exception`` () =
let result =
AsyncSeq.init 50L id
|> AsyncSeq.mapAsyncParallelThrottled 5 (fun i -> async {
if i = 25L then return failwith "test error"
return i })
|> AsyncSeq.toListAsync
|> Async.Catch
|> Async.RunSynchronously
match result with
| Choice2Of2 _ -> ()
| Choice1Of2 _ -> Assert.Fail("Expected exception")

[<Test>]
let ``AsyncSeq.mapAsyncParallelThrottled should throttle`` () =
let count = ref 0
let parallelism = 5

let result =
AsyncSeq.init 50L id
|> AsyncSeq.mapAsyncParallelThrottled parallelism (fun i -> async {
let c = Interlocked.Increment count
if c > parallelism then
return failwith (sprintf "concurrency exceeded: %d > %d" c parallelism)
do! Async.Sleep 5
Interlocked.Decrement count |> ignore
return i * 2L })
|> AsyncSeq.toListAsync
|> Async.RunSynchronously

Assert.AreEqual(50, result.Length)
Assert.AreEqual([ 0L..49L ] |> List.map ((*) 2L), result)

//[<Test>]
//let ``AsyncSeq.mapParallelAsyncBounded should maintain order`` () =
// let ls = List.init 500 id
Expand Down Expand Up @@ -2047,7 +2094,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2097 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2056,7 +2103,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2106 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -4407,7 +4454,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4457 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4420,7 +4467,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4470 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4429,7 +4476,7 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4479 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand Down
2 changes: 1 addition & 1 deletion version.props
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project>
<PropertyGroup>
<Version>4.12.0</Version>
<Version>4.14.0</Version>
</PropertyGroup>
</Project>
Loading