Skip to content

Commit

Permalink
add a streaming CID set
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k authored and Stebalien committed Aug 11, 2018
1 parent 99995a3 commit 4de263a
Showing 1 changed file with 35 additions and 0 deletions.
35 changes: 35 additions & 0 deletions set.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package cid

import (
"context"
)

// Set is a implementation of a set of Cids, that is, a structure
// to which holds a single copy of every Cids that is added to it.
type Set struct {
Expand Down Expand Up @@ -65,3 +69,34 @@ func (s *Set) ForEach(f func(c *Cid) error) error {
}
return nil
}

// StreamingSet is an extension of Set which allows to implement back-pressure
// for the Visit function
type StreamingSet struct {
Set *Set
New chan *Cid
}

// NewStreamingSet initializes and returns new Set.
func NewStreamingSet() *StreamingSet {
return &StreamingSet{
Set: NewSet(),
New: make(chan *Cid),
}
}

// Visitor creates new visitor which adds a Cids to the set and emits them to
// the set.New channel
func (s *StreamingSet) Visitor(ctx context.Context) func(c *Cid) bool {
return func(c *Cid) bool {
if s.Set.Visit(c) {
select {
case s.New <- c:
case <-ctx.Done():
}
return true
}

return false
}
}

0 comments on commit 4de263a

Please sign in to comment.