Skip to content

Commit

Permalink
providers: ProvideRecursive
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed Jun 25, 2018
1 parent 25ceb80 commit 61d2e94
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 9 deletions.
3 changes: 3 additions & 0 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"
resolver "github.com/ipfs/go-ipfs/path/resolver"
pin "github.com/ipfs/go-ipfs/pin"
providers "github.com/ipfs/go-ipfs/providers"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
"github.com/ipfs/go-ipfs/thirdparty/verifbs"
Expand Down Expand Up @@ -237,7 +238,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}
} else {
n.SetupOfflineRouting()
n.Exchange = offline.Exchange(n.Blockstore)
n.Providers = providers.NewProviders(n.ctx, n.Routing, nil)
}

n.Blocks = bserv.New(n.Blockstore, n.Exchange)
Expand Down
8 changes: 7 additions & 1 deletion core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ You can now check what blocks have been created by:
}

// copy intermediary nodes from editor to our actual dagservice
_, err := fileAdder.Finalize()
nd, err := fileAdder.Finalize()
if err != nil {
return err
}
Expand All @@ -322,6 +322,12 @@ You can now check what blocks have been created by:
return nil
}

if !local {
if err := n.Providers.ProvideRecursive(req.Context, nd, dserv); err != nil {
return err
}
}

return fileAdder.PinRoot()
}

Expand Down
41 changes: 35 additions & 6 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@ import (
"context"
"time"

host "gx/ipfs/QmQQGtcp6nVUrQjNsnU53YWV1q8fK1Kd9S7FEkYbRZzxry/go-libp2p-host"
flags "gx/ipfs/QmRMGdC6HKdLsPDABL9aXPDidrpmEHzJqFWSvshkbn9Hj8/go-ipfs-flags"
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log"
routing "gx/ipfs/QmUV9hDAAyjeGbxbXkJ2sYqZ6dTd1DXJ2REhYEkRm178Tg/go-libp2p-routing"
peer "gx/ipfs/QmVf8hTAsLLFtn4WPCRNdnaF2Eag2qTBS6uR8AiHPZARXy/go-libp2p-peer"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
pstore "gx/ipfs/QmZhsmorLpD9kmQ4ynbAu4vbKv2goMUnXazwGA4gnWHDjB/go-libp2p-peerstore"
host "gx/ipfs/QmQQGtcp6nVUrQjNsnU53YWV1q8fK1Kd9S7FEkYbRZzxry/go-libp2p-host"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log"
)

const (
provideTimeout = time.Second * 15

// maxProvidersPerRequest specifies the maximum number of providers desired
// MaxProvidersPerRequest specifies the maximum number of providers desired
// from the network. This value is specified because the network streams
// results.
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
Expand Down Expand Up @@ -46,8 +47,10 @@ type blockRequest struct {

// Interface is an definition of providers interface to libp2p routing system
type Interface interface {
Provide(*cid.Cid) error
FindProviders(ctx context.Context, c *cid.Cid) error
Provide(k *cid.Cid) error
ProvideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) error

FindProviders(ctx context.Context, k *cid.Cid) error
FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID

Stat() (*Stat, error)
Expand Down Expand Up @@ -119,6 +122,29 @@ func (p *providers) Provide(b *cid.Cid) error {
return nil
}

func (p *providers) provideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter, done *cid.Set) error {
p.Provide(n.Cid())

for _, l := range n.Links() {
if !done.Visit(l.Cid) {
continue
}

sub, err := l.GetNode(ctx, serv)
if err != nil {
return err
}
if err := p.provideRecursive(ctx, sub, serv, done); err != nil {
return err
}
}
return nil
}

func (p *providers) ProvideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) error {
return p.provideRecursive(ctx, n, serv, cid.NewSet())
}

func (p *providers) FindProviders(ctx context.Context, c *cid.Cid) error {
select {
case <-ctx.Done():
Expand All @@ -130,6 +156,9 @@ func (p *providers) FindProviders(ctx context.Context, c *cid.Cid) error {

// FindProvidersAsync returns a channel of providers for the given key
func (p *providers) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID {
if p.host == nil {
return nil
}

// Since routing queries are expensive, give bitswap the peers to which we
// have open connections. Note that this may cause issues if bitswap starts
Expand Down
11 changes: 9 additions & 2 deletions providers/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@ import (

process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log"
peer "gx/ipfs/QmVf8hTAsLLFtn4WPCRNdnaF2Eag2qTBS6uR8AiHPZARXy/go-libp2p-peer"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
pstore "gx/ipfs/QmZhsmorLpD9kmQ4ynbAu4vbKv2goMUnXazwGA4gnWHDjB/go-libp2p-peerstore"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log"
)

func (p *providers) startWorkers(ctx context.Context, px process.Process) {
if p.host != nil {
// Start up a worker to handle block requests this node is making
px.Go(func(px process.Process) {
p.providerQueryManager(ctx)
})
}

// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
p.provideCollector(ctx)
Expand Down

0 comments on commit 61d2e94

Please sign in to comment.