Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncSeq.mapAsyncParallel alternatives #155

Open
Xyncgas opened this issue Sep 6, 2022 · 3 comments
Open

AsyncSeq.mapAsyncParallel alternatives #155

Xyncgas opened this issue Sep 6, 2022 · 3 comments

Comments

@Xyncgas
Copy link

Xyncgas commented Sep 6, 2022

AsyncSeq.mapAsyncParallel returns a sequence with order preserved, however we often want to read the first thing that's available and throw away the rest

I propose a generic version AsyncSeq.mapAsyncUnorderedParallel that returns a sequence in the order of first available

@Xyncgas
Copy link
Author

Xyncgas commented Sep 6, 2022

@dsyme
@martinmoec
@eulerfx

@Xyncgas
Copy link
Author

Xyncgas commented Sep 6, 2022

Here's the code

module ChannelExtention
    open System.Threading
    open System.Threading.Channels
    open FSharp.Control
    
    let CreateCompleted () =
        let C = Channel.CreateUnbounded()
        C.Writer.Complete()
        C

    open LanguagePrimitives
    let inline private AsyncRangeProduce (dataChannel:Channel<'a>) producer range =
        match range with
        | AbsPosRangeByLength (pos, length) ->
            async{
                let End = pos + length - GenericOne
                let mutable i = pos;
                let Mutex = new Mutex();
                let mutable ExecutedTasks = GenericZero;
                while i<=End do
                    let lc_i = i
                    async{
                        try
                            let! value = producer(lc_i)
                            let! _ = dataChannel.Writer.WriteAsync(value).AsTask() |> Async.AwaitTask
                            ()
                        finally
                            Mutex.WaitOne() |> ignore
                            ExecutedTasks <- ExecutedTasks + GenericOne
                            Mutex.ReleaseMutex()
                            if ExecutedTasks = length then dataChannel.Writer.Complete()
                    } |> Async.Start
                    i <- i + GenericOne
            } |> Async.Start
            dataChannel.Reader.ReadAllAsync() |> toAsyncSeq
        | _ -> CreateCompleted().Reader.ReadAllAsync() |> toAsyncSeq

    let inline private AsyncSequenceProduce (dataChannel:Channel<'a>) producer source =
        async{
            let Mutex = new Mutex();
            let mutable Completed = GenericZero;
            let mutable Schedueled = GenericZero;
            let mutable HasElement = false;
            source
            |> AsyncSeq.iter (fun x->
                HasElement <- true
                Schedueled <- Checked.(+) Schedueled GenericOne
                let lc_x = x
                async{
                    try
                        let! value = producer(lc_x)
                        let! _ = dataChannel.Writer.WriteAsync(value).AsTask() |> Async.AwaitTask
                        ()
                    finally
                        Mutex.WaitOne() |> ignore
                        Completed <- Completed + GenericOne
                        Mutex.ReleaseMutex()
                        if Completed = Schedueled then dataChannel.Writer.Complete()
                } |> Async.Start)
            |> Async.RunSynchronously
            if not HasElement then dataChannel.Writer.Complete()
        } |> Async.Start
        dataChannel.Reader.ReadAllAsync() |> toAsyncSeq

    let inline writeRangeToBounded capacity = AsyncRangeProduce (CreateChannelHelper.CreateBounded(capacity)) 
    
    let inline writeRangeToUnbounded() = AsyncRangeProduce (CreateChannelHelper.CreateUnbounded())

    let inline writeSequenceToBounded capacity = AsyncSequenceProduce (CreateChannelHelper.CreateBounded(capacity)) 

    let inline writeSequenceToUnbounded () = AsyncSequenceProduce (CreateChannelHelper.CreateUnbounded())
//Takes a function, AsyncSeq<'a>, returns AsyncSeq<'b> with first availability

@dsyme
Copy link
Contributor

dsyme commented Sep 7, 2022

@Xyncgas Seems reasonable!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants