
Request new function #31

Open
xkrt opened this issue Jun 4, 2015 · 4 comments

xkrt commented Jun 4, 2015

Hi!

In my work I sometimes need a function that takes a collection of asyncs and runs them in parallel, but with the number of simultaneously running asyncs capped below the total number passed in, and returns the results as an AsyncSeq so the consumer can process each result as soon as it appears.

Maybe my explanation is confusing, so I'll try to express it in code:

let getJobs () = asyncSeq {
    // some job-retrieval code here
    yield! [1;2;3;4] |> AsyncSeq.ofSeq
}

let download job = async {
    // some download code here
    return 42
}

let downloadAll parallelCount = asyncSeq {
    let jobs = getJobs () // overall job count is far greater than parallelCount
    let results = jobs |> AsyncSeq.requestedFunc parallelCount
    yield! results
}

// consumer code
let maxParallel = 5
downloadAll maxParallel
|> AsyncSeq.toBlockingSeq
|> Seq.iter (fun result -> printfn "%A" result)

Here is my attempt to do so:

let downloadAllBad parallelCount = asyncSeq {
    let jobs = getJobs()
    let batches = jobs |> AsyncSeq.bufferByCount parallelCount

    for batch in batches do
        let! results =
            batch
            |> Seq.map download
            |> Async.Parallel       // bad: wait time for results = wait time of the longest async in the batch
        for result in results do
            yield result
}

Here I'm forced to wait for the results of all asyncs in a batch, but I want to receive results as they appear.

In the past I've used MailboxProcessors for this: one agent distributes tasks to a few worker agents.
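For reference, the agent approach described above can be sketched as follows. This is a minimal illustration, not library code; the names runWithAgents, work, and report are made up for this sketch. A distributor round-robins jobs across a fixed pool of worker agents, and each result is reported as soon as its job finishes:

```fsharp
// Hedged sketch of the MailboxProcessor approach: a fixed pool of
// worker agents, each processing jobs from its own mailbox sequentially,
// so at most parallelCount jobs run at once.
let runWithAgents (parallelCount: int)
                  (work: 'job -> Async<'result>)
                  (report: 'result -> unit)
                  (jobs: seq<'job>) =
    // each worker drains its own mailbox one job at a time
    let workers =
        Array.init parallelCount (fun _ ->
            MailboxProcessor.Start(fun inbox -> async {
                while true do
                    let! job = inbox.Receive()
                    let! result = work job
                    report result }))
    // distributor: spread the jobs over the workers round-robin
    jobs |> Seq.iteri (fun i job -> workers.[i % parallelCount].Post job)
```

With parallelCount workers, at most parallelCount jobs run concurrently, and results arrive in completion order rather than input order, which is exactly the behavior requested here.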


dsyme commented Jun 4, 2015

How about AsyncSeq.mergeAll?


eulerfx commented Sep 4, 2015

The following should work, but isn't ideal:

    open System
    open System.Threading

    let parallelize (parallelism:int) (s:seq<Async<'a>>) : Async<seq<'a>> = async {
          let buffer = new Collections.Concurrent.BlockingCollection<_>(parallelism)
          let cts = new CancellationTokenSource()
          try
            do! Async.ParallelIgnore cts.Token parallelism (s |> Seq.map (Async.map buffer.Add))
          finally
            buffer.CompleteAdding()          
          return seq {
            use buffer = buffer   
            use cts = cts
            use _cancel = { new IDisposable with member __.Dispose() = cts.Cancel() }
            yield! buffer.GetConsumingEnumerable(cts.Token) } }

This will run up to parallelism computations in parallel, and will start emitting results as soon as they are available. One problem here is that the consumer of the AsyncSeq has little control over the consumption of the source sequence. Ideally, we would have something that allowed the consumer of the resulting AsyncSeq to trigger the next batch to start, but still allowed results to be yielded incrementally. This should be possible to do by baking the Async.ParallelIgnore loop directly into this function. Once parallelism computations have completed, the outer AsyncSeq stops consuming the source sequence until it is pulled again.

Implementation of Async.ParallelIgnore:

        static member ParallelIgnore (ct:CancellationToken) (parallelism:int) (xs:seq<Async<_>>) = async {                
          let sm = new SemaphoreSlim(parallelism)
          let cde = new CountdownEvent(1)
          let tcs = new TaskCompletionSource<unit>()          
          ct.Register(Action(fun () -> tcs.TrySetCanceled() |> ignore)) |> ignore
          let inline tryComplete () =
            if cde.Signal() then
              tcs.SetResult(())
          let inline ok _ =
            sm.Release() |> ignore
            tryComplete ()                
          let inline err (ex:exn) =
            tcs.TrySetException ex |> ignore
            sm.Release() |> ignore                        
          let inline cnc (ex:OperationCanceledException) =      
            tcs.TrySetCanceled() |> ignore
            sm.Release() |> ignore
          try
            use en = xs.GetEnumerator()
            while not (tcs.Task.IsCompleted) && en.MoveNext() do
              sm.Wait()
              cde.AddCount(1)
              Async.StartWithContinuations(en.Current, ok, err, cnc, ct)                              
            tryComplete ()
            do! tcs.Task |> Async.AwaitTask
          finally      
            cde.Dispose()    
            sm.Dispose() }

Use of BlockingCollection can be avoided with something like the following; it should allow waiting without blocking:

    type private EagerSeq<'a> = TaskCompletionSource<EagerCons<'a>>

    and private EagerCons<'a> = 
      | EagerCons of 'a * EagerSeq<'a>
      | EagerNil

    let rec private ofEager (e:EagerSeq<'a>) : AsyncSeq<'a> = 
      async.Delay <| fun () ->
        e.Task
        |> Async.AwaitTask
        |> Async.map (function
          | EagerNil -> Nil
          | EagerCons(a,tl) -> Cons(a, ofEager tl))


eulerfx commented Apr 11, 2016

Hey @xkrt, following up on this and seeing if your request can be addressed. There is a new function in master (not yet released): val mapAsyncParallel : f:('a -> Async<'b>) -> s:AsyncSeq<'a> -> AsyncSeq<'b>. It is like the existing mapAsync however it doesn't wait for a mapping operation to complete before starting the next. The results are placed into a FIFO queue, such that the results are returned in the same order as the inputs. Since order is maintained, head-of-line blocking can result - if the first item takes a long time to process, subsequent items won't be emitted even though they may already be processed. Would this work for your scenario?
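The ordered-but-eager behavior can be sketched without the AsyncSeq machinery at all. The following is a simplified illustration over a plain list, not the actual mapAsyncParallel implementation: every mapping is started eagerly with Async.StartChild, but results are awaited in input order, which is exactly where head-of-line blocking comes from.

```fsharp
// Simplified illustration of the FIFO idea behind mapAsyncParallel:
// all computations start immediately, but results are awaited (and
// therefore yielded) in input order, so a slow head delays everything
// behind it even if those items are already done.
let mapParallelFifo (f: 'a -> Async<'b>) (xs: 'a list) : Async<'b list> = async {
    // start all computations eagerly
    let mutable started = []
    for x in xs do
        let! child = Async.StartChild (f x)
        started <- child :: started
    // await them in the original input order
    let mutable results = []
    for child in List.rev started do
        let! r = child
        results <- r :: results
    return List.rev results }
```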

Alternatively, we can get functions of this type:

val throttle : parallelism:int -> ('a -> Async<'b>) -> ('a -> Async<'b>)

val mapAsyncParallelUnordered: 
    f:('a -> Async<'b>) -> 
    AsyncSeq<'a> -> 
    AsyncSeq<'b>

This function would be similar to mapAsyncParallel but would emit each result as soon as it is available, regardless of input order. Throttling can be applied using a separate combinator. It could be built in, but it seems to be a separate concern.
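A minimal sketch of such a throttle combinator, assuming a shared SemaphoreSlim is an acceptable mechanism (this is not the library's implementation, just one way the proposed signature could be realized):

```fsharp
open System.Threading

// Hedged sketch of the proposed throttle combinator: wrap an
// async-returning function so that at most `parallelism` invocations
// run concurrently; callers beyond the limit wait on the semaphore.
let throttle (parallelism: int) (f: 'a -> Async<'b>) : 'a -> Async<'b> =
    let sem = new SemaphoreSlim(parallelism)
    fun a -> async {
        do! sem.WaitAsync() |> Async.AwaitTask
        try return! f a
        finally sem.Release() |> ignore }
```

A throttled download function could then be passed to mapAsyncParallelUnordered, keeping the concurrency limit orthogonal to the ordering behavior.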


jbtule commented Jul 17, 2019

I needed a mapAsyncParallelUnordered, so I took mapAsyncParallel and modified it for use in an open-source tool. Just thought I'd share, since I came across this issue 4 years later hoping for said function.

https://github.com/ekonbenefits/dotnet-ssllabs-check/blob/98ff194eb4010324224f1e63b042f934d1f9097d/AsyncSeq.fs#L54-L65
