From 581e43b377e820838553b6025fc507b87f7ef19b Mon Sep 17 00:00:00 2001 From: Lev Gorodinski Date: Wed, 27 Sep 2017 09:40:47 -0400 Subject: [PATCH 1/2] update assembly version --- src/FSharp.Control.AsyncSeq.Profile7/AssemblyInfo.fs | 8 ++++---- src/FSharp.Control.AsyncSeq/AssemblyInfo.fs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/FSharp.Control.AsyncSeq.Profile7/AssemblyInfo.fs b/src/FSharp.Control.AsyncSeq.Profile7/AssemblyInfo.fs index 272fcf5..6cae419 100644 --- a/src/FSharp.Control.AsyncSeq.Profile7/AssemblyInfo.fs +++ b/src/FSharp.Control.AsyncSeq.Profile7/AssemblyInfo.fs @@ -4,10 +4,10 @@ open System.Reflection [] [] [] -[] -[] +[] +[] do () module internal AssemblyVersionInformation = - let [] Version = "2.0.13" - let [] InformationalVersion = "2.0.13" + let [] Version = "2.0.14" + let [] InformationalVersion = "2.0.14" diff --git a/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs b/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs index 0f07277..5aac1c1 100644 --- a/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs +++ b/src/FSharp.Control.AsyncSeq/AssemblyInfo.fs @@ -4,10 +4,10 @@ open System.Reflection [] [] [] -[] -[] +[] +[] do () module internal AssemblyVersionInformation = - let [] Version = "2.0.13" - let [] InformationalVersion = "2.0.13" + let [] Version = "2.0.14" + let [] InformationalVersion = "2.0.14" From 49aab215f41d3e321bf352828ff357a570f4e959 Mon Sep 17 00:00:00 2001 From: Lev Gorodinski Date: Wed, 27 Sep 2017 14:24:49 -0400 Subject: [PATCH 2/2] bufferByTime --- RELEASE_NOTES.md | 3 ++ src/FSharp.Control.AsyncSeq/AsyncSeq.fs | 40 +++++++++++++++++++ src/FSharp.Control.AsyncSeq/AsyncSeq.fsi | 4 ++ .../AsyncSeqTests.fs | 23 +++++++++++ 4 files changed, 70 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 1f5a078..a42d01d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,6 @@ +### 2.0.15 - 27.09.2017 +* NEW: AsyncSeq.bufferByTime + ### 2.0.14 - 27.09.2017 * BUG: Fixed head of line blocking in AsyncSeq.mapAsyncParallel diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 1246448..221548b 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -110,6 +110,14 @@ module internal Utils = elif i = 1 then return (Choice2Of2 (b.Result, a)) else return! failwith (sprintf "unreachable, i = %d" i) } + static member internal chooseTasks2 (a:Task<'T>) (b:Task) : Async>> = + async { + let! ct = Async.CancellationToken + let i = Task.WaitAny( [| (a :> Task);(b) |],ct) + if i = 0 then return (Choice1Of2 (a.Result, b)) + elif i = 1 then return (Choice2Of2 (a)) + else return! failwith (sprintf "unreachable, i = %d" i) } + type MailboxProcessor<'Msg> with member __.PostAndAsyncReplyTask (f:TaskCompletionSource<'a> -> 'Msg) : Task<'a> = let tcs = new TaskCompletionSource<'a>() @@ -131,6 +139,9 @@ module internal Utils = let chooseTask (t:Task<'a>) (a:Async<'a>) : Async<'a> = chooseTaskAsTask t a |> Async.bind Async.awaitTaskCancellationAsError + let toUnit (t:Task) : Task = + t.ContinueWith (Func<_, _>(fun (_:Task) -> ())) + let taskFault (t:Task<'a>) : Task<'b> = t |> extend (fun t -> @@ -1389,6 +1400,35 @@ module AsyncSeq = yield! loop None timeoutMs } + let bufferByTime (timeMs:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> = asyncSeq { + if (timeMs < 1) then invalidArg "timeMs" "must be positive" + let buf = new ResizeArray<_>() + use ie = source.GetEnumerator() + let rec loop (next:Task<'T option> option, waitFor:Task option) = asyncSeq { + let! next = + match next with + | Some n -> async.Return n + | None -> ie.MoveNext () |> Async.StartChildAsTask + let waitFor = + match waitFor with + | Some w -> w + | None -> Task.Delay timeMs + let! res = Async.chooseTasks2 next waitFor + match res with + | Choice1Of2 (Some a,waitFor) -> + buf.Add a + yield! loop (None,Some waitFor) + | Choice1Of2 (None,_) -> + let arr = buf.ToArray() + if arr.Length > 0 then + yield arr + | Choice2Of2 next -> + let arr = buf.ToArray() + buf.Clear() + yield arr + yield! loop (Some next, None) } + yield! loop (None, None) } + let private mergeChoiceEnum (ie1:IAsyncEnumerator<'T1>) (ie2:IAsyncEnumerator<'T2>) : AsyncSeq> = asyncSeq { let! move1T = Async.StartChildAsTask (ie1.MoveNext()) let! move2T = Async.StartChildAsTask (ie2.MoveNext()) diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index 87766d7..d1cfb69 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -421,6 +421,10 @@ module AsyncSeq = /// Buffer items from the async sequence until a specified buffer size is reached or a specified amount of time is elapsed. val bufferByCountAndTime : bufferSize:int -> timeoutMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []> + /// Buffers items from the async sequence by the specified time interval. + /// If no items are received in an intervel and empty array is emitted. + val bufferByTime : timeMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T[]> + /// Merges two async sequences into an async sequence non-deterministically. /// The resulting async sequence produces elements when any argument sequence produces an element. val mergeChoice: source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq> diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index 7aaf996..e0ef8a2 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -446,6 +446,29 @@ let ``AsyncSeq.bufferByTimeAndCount empty``() = let actual = AsyncSeq.bufferByCountAndTime 2 10 s |> AsyncSeq.toList Assert.True((actual = [])) +[] +let ``AsyncSeq.bufferByTime`` () = + + let s = asyncSeq { + yield 1 + yield 2 + do! Async.Sleep 100 + yield 3 + yield 4 + do! Async.Sleep 100 + yield 5 + yield 6 + } + + let actual = + s + |> AsyncSeq.bufferByTime 100 + |> AsyncSeq.map (List.ofArray) + |> AsyncSeq.toList + + let expected = [ [1;2] ; [3;4] ; [5;6] ] + + Assert.True ((actual = expected)) [] let ``try finally works no exception``() =