Skip to content

Commit

Permalink
move streaming set to thirdparty
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k authored and Stebalien committed Aug 15, 2018
1 parent 0fe9ab9 commit 274f308
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 45 deletions.
20 changes: 7 additions & 13 deletions core/coreapi/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,20 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error {
provided := cid.NewSet()
for _, c := range cids {
kset := cid.NewSet()

dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))

err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, provided.Visit)
if err != nil {
return err
}
}

for _, k := range kset.Keys() {
if provided.Has(k) {
continue
}

err = r.Provide(ctx, k, true)
if err != nil {
return err
}
provided.Add(k)
for _, k := range provided.Keys() {
err := r.Provide(ctx, k, true)
if err != nil {
return err
}
provided.Add(k)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/interface/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// DhtAPI specifies the interface to the DHT
// Note: This API will likely get renamed in near future, see
// Note: This API will likely get deprecated in near future, see
// https://github.com/ipfs/interface-ipfs-core/issues/249 for more context.
type DhtAPI interface {
// FindPeer queries the DHT for all of the multiaddresses associated with a
Expand Down
41 changes: 10 additions & 31 deletions exchange/reprovide/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

pin "github.com/ipfs/go-ipfs/pin"

"github.com/ipfs/go-ipfs/thirdparty/streaming-cid-set"
merkledag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag"
blocks "gx/ipfs/QmYBEfMSquSGnuxBthUoBJNs3F6p4VAPPvAgxq6XXGvTPh/go-ipfs-blockstore"
cid "gx/ipfs/QmYjnkEL7i731PirfVH1sis89evN7jt4otSHw5D2xXXwUV/go-cid"
Expand All @@ -29,7 +30,7 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool)
outCh := make(chan *cid.Cid)
go func() {
defer close(outCh)
for c := range set.new {
for c := range set.New {
select {
case <-ctx.Done():
return
Expand All @@ -43,21 +44,23 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool)
}
}

func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*streamingSet, error) {
set := newStreamingSet()
func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*streamingset.StreamingSet, error) {
set := streamingset.NewStreamingSet()

go func() {
defer close(set.new)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(set.New)

for _, key := range pinning.DirectKeys() {
set.add(key)
set.Visitor(ctx)(key)
}

for _, key := range pinning.RecursiveKeys() {
set.add(key)
set.Visitor(ctx)(key)

if !onlyRoots {
err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.add)
err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.Visitor(ctx))
if err != nil {
log.Errorf("reprovide indirect pins: %s", err)
return
Expand All @@ -68,27 +71,3 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRo

return set, nil
}

type streamingSet struct {
set *cid.Set
new chan *cid.Cid
}

// NewSet initializes and returns a new Set.
func newStreamingSet() *streamingSet {
return &streamingSet{
set: cid.NewSet(),
new: make(chan *cid.Cid),
}
}

// add adds a Cid to the set only if it is
// not in it already.
func (s *streamingSet) add(c *cid.Cid) bool {
if s.set.Visit(c) {
s.new <- c
return true
}

return false
}
38 changes: 38 additions & 0 deletions thirdparty/streaming-cid-set/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package streamingset

import (
"context"

cid "gx/ipfs/QmYjnkEL7i731PirfVH1sis89evN7jt4otSHw5D2xXXwUV/go-cid"
)

// StreamingSet is an extension of cid.Set which allows to implement back-pressure
// for the Visit function
type StreamingSet struct {
Set *cid.Set
New chan *cid.Cid
}

// NewStreamingSet initializes and returns new Set.
func NewStreamingSet() *StreamingSet {
return &StreamingSet{
Set: cid.NewSet(),
New: make(chan *cid.Cid),
}
}

// Visitor creates new visitor which adds a Cids to the set and emits them to
// the set.New channel
func (s *StreamingSet) Visitor(ctx context.Context) func(c *cid.Cid) bool {
return func(c *cid.Cid) bool {
if s.Set.Visit(c) {
select {
case s.New <- c:
case <-ctx.Done():
}
return true
}

return false
}
}

0 comments on commit 274f308

Please sign in to comment.