diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index 018c417bca1..bc57fe40173 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -7,7 +7,6 @@ import ( coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - "github.com/ipfs/go-ipfs/thirdparty/streaming-cid-set" dag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag" routing "gx/ipfs/QmSD6bSPcXaaR7LpQHjytLWQD7DrCsb415CWfpbd9Szemb/go-libp2p-routing" @@ -99,7 +98,7 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er } func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error { - provided := streamingset.NewStreamingSet() + provided := cid.NewStreamingSet() errCh := make(chan error) go func() { diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index 27ed43c1498..a99f6a43303 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -5,7 +5,6 @@ import ( pin "github.com/ipfs/go-ipfs/pin" - "github.com/ipfs/go-ipfs/thirdparty/streaming-cid-set" merkledag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag" blocks "gx/ipfs/QmYBEfMSquSGnuxBthUoBJNs3F6p4VAPPvAgxq6XXGvTPh/go-ipfs-blockstore" cid "gx/ipfs/QmYjnkEL7i731PirfVH1sis89evN7jt4otSHw5D2xXXwUV/go-cid" @@ -44,8 +43,8 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) } } -func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*streamingset.StreamingSet, error) { - set := streamingset.NewStreamingSet() +func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*cid.StreamingSet, error) { + set := cid.NewStreamingSet() go func() { ctx, cancel := context.WithCancel(ctx) diff --git a/thirdparty/streaming-cid-set/set.go b/thirdparty/streaming-cid-set/set.go deleted file mode 100644 index 51f09443015..00000000000 --- a/thirdparty/streaming-cid-set/set.go +++ /dev/null @@ -1,38 +0,0 @@ -package streamingset - -import ( - "context" - - cid "gx/ipfs/QmYjnkEL7i731PirfVH1sis89evN7jt4otSHw5D2xXXwUV/go-cid" -) - -// StreamingSet is an extension of cid.Set which allows to implement back-pressure -// for the Visit function -type StreamingSet struct { - Set *cid.Set - New chan *cid.Cid -} - -// NewStreamingSet initializes and returns new Set. -func NewStreamingSet() *StreamingSet { - return &StreamingSet{ - Set: cid.NewSet(), - New: make(chan *cid.Cid), - } -} - -// Visitor creates new visitor which adds a Cids to the set and emits them to -// the set.New channel -func (s *StreamingSet) Visitor(ctx context.Context) func(c *cid.Cid) bool { - return func(c *cid.Cid) bool { - if s.Set.Visit(c) { - select { - case s.New <- c: - case <-ctx.Done(): - } - return true - } - - return false - } -}