Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace use of Task.WaitAny for Task.WhenAny #73

Merged
merged 2 commits into from
Nov 21, 2017

Conversation

johlrich
Copy link
Contributor

@johlrich johlrich commented Nov 6, 2017

Added tests for bufferByTime/bufferByCountAndTime (timing based tests, but should be little risk of false positives from them I think). Could probably be simpler, but this way worked out using the reported repro.

While I focused on those two for the tests since that's what the original issue reported (and bufferByCountAndTime was my use case too), I noticed chooseTasks/2 are being used in quite a few functions, so not sure if there should be more tests added.

Should fix #65

@johlrich
Copy link
Contributor Author

johlrich commented Nov 6, 2017

Will push other commit after CI shows red for failing test

@johlrich
Copy link
Contributor Author

johlrich commented Nov 6, 2017

Travis red, but looks like there are some changes pending to it in another PR.
AppVeyor shows red w. failing test and green after proposed fix though.

@eulerfx
Copy link
Contributor

eulerfx commented Nov 6, 2017

Hey, thx for the PR.

Regarding WaitAny vs WhenAny - I recall that some benchmarks show that the blocking call is faster. I think it depends on the scenario and something I've struggled to find a great solution for. On the one hand, it can be more efficient to have a few threads blocked, rather than incurring the cost of the Async machinery. However, if there are lots of computations like this, then more efficient to go the non-blocking route.

@johlrich
Copy link
Contributor Author

johlrich commented Nov 6, 2017

Any insights into the various scenarios favoring one approach vs another for perf? I can take a deeper look and do some benchmark/profiling to see if we can come up with a single approach.

@eulerfx
Copy link
Contributor

eulerfx commented Nov 21, 2017

Hey, just following up on this. I tested with the following workload:

let N = 100L
let bufferSize = 100
let bufferTime = 1000
let P = 1000

let go n = async {
  return!
    AsyncSeq.init n id
    |> AsyncSeq.mapAsync (fun i -> async {
      do! Async.Sleep 0
      return i })
    |> AsyncSeq.bufferByCountAndTime2 bufferSize bufferTime
    |> AsyncSeq.iter ignore
}

Seq.init P id
|> Seq.map (fun _ -> go N)
|> Async.Parallel
|> Async.RunSynchronously

And your solution does quite a bit better, especially in cases where there is more parallelism. This is expected, since the current solution blocks, increasing contention.

In the following, the first result is the existing solution and the second, your non-blocking solution:

let N = 100000L
let bufferSize = 100
let bufferTime = 1000
let P = 10

//Real: 00:02:03.090, CPU: 00:04:31.515, GC gen0: 917, gen1: 864, gen2: 53
//Real: 00:01:27.512, CPU: 00:03:20.750, GC gen0: 986, gen1: 914, gen2: 59
let N = 10000L
let bufferSize = 100
let bufferTime = 1000
let P = 100

//Real: 00:02:46.513, CPU: 00:03:01.734, GC gen0: 993, gen1: 927, gen2: 66
//Real: 00:00:52.050, CPU: 00:02:03.796, GC gen0: 996, gen1: 917, gen2: 69
let N = 100L
let bufferSize = 100
let bufferTime = 1000
let P = 1000

// ???
//Real: 00:00:04.838, CPU: 00:00:09.859, GC gen0: 103, gen1: 97, gen2: 6

I also tried an alternate implementation:

    static member chooseChoice (a:Async<'a>) (b:Async<'b>) : Async<Choice<'a * Async<'b>, 'b * Async<'a>>> = async {
      let! ct = Async.CancellationToken
      return!
        Async.FromContinuations <| fun (ok,err,cnc) ->
          let state = ref 0
          let resA = TaskCompletionSource<_>()
          let resB = TaskCompletionSource<_>()
          let inline oka a =
            if (Interlocked.CompareExchange(state, 1, 0) = 0) then 
              ok (Choice1Of2 (a, resB.Task |> Async.AwaitTask))
            else
              resA.SetResult a
          let inline okb b =
            if (Interlocked.CompareExchange(state, 1, 0) = 0) then 
              ok (Choice2Of2 (b, resA.Task |> Async.AwaitTask))
            else
              resB.SetResult b
          let inline err (ex:exn) =
            if (Interlocked.CompareExchange(state, 1, 0) = 0) then 
              err ex
          let inline cnc ex =
            if (Interlocked.CompareExchange(state, 1, 0) = 0) then
              cnc ex
          Async.startThreadPoolWithContinuations (a, oka, err, cnc, ct)
          Async.startThreadPoolWithContinuations (b, okb, err, cnc, ct) }

That actually does slightly better, but needs more testing with respect to how it handles cancellations.

So, I'm tempted to merge this PR as is.

@eulerfx eulerfx merged commit 6d2dcbe into fsprojects:master Nov 21, 2017
@eulerfx
Copy link
Contributor

eulerfx commented Nov 21, 2017

Released: https://www.nuget.org/packages/FSharp.Control.AsyncSeq/2.0.17

@johlrich johlrich deleted the remove-task-waitall branch November 21, 2017 21:12
@johlrich
Copy link
Contributor Author

Sweet, thanks for following up!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

BufferByCountAndTime impact to Async.StartWithContinuations
2 participants