
Suggestion: add a chunkBy function (to group and yield adjacent items sharing the same key) to the library #156

Open
natalie-o-perret opened this issue Sep 22, 2022 · 4 comments


@natalie-o-perret

Description

Bumped into this at work the other day and thought it could be a nice addition to the library (unless it's already part of it under another name, which wouldn't be too surprising, since I'm pretty good at missing the very obvious).

Basically, the idea would be to add a chunkBy function to the AsyncSeq module that groups and yields adjacent items sharing the same key, along the lines of what has been done for Seq.chunkBy in the F#+ library here, i.e.:

open System
open System.Threading

open FSharpPlus

open FSharp.Control


[<RequireQualifiedAccess>]
module AsyncSeq =
    
    let chunkBy (projection: 'T -> 'Key) (source: AsyncSeq<'T>) = asyncSeq {
        use e = source.GetEnumerator()
        let mutable currentMaybe = None
        let! firstCurrentMaybe = e.MoveNext()
        currentMaybe <- firstCurrentMaybe
        if currentMaybe.IsSome then
            let mutable g = projection currentMaybe.Value
            let mutable members = ResizeArray()
            members.Add currentMaybe.Value
            let! preWhileCurrentMaybe = e.MoveNext()
            currentMaybe <- preWhileCurrentMaybe
            while currentMaybe.IsSome do
                let key = projection currentMaybe.Value
                if g = key then
                    members.Add currentMaybe.Value
                else
                    yield (g, members)
                    g <- key
                    members <- ResizeArray ()
                    members.Add currentMaybe.Value
                let! whileCurrentMaybe = e.MoveNext()
                currentMaybe <- whileCurrentMaybe
            yield (g, members)
    }

For reference, here is the Seq.chunkBy implementation from F#+:

let chunkBy (projection: 'T -> 'Key) (source: _ seq) = seq {
    use e = source.GetEnumerator ()
    if e.MoveNext () then
        let mutable g = projection e.Current
        let mutable members = ResizeArray ()
        members.Add e.Current
        while e.MoveNext () do
            let key = projection e.Current
            if g = key then members.Add e.Current
            else
                yield g, members
                g <- key
                members <- ResizeArray ()
                members.Add e.Current
        yield g, members }
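
For a quick sanity check, here is the behaviour I'd expect from the Seq version above (a tiny sketch, using id as the projection; note that, unlike groupBy, the same key can appear more than once because only adjacent items are grouped):

// Sketch: chunkBy with id as the projection groups *adjacent* equal items.
[1; 1; 2; 2; 1]
|> chunkBy id
|> Seq.map (fun (key, items) -> key, List.ofSeq items)
|> List.ofSeq
// [(1, [1; 1]); (2, [2; 2]); (1, [1])]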

The AsyncSeq chunkBy above is the naive impl., ahem, translation of that Seq version I ended up with.

I've checked a few more functional Seq approaches, but they seem to require more allocations: https://stackoverflow.com/a/38495042/4636721
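
For comparison, here's a sketch of such a purely functional variant (my own, hypothetical chunkByFold; not from the SO answer), folding into immutable lists instead of mutating a ResizeArray, which is where the extra allocations come from:

// Sketch: folds the source into reversed per-chunk lists, then reverses
// at the end. Allocates a cons cell per item and a tuple per chunk.
let chunkByFold (projection: 'T -> 'Key) (source: seq<'T>) =
    let folder acc item =
        let key = projection item
        match acc with
        | (k, items) :: rest when k = key -> (k, item :: items) :: rest
        | _ -> (key, [ item ]) :: acc
    source
    |> Seq.fold folder []
    |> List.rev
    |> List.map (fun (k, items) -> k, List.rev items)
// chunkByFold id [1; 1; 2; 2; 1] gives [(1, [1; 1]); (2, [2; 2]); (1, [1])]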

And here is a working example:

[<EntryPoint>]
let main _ =
    
    let generateCpiEntities() = 
        let alphabet = [| 'a' .. 'z' |]
        seq { 0 .. 13 }
        |> Seq.map (fun x -> Thread.Sleep 500; FakeCpiEntity.Of(x / 3, [ Array.get alphabet x ]))
    
    let pushGroupsToGrpc = String.join "," >> sprintf "Push %A" >> String.replace "\n" String.Empty >> printfn "%s"
    let grpcEntityToSendThreshold = 2
    
    generateCpiEntities()                           
    |> Seq.chunkBy     FakeCpiEntity.ToPrimaryKey
    |> Seq.map         FakeGrpcEntity.OfCpiEntities
    |> Seq.chunkBySize grpcEntityToSendThreshold
    |> Seq.iter        pushGroupsToGrpc

    generateCpiEntities()
    |> AsyncSeq.ofSeq
    |> AsyncSeq.chunkBy     FakeCpiEntity.ToPrimaryKey
    |> AsyncSeq.map         FakeGrpcEntity.OfCpiEntities
    |> AsyncSeq.chunkBySize grpcEntityToSendThreshold
    |> AsyncSeq.iter        pushGroupsToGrpc
    |> Async.RunSynchronously
    
    0

Sample output:

Push "{ Id = 0  Items = ['a'; 'b'; 'c'] },{ Id = 1  Items = ['d'; 'e'; 'f'] }"
Push "{ Id = 2  Items = ['g'; 'h'; 'i'] },{ Id = 3  Items = ['j'; 'k'; 'l'] }"
Push "{ Id = 4  Items = ['m'; 'n'] }"
Push "{ Id = 0  Items = ['a'; 'b'; 'c'] },{ Id = 1  Items = ['d'; 'e'; 'f'] }"
Push "{ Id = 2  Items = ['g'; 'h'; 'i'] },{ Id = 3  Items = ['j'; 'k'; 'l'] }"
Push "{ Id = 4  Items = ['m'; 'n'] }"

(Also, I know the [random] vertical alignment is dumb, but I can't help it.)

Wdyt?
Do you think this feature is worthy enough 🔨⚡ (or just relevant 🤔) to be part of the library?

@dsyme
Contributor

dsyme commented Oct 11, 2022

@natalie-o-perret Looks good, please consider making a PR, thanks

@abelbraaksma
Member

You may want to take a close look at what happens if multiple threads are iterating over the resulting sequence. I think the code can cause race conditions. There are functions in this library that specifically warn about that, so maybe it's enough to just say so in the doc comments.

I’m currently considering adding the same to TaskSeq, then came across this.

@natalie-o-perret
Author

You may want to take a close look at what happens if multiple threads are iterating over the resulting sequence. I think the code can cause race conditions. There are functions in this library that specifically warn about that, so maybe it's enough to just say so in the doc comments.

I’m currently considering adding the same to TaskSeq, then came across this.

This is a good point 🤔.
Tbh, it's also the case with the seq version then, innit?
What about the other functions of the AsyncSeq module?
Are they all safe regarding race conditions when multiple threads are iterating the resulting sequence?

@abelbraaksma
Member

abelbraaksma commented Nov 12, 2022

@natalie-o-perret I’m not sure about seq, I’d have to check. But I think the premise there is that you get a new enumerator, but that you shouldn’t share this enumerator between threads.

My concern with AsyncSeq is that there’s a much larger chance it’s used in multi-threaded scenarios. I'm not entirely sure what the base premise is with sharing enumerators. While that’s generally a code smell, I think this lib ought to be safe in that regard, but I’d have to check.

@dsyme is there a general idea/consensus in the dotnet ecosystem that sequences should be thread-safe, but enumerators of these sequences are not (or: aren’t guaranteed to be)?

It was my understanding that most collections are safe for reading, but not safe for writing. The thing is, in these functions like chunkBy or cache, we’re doing both, but the user probably considers it a read operation.
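
To make the concern concrete, a hypothetical, untested sketch (assuming the chunkBy proposed above; sharedEnumeratorHazard is a made-up name): the mutable chunk state lives inside the asyncSeq body, so each GetEnumerator() call should get a fresh copy — the risky pattern would be two consumers driving the *same* enumerator:

open FSharp.Control

// Hypothetical sketch of the risky pattern: ONE enumerator shared between
// two concurrently running consumers. Each MoveNext resumes the same
// suspended asyncSeq body, so its in-flight chunk state (g, members,
// currentMaybe) can be touched from both workflows at once.
let sharedEnumeratorHazard (chunks: AsyncSeq<'Key * ResizeArray<'T>>) =
    let e = chunks.GetEnumerator()          // one enumerator...
    let consume = async {
        let! _ = e.MoveNext()               // ...driven from two places
        return () }
    Async.Parallel [ consume; consume ]     // interleaving here is the race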
