diff --git a/core/builder.go b/core/builder.go index 4de4595cd02..4a04551c35b 100644 --- a/core/builder.go +++ b/core/builder.go @@ -14,6 +14,7 @@ import ( dag "github.com/ipfs/go-ipfs/merkledag" resolver "github.com/ipfs/go-ipfs/path/resolver" pin "github.com/ipfs/go-ipfs/pin" + providers "github.com/ipfs/go-ipfs/providers" repo "github.com/ipfs/go-ipfs/repo" cfg "github.com/ipfs/go-ipfs/repo/config" "github.com/ipfs/go-ipfs/thirdparty/verifbs" @@ -237,7 +238,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { return err } } else { + n.SetupOfflineRouting() n.Exchange = offline.Exchange(n.Blockstore) + n.Providers = providers.NewProviders(n.ctx, n.Routing, nil) } n.Blocks = bserv.New(n.Blockstore, n.Exchange) diff --git a/core/commands/add.go b/core/commands/add.go index 1fee487934b..3d19d0ed84b 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -313,7 +313,7 @@ You can now check what blocks have been created by: } // copy intermediary nodes from editor to our actual dagservice - _, err := fileAdder.Finalize() + nd, err := fileAdder.Finalize() if err != nil { return err } @@ -322,6 +322,12 @@ You can now check what blocks have been created by: return nil } + if !local { + if err := n.Providers.ProvideRecursive(req.Context, nd, dserv); err != nil { + return err + } + } + return fileAdder.PinRoot() } diff --git a/providers/providers.go b/providers/providers.go index d84f57653cc..b79021bfa44 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -4,21 +4,22 @@ import ( "context" "time" + host "gx/ipfs/QmQQGtcp6nVUrQjNsnU53YWV1q8fK1Kd9S7FEkYbRZzxry/go-libp2p-host" flags "gx/ipfs/QmRMGdC6HKdLsPDABL9aXPDidrpmEHzJqFWSvshkbn9Hj8/go-ipfs-flags" process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" - logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log" routing "gx/ipfs/QmUV9hDAAyjeGbxbXkJ2sYqZ6dTd1DXJ2REhYEkRm178Tg/go-libp2p-routing" peer "gx/ipfs/QmVf8hTAsLLFtn4WPCRNdnaF2Eag2qTBS6uR8AiHPZARXy/go-libp2p-peer" - cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" + ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format" pstore "gx/ipfs/QmZhsmorLpD9kmQ4ynbAu4vbKv2goMUnXazwGA4gnWHDjB/go-libp2p-peerstore" - host "gx/ipfs/QmQQGtcp6nVUrQjNsnU53YWV1q8fK1Kd9S7FEkYbRZzxry/go-libp2p-host" + cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" + logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log" ) const ( provideTimeout = time.Second * 15 - // maxProvidersPerRequest specifies the maximum number of providers desired + // MaxProvidersPerRequest specifies the maximum number of providers desired // from the network. This value is specified because the network streams // results. // TODO: if a 'non-nice' strategy is implemented, consider increasing this value @@ -46,8 +47,10 @@ type blockRequest struct { // Interface is an definition of providers interface to libp2p routing system type Interface interface { - Provide(*cid.Cid) error - FindProviders(ctx context.Context, c *cid.Cid) error + Provide(k *cid.Cid) error + ProvideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) error + + FindProviders(ctx context.Context, k *cid.Cid) error FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID Stat() (*Stat, error) @@ -119,6 +122,29 @@ func (p *providers) Provide(b *cid.Cid) error { return nil } +func (p *providers) provideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter, done *cid.Set) error { + p.Provide(n.Cid()) + + for _, l := range n.Links() { + if !done.Visit(l.Cid) { + continue + } + + sub, err := l.GetNode(ctx, serv) + if err != nil { + return err + } + if err := p.provideRecursive(ctx, sub, serv, done); err != nil { + return err + } + } + return nil +} + +func (p *providers) ProvideRecursive(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) error { + return p.provideRecursive(ctx, n, serv, cid.NewSet()) +} + func (p *providers) FindProviders(ctx context.Context, c *cid.Cid) error { select { case <-ctx.Done(): @@ -130,6 +156,9 @@ func (p *providers) FindProviders(ctx context.Context, c *cid.Cid) error { // FindProvidersAsync returns a channel of providers for the given key func (p *providers) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID { + if p.host == nil { + return nil + } // Since routing queries are expensive, give bitswap the peers to which we // have open connections. Note that this may cause issues if bitswap starts diff --git a/providers/workers.go b/providers/workers.go index 90b425e0864..a421bf5ebcd 100644 --- a/providers/workers.go +++ b/providers/workers.go @@ -6,13 +6,20 @@ import ( process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" - logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log" peer "gx/ipfs/QmVf8hTAsLLFtn4WPCRNdnaF2Eag2qTBS6uR8AiHPZARXy/go-libp2p-peer" - cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" pstore "gx/ipfs/QmZhsmorLpD9kmQ4ynbAu4vbKv2goMUnXazwGA4gnWHDjB/go-libp2p-peerstore" + cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" + logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log" ) func (p *providers) startWorkers(ctx context.Context, px process.Process) { + if p.host != nil { + // Start up a worker to handle block requests this node is making + px.Go(func(px process.Process) { + p.providerQueryManager(ctx) + }) + } + // Start up a worker to manage sending out provides messages px.Go(func(px process.Process) { p.provideCollector(ctx)