diff --git a/set.go b/set.go index b801ade..1096649 100644 --- a/set.go +++ b/set.go @@ -1,5 +1,9 @@ package cid +import ( + "context" +) + // Set is a implementation of a set of Cids, that is, a structure // to which holds a single copy of every Cids that is added to it. type Set struct { @@ -65,3 +69,34 @@ func (s *Set) ForEach(f func(c *Cid) error) error { } return nil } + +// StreamingSet is an extension of Set which allows to implement back-pressure +// for the Visit function +type StreamingSet struct { + Set *Set + New chan *Cid +} + +// NewStreamingSet initializes and returns new Set. +func NewStreamingSet() *StreamingSet { + return &StreamingSet{ + Set: NewSet(), + New: make(chan *Cid), + } +} + +// Visitor creates new visitor which adds a Cids to the set and emits them to +// the set.New channel +func (s *StreamingSet) Visitor(ctx context.Context) func(c *Cid) bool { + return func(c *Cid) bool { + if s.Set.Visit(c) { + select { + case s.New <- c: + case <-ctx.Done(): + } + return true + } + + return false + } +}