From cb92db323b42eda517adb10c7cd7839f44deeb50 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Wed, 17 Apr 2019 10:08:08 -0700 Subject: [PATCH] Introduce first strategic provider: do nothing License: MIT Signed-off-by: Michael Avila --- core/builder.go | 5 +- core/commands/bitswap.go | 2 +- core/core.go | 4 +- core/coreapi/coreapi.go | 2 +- core/node/core.go | 4 +- core/node/groups.go | 58 +---- core/node/provider.go | 115 +++++++-- go.mod | 4 +- go.sum | 8 + provider/offline.go | 20 +- provider/provider.go | 70 +----- provider/{ => queue}/queue.go | 5 +- provider/{ => queue}/queue_test.go | 5 +- provider/simple/provider.go | 72 ++++++ provider/{ => simple}/provider_test.go | 14 +- provider/simple/reprovide.go | 225 ++++++++++++++++++ .../simple}/reprovide_test.go | 11 +- provider/system.go | 47 ++++ reprovide/providers.go | 75 ------ reprovide/reprovide.go | 159 ------------- test/sharness/lib/iptb-lib.sh | 21 ++ test/sharness/t0175-provider.sh | 34 +++ test/sharness/t0175-reprovider.sh | 21 -- test/sharness/t0175-strategic-provider.sh | 34 +++ 24 files changed, 609 insertions(+), 406 deletions(-) rename provider/{ => queue}/queue.go (98%) rename provider/{ => queue}/queue_test.go (96%) create mode 100644 provider/simple/provider.go rename provider/{ => simple}/provider_test.go (86%) create mode 100644 provider/simple/reprovide.go rename {reprovide => provider/simple}/reprovide_test.go (85%) create mode 100644 provider/system.go delete mode 100644 reprovide/providers.go delete mode 100644 reprovide/reprovide.go create mode 100755 test/sharness/t0175-provider.sh create mode 100755 test/sharness/t0175-strategic-provider.sh diff --git a/core/builder.go b/core/builder.go index 60cf2cb2336..e40adc5c429 100644 --- a/core/builder.go +++ b/core/builder.go @@ -3,11 +3,10 @@ package core import ( "context" - "github.com/ipfs/go-metrics-interface" - "go.uber.org/fx" - "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/core/node" + "github.com/ipfs/go-metrics-interface" + "go.uber.org/fx" ) type BuildCfg = node.BuildCfg // Alias for compatibility until we properly refactor the constructor interface diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index f322f4d90f1..1dd7a534ca0 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -234,7 +234,7 @@ Trigger reprovider to announce our data to network. return ErrNotOnline } - err = nd.Reprovider.Trigger(req.Context) + err = nd.Provider.Reprovide(req.Context) if err != nil { return err } diff --git a/core/core.go b/core/core.go index d7ededb8224..c50104ba61b 100644 --- a/core/core.go +++ b/core/core.go @@ -27,7 +27,6 @@ import ( "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/repo" - rp "github.com/ipfs/go-ipfs/reprovide" bserv "github.com/ipfs/go-blockservice" bstore "github.com/ipfs/go-ipfs-blockstore" @@ -94,8 +93,7 @@ type IpfsNode struct { Routing routing.IpfsRouting `optional:"true"` // the routing system. recommend ipfs-dht Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes - Provider provider.Provider // the value provider system - Reprovider *rp.Reprovider `optional:"true"` // the value reprovider system + Provider provider.System // the value provider system IpnsRepub *ipnsrp.Republisher `optional:"true"` AutoNAT *autonat.AutoNATService `optional:"true"` diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index eaf870ec808..dbbeba1fecd 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -68,7 +68,7 @@ type CoreAPI struct { namesys namesys.NameSystem routing routing.IpfsRouting - provider provider.Provider + provider provider.System pubSub *pubsub.PubSub diff --git a/core/node/core.go b/core/node/core.go index d7f3ba97ea8..8283447d2bb 100644 --- a/core/node/core.go +++ b/core/node/core.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/ipfs/go-ipfs-config" "github.com/ipfs/go-ipfs/core/node/helpers" "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/repo" @@ -59,8 +60,9 @@ func Dag(bs blockservice.BlockService) format.DAGService { } // OnlineExchange creates new LibP2P backed block exchange (BitSwap) -func OnlineExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.IpfsRouting, bs blockstore.GCBlockstore) exchange.Interface { +func OnlineExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host, rt routing.IpfsRouting, bs blockstore.GCBlockstore) exchange.Interface { bitswapNetwork := network.NewFromIpfsHost(host, rt) + bitswap.ProvideEnabled = !cfg.Experimental.StrategicProviding exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs) lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { diff --git a/core/node/groups.go b/core/node/groups.go index 4473f79e83d..2394d38e589 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -14,12 +14,10 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/ipfs/go-ipfs/core/node/libp2p" - "github.com/ipfs/go-ipfs/p2p" - "github.com/ipfs/go-ipfs/provider" - "github.com/ipfs/go-ipfs/reprovide" offline "github.com/ipfs/go-ipfs-exchange-offline" offroute "github.com/ipfs/go-ipfs-routing/offline" + "github.com/ipfs/go-ipfs/p2p" "github.com/ipfs/go-path/resolver" uio "github.com/ipfs/go-unixfs/io" "go.uber.org/fx" @@ -188,42 +186,6 @@ var IPNS = fx.Options( fx.Provide(RecordValidator), ) -// Providers groups units managing provider routing records -func Providers(cfg *config.Config) fx.Option { - reproviderInterval := kReprovideFrequency - if cfg.Reprovider.Interval != "" { - dur, err := time.ParseDuration(cfg.Reprovider.Interval) - if err != nil { - return fx.Error(err) - } - - reproviderInterval = dur - } - - var keyProvider fx.Option - switch cfg.Reprovider.Strategy { - case "all": - fallthrough - case "": - keyProvider = fx.Provide(reprovide.NewBlockstoreProvider) - case "roots": - keyProvider = fx.Provide(reprovide.NewPinnedProvider(true)) - case "pinned": - keyProvider = fx.Provide(reprovide.NewPinnedProvider(false)) - default: - return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)) - } - - return fx.Options( - fx.Provide(ProviderQueue), - fx.Provide(ProviderCtor), - fx.Provide(ReproviderCtor(reproviderInterval)), - keyProvider, - - fx.Invoke(Reprovider), - ) -} - // Online groups online-only units func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option { @@ -272,17 +234,19 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option { fx.Provide(p2p.New), LibP2P(bcfg, cfg), - Providers(cfg), + OnlineProviders(cfg), ) } // Offline groups offline alternatives to Online units -var Offline = fx.Options( - fx.Provide(offline.Exchange), - fx.Provide(Namesys(0)), - fx.Provide(offroute.NewOfflineRouter), - fx.Provide(provider.NewOfflineProvider), -) +func Offline(cfg *config.Config) fx.Option { + return fx.Options( + fx.Provide(offline.Exchange), + fx.Provide(Namesys(0)), + fx.Provide(offroute.NewOfflineRouter), + OfflineProviders(cfg), + ) +} // Core groups basic IPFS services var Core = fx.Options( @@ -297,7 +261,7 @@ func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option { if bcfg.Online { return Online(bcfg, cfg) } - return Offline + return Offline(cfg) } // IPFS builds a group of fx Options based on the passed BuildCfg diff --git a/core/node/provider.go b/core/node/provider.go index e85a909141d..a5c62baa68b 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -2,50 +2,129 @@ package node import ( "context" + "fmt" "time" - "github.com/libp2p/go-libp2p-routing" "go.uber.org/fx" + "github.com/ipfs/go-ipfs-config" "github.com/ipfs/go-ipfs/core/node/helpers" "github.com/ipfs/go-ipfs/provider" + q "github.com/ipfs/go-ipfs/provider/queue" + "github.com/ipfs/go-ipfs/provider/simple" "github.com/ipfs/go-ipfs/repo" - "github.com/ipfs/go-ipfs/reprovide" + "github.com/libp2p/go-libp2p-routing" ) const kReprovideFrequency = time.Hour * 12 +// SIMPLE + // ProviderQueue creates new datastore backed provider queue -func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) { - return provider.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore()) +func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*q.Queue, error) { + return q.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore()) } -// ProviderCtor creates new record provider -func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider { - p := provider.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt) +// SimpleProviderCtor creates new record provider +func SimpleProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt routing.IpfsRouting) provider.Provider { + return simple.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt) +} + +// SimpleReproviderCtor creates new reprovider +func SimpleReproviderCtor(reproviderInterval time.Duration) func(helpers.MetricsCtx, fx.Lifecycle, routing.IpfsRouting, simple.KeyChanFunc) (provider.Reprovider, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.IpfsRouting, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) { + return simple.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil + } +} +// SimpleProviderSysCtor creates new provider system +func SimpleProviderSysCtor(lc fx.Lifecycle, p provider.Provider, r provider.Reprovider) provider.System { + sys := provider.NewSystem(p, r) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - p.Run() + sys.Run() return nil }, OnStop: func(ctx context.Context) error { - return p.Close() + return sys.Close() }, }) + return sys +} + +// SimpleOfflineProviderSysCtor creates a new offline provider system +func SimpleOfflineProviderSysCtor(p provider.Provider, r provider.Reprovider) provider.System { + return provider.NewSystem(p, r) +} + +// STRATEGIC + +// StrategicProviderSysCtor creates new provider system +func StrategicProviderSysCtor() provider.System { + return provider.NewOfflineProvider() +} + +// StrategicOfflineProviderSysCtor creates a new offline provider system +func StrategicOfflineProviderSysCtor() provider.System { + return provider.NewOfflineProvider() +} + +// ONLINE/OFFLINE + +// OnlineProviders groups units managing provider routing records online +func OnlineProviders(cfg *config.Config) fx.Option { + if cfg.Experimental.StrategicProviding { + return fx.Provide(StrategicProviderSysCtor) + } - return p + return fx.Options( + SimpleProviders(cfg), + fx.Provide(SimpleProviderSysCtor), + ) } -// ReproviderCtor creates new reprovider -func ReproviderCtor(reproviderInterval time.Duration) func(helpers.MetricsCtx, fx.Lifecycle, routing.IpfsRouting, reprovide.KeyChanFunc) (*reprovide.Reprovider, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.IpfsRouting, keyProvider reprovide.KeyChanFunc) (*reprovide.Reprovider, error) { - return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil +// OfflineProviders groups units managing provider routing records offline +func OfflineProviders(cfg *config.Config) fx.Option { + if cfg.Experimental.StrategicProviding { + return fx.Provide(StrategicOfflineProviderSysCtor) } + + return fx.Options( + SimpleProviders(cfg), + fx.Provide(SimpleOfflineProviderSysCtor), + ) } -// Reprovider runs the reprovider service -func Reprovider(lp lcProcess, reprovider *reprovide.Reprovider) error { - lp.Append(reprovider.Run) - return nil +// SimpleProviders creates the simple provider/reprovider dependencies +func SimpleProviders(cfg *config.Config) fx.Option { + reproviderInterval := kReprovideFrequency + if cfg.Reprovider.Interval != "" { + dur, err := time.ParseDuration(cfg.Reprovider.Interval) + if err != nil { + return fx.Error(err) + } + + reproviderInterval = dur + } + + var keyProvider fx.Option + switch cfg.Reprovider.Strategy { + case "all": + fallthrough + case "": + keyProvider = fx.Provide(simple.NewBlockstoreProvider) + case "roots": + keyProvider = fx.Provide(simple.NewPinnedProvider(true)) + case "pinned": + keyProvider = fx.Provide(simple.NewPinnedProvider(false)) + default: + return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)) + } + + return fx.Options( + fx.Provide(ProviderQueue), + fx.Provide(SimpleProviderCtor), + keyProvider, + fx.Provide(SimpleReproviderCtor(reproviderInterval)), + ) } diff --git a/go.mod b/go.mod index ef738a66a72..0163b41e7b4 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/hashicorp/golang-lru v0.5.1 github.com/hsanjuan/go-libp2p-http v0.0.2 github.com/ipfs/dir-index-html v1.0.3 - github.com/ipfs/go-bitswap v0.0.4 + github.com/ipfs/go-bitswap v0.0.5 github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.0.3 github.com/ipfs/go-cid v0.0.1 @@ -34,7 +34,7 @@ require ( github.com/ipfs/go-ipfs-chunker v0.0.1 github.com/ipfs/go-ipfs-cmdkit v0.0.1 github.com/ipfs/go-ipfs-cmds v0.0.5 - github.com/ipfs/go-ipfs-config v0.0.3 + github.com/ipfs/go-ipfs-config v0.0.4-0.20190502164316-b06d585017ae github.com/ipfs/go-ipfs-ds-help v0.0.1 github.com/ipfs/go-ipfs-exchange-interface v0.0.1 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 diff --git a/go.sum b/go.sum index f34b89bf48d..36eecf365c5 100644 --- a/go.sum +++ b/go.sum @@ -138,6 +138,10 @@ github.com/ipfs/go-bitswap v0.0.3 h1:uFcSI9dkjUn67S7IM60vr2wA27aAvn8o9xYjaQCug3o github.com/ipfs/go-bitswap v0.0.3/go.mod h1:jadAZYsP/tcRMl47ZhFxhaNuDQoXawT8iHMg+iFoQbg= github.com/ipfs/go-bitswap v0.0.4 h1:mrS8jBd+rCgKw7Owx4RM5QBiMi9DBc1Ih9FaEBYM4/M= github.com/ipfs/go-bitswap v0.0.4/go.mod h1:jadAZYsP/tcRMl47ZhFxhaNuDQoXawT8iHMg+iFoQbg= +github.com/ipfs/go-bitswap v0.0.5-0.20190430213314-d1f829bed810 h1:bZlGYwSXQ4wiewl39r2w/3GjSE/CwzKoiXVb4UeQ6hE= +github.com/ipfs/go-bitswap v0.0.5-0.20190430213314-d1f829bed810/go.mod h1:fcVV/eiSBkhfkqOf+v0WjA+fZVYJV6NKZRgue2B6b34= +github.com/ipfs/go-bitswap v0.0.5 h1:ccnSWMn5CwWH/8zU7UZugFeJ/ZWH2AzexKmozx+7BG8= +github.com/ipfs/go-bitswap v0.0.5/go.mod h1:fcVV/eiSBkhfkqOf+v0WjA+fZVYJV6NKZRgue2B6b34= github.com/ipfs/go-block-format v0.0.1/go.mod h1:DK/YYcsSUIVAFNwo/KZCdIIbpN0ROH/baNLgayt4pFc= github.com/ipfs/go-block-format v0.0.2 h1:qPDvcP19izTjU8rgo6p7gTXZlkMkF5bz5G3fqIsSCPE= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= @@ -184,8 +188,12 @@ github.com/ipfs/go-ipfs-cmds v0.0.5 h1:+blTEnA0MzkQO86WnpfGnchdojrY5wJLhsbby3/JX github.com/ipfs/go-ipfs-cmds v0.0.5/go.mod h1:1QVgxSgenZvOMGVC/XUTC7tJxRBGPLxYvpgPpCi3DUk= github.com/ipfs/go-ipfs-config v0.0.1 h1:6ED08emzI1imdsAjixFi2pEyZxTVD5ECKtCOxLBx+Uc= github.com/ipfs/go-ipfs-config v0.0.1/go.mod h1:KDbHjNyg4e6LLQSQpkgQMBz6Jf4LXiWAcmnkcwmH0DU= +github.com/ipfs/go-ipfs-config v0.0.3-0.20190422204848-4aaf5baeceef h1:KEFD4DNaAuvrsN5+38l1e+yjomoegKufUtOJbKcN40k= +github.com/ipfs/go-ipfs-config v0.0.3-0.20190422204848-4aaf5baeceef/go.mod h1:KDbHjNyg4e6LLQSQpkgQMBz6Jf4LXiWAcmnkcwmH0DU= github.com/ipfs/go-ipfs-config v0.0.3 h1:Ep4tRdP1iVK76BgOprD9B/qtOEdpno+1Xb57BqydgGk= github.com/ipfs/go-ipfs-config v0.0.3/go.mod h1:KDbHjNyg4e6LLQSQpkgQMBz6Jf4LXiWAcmnkcwmH0DU= +github.com/ipfs/go-ipfs-config v0.0.4-0.20190502164316-b06d585017ae h1:2lzUW0VhlY+kUQCFjtBAQoFLNqS1cfWQCUmpbgNGRDI= +github.com/ipfs/go-ipfs-config v0.0.4-0.20190502164316-b06d585017ae/go.mod h1:KDbHjNyg4e6LLQSQpkgQMBz6Jf4LXiWAcmnkcwmH0DU= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= diff --git a/provider/offline.go b/provider/offline.go index 0c91ed2af77..eb1d1b9acda 100644 --- a/provider/offline.go +++ b/provider/offline.go @@ -1,20 +1,28 @@ package provider -import "github.com/ipfs/go-cid" +import ( + "context" + "github.com/ipfs/go-cid" +) type offlineProvider struct{} -// NewOfflineProvider creates a Provider that does nothing -func NewOfflineProvider() Provider { +// NewOfflineProvider creates a ProviderSystem that does nothing +func NewOfflineProvider() System { return &offlineProvider{} } -func (op *offlineProvider) Run() {} +func (op *offlineProvider) Run() { +} -func (op *offlineProvider) Provide(cid cid.Cid) error { +func (op *offlineProvider) Close() error { return nil } -func (op *offlineProvider) Close() error { +func (op *offlineProvider) Provide(_ cid.Cid) error { + return nil +} + +func (op *offlineProvider) Reprovide(_ context.Context) error { return nil } diff --git a/provider/provider.go b/provider/provider.go index 67c5c6b6b92..e8939ba6f7f 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -1,18 +1,18 @@ -// Package provider implements structures and methods to provide blocks, -// keep track of which blocks are provided, and to allow those blocks to -// be reprovided. package provider import ( "context" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-routing" ) -var log = logging.Logger("provider") +var ( + // StrategicProvidingEnabled toggles between the original providing mechanism + // and the new strategic providing system + StrategicProvidingEnabled = false -const provideOutgoingWorkerLimit = 8 + log = logging.Logger("provider") +) // Provider announces blocks to the network type Provider interface { @@ -24,56 +24,10 @@ type Provider interface { Close() error } -type provider struct { - ctx context.Context - // the CIDs for which provide announcements should be made - queue *Queue - // used to announce providing to the network - contentRouting routing.ContentRouting -} - -// NewProvider creates a provider that announces blocks to the network using a content router -func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider { - return &provider{ - ctx: ctx, - queue: queue, - contentRouting: contentRouting, - } -} - -// Close stops the provider -func (p *provider) Close() error { - p.queue.Close() - return nil -} - -// Start workers to handle provide requests. -func (p *provider) Run() { - p.handleAnnouncements() -} - -// Provide the given cid using specified strategy. -func (p *provider) Provide(root cid.Cid) error { - p.queue.Enqueue(root) - return nil -} - -// Handle all outgoing cids by providing (announcing) them -func (p *provider) handleAnnouncements() { - for workers := 0; workers < provideOutgoingWorkerLimit; workers++ { - go func() { - for p.ctx.Err() == nil { - select { - case <-p.ctx.Done(): - return - case c := <-p.queue.Dequeue(): - log.Info("announce - start - ", c) - if err := p.contentRouting.Provide(p.ctx, c, true); err != nil { - log.Warningf("Unable to provide entry: %s, %s", c, err) - } - log.Info("announce - end - ", c) - } - } - }() - } +// Reprovider reannounces blocks to the network +type Reprovider interface { + // Run is used to begin processing the reprovider work and waiting for reprovide triggers + Run() + // Trigger a reprovide + Trigger(context.Context) error } diff --git a/provider/queue.go b/provider/queue/queue.go similarity index 98% rename from provider/queue.go rename to provider/queue/queue.go index 8fdfca81521..2afbc81ee9b 100644 --- a/provider/queue.go +++ b/provider/queue/queue.go @@ -1,4 +1,4 @@ -package provider +package queue import ( "context" @@ -10,8 +10,11 @@ import ( datastore "github.com/ipfs/go-datastore" namespace "github.com/ipfs/go-datastore/namespace" query "github.com/ipfs/go-datastore/query" + logging "github.com/ipfs/go-log" ) +var log = logging.Logger("provider.queue") + // Queue provides a durable, FIFO interface to the datastore for storing cids // // Durability just means that cids in the process of being provided when a diff --git a/provider/queue_test.go b/provider/queue/queue_test.go similarity index 96% rename from provider/queue_test.go rename to provider/queue/queue_test.go index e151478d9f3..c8fb8682e31 100644 --- a/provider/queue_test.go +++ b/provider/queue/queue_test.go @@ -1,4 +1,4 @@ -package provider +package queue import ( "context" @@ -8,8 +8,11 @@ import ( cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" sync "github.com/ipfs/go-datastore/sync" + "github.com/ipfs/go-ipfs-blocksutil" ) +var blockGenerator = blocksutil.NewBlockGenerator() + func makeCids(n int) []cid.Cid { cids := make([]cid.Cid, 0, n) for i := 0; i < n; i++ { diff --git a/provider/simple/provider.go b/provider/simple/provider.go new file mode 100644 index 00000000000..8310cebb3a4 --- /dev/null +++ b/provider/simple/provider.go @@ -0,0 +1,72 @@ +// Package simple implements structures and methods to provide blocks, +// keep track of which blocks are provided, and to allow those blocks to +// be reprovided. +package simple + +import ( + "context" + + cid "github.com/ipfs/go-cid" + q "github.com/ipfs/go-ipfs/provider/queue" + logging "github.com/ipfs/go-log" + routing "github.com/libp2p/go-libp2p-routing" +) + +var logP = logging.Logger("provider.simple") + +const provideOutgoingWorkerLimit = 8 + +// Provider announces blocks to the network +type Provider struct { + ctx context.Context + // the CIDs for which provide announcements should be made + queue *q.Queue + // used to announce providing to the network + contentRouting routing.ContentRouting +} + +// NewProvider creates a provider that announces blocks to the network using a content router +func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting) *Provider { + return &Provider{ + ctx: ctx, + queue: queue, + contentRouting: contentRouting, + } +} + +// Close stops the provider +func (p *Provider) Close() error { + p.queue.Close() + return nil +} + +// Run workers to handle provide requests. +func (p *Provider) Run() { + p.handleAnnouncements() +} + +// Provide the given cid using specified strategy. +func (p *Provider) Provide(root cid.Cid) error { + p.queue.Enqueue(root) + return nil +} + +// Handle all outgoing cids by providing (announcing) them +func (p *Provider) handleAnnouncements() { + for workers := 0; workers < provideOutgoingWorkerLimit; workers++ { + go func() { + for p.ctx.Err() == nil { + select { + case <-p.ctx.Done(): + return + case c := <-p.queue.Dequeue(): + logP.Info("announce - start - ", c) + if err := p.contentRouting.Provide(p.ctx, c, true); err != nil { + logP.Warningf("Unable to provide entry: %s, %s", c, err) + } + logP.Info("announce - end - ", c) + } + } + }() + } +} diff --git a/provider/provider_test.go b/provider/simple/provider_test.go similarity index 86% rename from provider/provider_test.go rename to provider/simple/provider_test.go index 7ef007b03a7..6f70a41d722 100644 --- a/provider/provider_test.go +++ b/provider/simple/provider_test.go @@ -1,4 +1,4 @@ -package provider +package simple_test import ( "context" @@ -11,6 +11,10 @@ import ( sync "github.com/ipfs/go-datastore/sync" blocksutil "github.com/ipfs/go-ipfs-blocksutil" pstore "github.com/libp2p/go-libp2p-peerstore" + + q "github.com/ipfs/go-ipfs/provider/queue" + + . "github.com/ipfs/go-ipfs/provider/simple" ) var blockGenerator = blocksutil.NewBlockGenerator() @@ -39,15 +43,15 @@ func TestAnnouncement(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) + queue, err := q.NewQueue(ctx, "test", ds) if err != nil { t.Fatal(err) } r := mockContentRouting() - provider := NewProvider(ctx, queue, r) - provider.Run() + prov := NewProvider(ctx, queue, r) + prov.Run() cids := cid.NewSet() @@ -58,7 +62,7 @@ func TestAnnouncement(t *testing.T) { go func() { for _, c := range cids.Keys() { - err = provider.Provide(c) + err = prov.Provide(c) // A little goroutine stirring to exercise some different states r := rand.Intn(10) time.Sleep(time.Microsecond * time.Duration(r)) diff --git a/provider/simple/reprovide.go b/provider/simple/reprovide.go new file mode 100644 index 00000000000..73b733ce24a --- /dev/null +++ b/provider/simple/reprovide.go @@ -0,0 +1,225 @@ +package simple + +import ( + "context" + "fmt" + "time" + + backoff "github.com/cenkalti/backoff" + cid "github.com/ipfs/go-cid" + cidutil "github.com/ipfs/go-cidutil" + blocks "github.com/ipfs/go-ipfs-blockstore" + pin "github.com/ipfs/go-ipfs/pin" + ipld "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log" + merkledag "github.com/ipfs/go-merkledag" + verifcid "github.com/ipfs/go-verifcid" + routing "github.com/libp2p/go-libp2p-routing" +) + +var logR = logging.Logger("reprovider.simple") + +//KeyChanFunc is function streaming CIDs to pass to content routing +type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) +type doneFunc func(error) + +// Reprovider reannounces blocks to the network +type Reprovider struct { + ctx context.Context + trigger chan doneFunc + + // The routing system to provide values through + rsys routing.ContentRouting + + keyProvider KeyChanFunc + + tick time.Duration +} + +// NewReprovider creates new Reprovider instance. +func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { + return &Reprovider{ + ctx: ctx, + trigger: make(chan doneFunc), + + rsys: rsys, + keyProvider: keyProvider, + tick: reprovideIniterval, + } +} + +// Close the reprovider +func (rp *Reprovider) Close() error { + return nil +} + +// Run re-provides keys with 'tick' interval or when triggered +func (rp *Reprovider) Run() { + // dont reprovide immediately. + // may have just started the daemon and shutting it down immediately. + // probability( up another minute | uptime ) increases with uptime. + after := time.After(time.Minute) + var done doneFunc + for { + if rp.tick == 0 { + after = make(chan time.Time) + } + + select { + case <-rp.ctx.Done(): + return + case done = <-rp.trigger: + case <-after: + } + + //'mute' the trigger channel so when `ipfs bitswap reprovide` is called + //a 'reprovider is already running' error is returned + unmute := rp.muteTrigger() + + err := rp.Reprovide() + if err != nil { + logR.Debug(err) + } + + if done != nil { + done(err) + } + + unmute() + + after = time.After(rp.tick) + } +} + +// Reprovide registers all keys given by rp.keyProvider to libp2p content routing +func (rp *Reprovider) Reprovide() error { + keychan, err := rp.keyProvider(rp.ctx) + if err != nil { + return fmt.Errorf("failed to get key chan: %s", err) + } + for c := range keychan { + // hash security + if err := verifcid.ValidateCid(c); err != nil { + logR.Errorf("insecure hash in reprovider, %s (%s)", c, err) + continue + } + op := func() error { + err := rp.rsys.Provide(rp.ctx, c, true) + if err != nil { + logR.Debugf("Failed to provide key: %s", err) + } + return err + } + + // TODO: this backoff library does not respect our context, we should + // eventually work contexts into it. low priority. + err := backoff.Retry(op, backoff.NewExponentialBackOff()) + if err != nil { + logR.Debugf("Providing failed after number of retries: %s", err) + return err + } + } + return nil +} + +// Trigger starts reprovision process in rp.Run and waits for it +func (rp *Reprovider) Trigger(ctx context.Context) error { + progressCtx, done := context.WithCancel(ctx) + + var err error + df := func(e error) { + err = e + done() + } + + select { + case <-rp.ctx.Done(): + return context.Canceled + case <-ctx.Done(): + return context.Canceled + case rp.trigger <- df: + <-progressCtx.Done() + return err + } +} + +func (rp *Reprovider) muteTrigger() context.CancelFunc { + ctx, cf := context.WithCancel(rp.ctx) + go func() { + defer cf() + for { + select { + case <-ctx.Done(): + return + case done := <-rp.trigger: + done(fmt.Errorf("reprovider is already running")) + } + } + }() + + return cf +} + +// Strategies + +// NewBlockstoreProvider returns key provider using bstore.AllKeysChan +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + return bstore.AllKeysChan(ctx) + } +} + +// NewPinnedProvider returns provider supplying pinned keys +func NewPinnedProvider(onlyRoots bool) func(pin.Pinner, ipld.DAGService) KeyChanFunc { + return func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + set, err := pinSet(ctx, pinning, dag, onlyRoots) + if err != nil { + return nil, err + } + + outCh := make(chan cid.Cid) + go func() { + defer close(outCh) + for c := range set.New { + select { + case <-ctx.Done(): + return + case outCh <- c: + } + } + + }() + + return outCh, nil + } + } +} + +func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*cidutil.StreamingSet, error) { + set := cidutil.NewStreamingSet() + + go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(set.New) + + for _, key := range pinning.DirectKeys() { + set.Visitor(ctx)(key) + } + + for _, key := range pinning.RecursiveKeys() { + set.Visitor(ctx)(key) + + if !onlyRoots { + err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.Visitor(ctx)) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } + } + } + }() + + return set, nil +} diff --git a/reprovide/reprovide_test.go b/provider/simple/reprovide_test.go similarity index 85% rename from reprovide/reprovide_test.go rename to provider/simple/reprovide_test.go index b9e9738b440..17c626c5b9e 100644 --- a/reprovide/reprovide_test.go +++ b/provider/simple/reprovide_test.go @@ -1,16 +1,19 @@ -package reprovide +package simple_test import ( "context" "testing" + "time" blocks "github.com/ipfs/go-block-format" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" - blockstore "github.com/ipfs/go-ipfs-blockstore" + "github.com/ipfs/go-ipfs-blockstore" mock "github.com/ipfs/go-ipfs-routing/mock" pstore "github.com/libp2p/go-libp2p-peerstore" "github.com/libp2p/go-testutil" + + . "github.com/ipfs/go-ipfs/provider/simple" ) func TestReprovide(t *testing.T) { @@ -34,8 +37,8 @@ func TestReprovide(t *testing.T) { } keyProvider := NewBlockstoreProvider(bstore) - reprov := NewReprovider(ctx, 0, clA, keyProvider) - err = reprov.reprovide(ctx) + reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) + err = reprov.Reprovide() if err != nil { t.Fatal(err) } diff --git a/provider/system.go b/provider/system.go new file mode 100644 index 00000000000..6bc1d357cbc --- /dev/null +++ b/provider/system.go @@ -0,0 +1,47 @@ +package provider + +import ( + "context" + "github.com/ipfs/go-cid" +) + +// System defines the interface for interacting with the value +// provider system +type System interface { + Run() + Close() error + Provide(cid.Cid) error + Reprovide(context.Context) error +} + +type system struct { + provider Provider + reprovider Reprovider +} + +// NewSystem constructs a new provider system from a provider and reprovider +func NewSystem(provider Provider, reprovider Reprovider) System { + return &system{provider, reprovider} +} + +// Run the provider system by running the provider and reprovider +func (s *system) Run() { + go s.provider.Run() + go s.reprovider.Run() +} + +// Close the provider and reprovider +func (s *system) Close() error { + // TODO: Close reprovider here + return s.provider.Close() +} + +// Provide a value +func (s *system) Provide(cid cid.Cid) error { + return s.provider.Provide(cid) +} + +// Reprovide all the previously provided values +func (s *system) Reprovide(ctx context.Context) error { + return s.reprovider.Trigger(ctx) +} diff --git a/reprovide/providers.go b/reprovide/providers.go deleted file mode 100644 index bef56a0b720..00000000000 --- a/reprovide/providers.go +++ /dev/null @@ -1,75 +0,0 @@ -package reprovide - -import ( - "context" - - pin "github.com/ipfs/go-ipfs/pin" - - cid "github.com/ipfs/go-cid" - cidutil "github.com/ipfs/go-cidutil" - blocks "github.com/ipfs/go-ipfs-blockstore" - ipld "github.com/ipfs/go-ipld-format" - merkledag "github.com/ipfs/go-merkledag" -) - -// NewBlockstoreProvider returns key provider using bstore.AllKeysChan -func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - return bstore.AllKeysChan(ctx) - } -} - -// NewPinnedProvider returns provider supplying pinned keys -func NewPinnedProvider(onlyRoots bool) func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc { - return func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - set, err := pinSet(ctx, pinning, dag, onlyRoots) - if err != nil { - return nil, err - } - - outCh := make(chan cid.Cid) - go func() { - defer close(outCh) - for c := range set.New { - select { - case <-ctx.Done(): - return - case outCh <- c: - } - } - - }() - - return outCh, nil - } - } -} - -func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*cidutil.StreamingSet, error) { - set := cidutil.NewStreamingSet() - - go func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - defer close(set.New) - - for _, key := range pinning.DirectKeys() { - set.Visitor(ctx)(key) - } - - for _, key := range pinning.RecursiveKeys() { - set.Visitor(ctx)(key) - - if !onlyRoots { - err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.Visitor(ctx)) - if err != nil { - log.Errorf("reprovide indirect pins: %s", err) - return - } - } - } - }() - - return set, nil -} diff --git a/reprovide/reprovide.go b/reprovide/reprovide.go deleted file mode 100644 index 1a6f5bad3f0..00000000000 --- a/reprovide/reprovide.go +++ /dev/null @@ -1,159 +0,0 @@ -package reprovide - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/cenkalti/backoff" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" - "github.com/ipfs/go-verifcid" - "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" - routing "github.com/libp2p/go-libp2p-routing" -) - -var log = logging.Logger("reprovider") - -// KeyChanFunc is function streaming CIDs to pass to content routing -type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) -type doneFunc func(error) - -type Reprovider struct { - ctx context.Context - trigger chan doneFunc - closing chan struct{} - - // The routing system to provide values through - rsys routing.ContentRouting - - keyProvider KeyChanFunc - tick time.Duration -} - -// NewReprovider creates new Reprovider instance. -func NewReprovider(ctx context.Context, tick time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { - return &Reprovider{ - ctx: ctx, - trigger: make(chan doneFunc), - closing: make(chan struct{}), - - rsys: rsys, - keyProvider: keyProvider, - tick: tick, - } -} - -// Run re-provides keys with 'tick' interval or when triggered -func (rp *Reprovider) Run(proc goprocess.Process) { - ctx := goprocessctx.WithProcessClosing(rp.ctx, proc) - defer close(rp.closing) - - // dont reprovide immediately. - // may have just started the daemon and shutting it down immediately. - // probability( up another minute | uptime ) increases with uptime. - after := time.After(time.Minute) - var done doneFunc - for { - if rp.tick == 0 { - after = make(chan time.Time) - } - - select { - case <-ctx.Done(): - return - case done = <-rp.trigger: - case <-after: - } - - // 'mute' the trigger channel so when `ipfs bitswap reprovide` is called - // a 'reprovider is already running' error is returned - unmute := rp.muteTrigger() - - err := rp.reprovide(ctx) - if err != nil { - log.Debug(err) - } - - if done != nil { - done(err) - } - - unmute() - - after = time.After(rp.tick) - } -} - -// reprovide registers all keys given by rp.keyProvider to libp2p content routing -func (rp *Reprovider) reprovide(ctx context.Context) error { - keychan, err := rp.keyProvider(ctx) - if err != nil { - return fmt.Errorf("failed to get key chan: %s", err) - } - for c := range keychan { - // hash security - if err := verifcid.ValidateCid(c); err != nil { - log.Errorf("insecure hash in reprovider, %s (%s)", c, err) - continue - } - op := func() error { - err := rp.rsys.Provide(ctx, c, true) - if err != nil { - log.Debugf("Failed to provide key: %s", err) - } - return err - } - - // TODO: this backoff library does not respect our context, we should - // eventually work contexts into it. low priority. - err := backoff.Retry(op, backoff.NewExponentialBackOff()) - if err != nil { - log.Debugf("Providing failed after number of retries: %s", err) - return err - } - } - return nil -} - -// Trigger starts reprovision process in rp.Run and waits for it -func (rp *Reprovider) Trigger(ctx context.Context) error { - progressCtx, done := context.WithCancel(ctx) - - var err error - df := func(e error) { - err = e - done() - } - - select { - case <-rp.closing: - return errors.New("reprovider is closed") - case <-rp.ctx.Done(): - return rp.ctx.Err() - case <-ctx.Done(): - return ctx.Err() - case rp.trigger <- df: - <-progressCtx.Done() - return err - } -} - -func (rp *Reprovider) muteTrigger() context.CancelFunc { - ctx, cf := context.WithCancel(rp.ctx) - go func() { - defer cf() - for { - select { - case <-ctx.Done(): - return - case done := <-rp.trigger: - done(fmt.Errorf("reprovider is already running")) - } - } - }() - - return cf -} diff --git a/test/sharness/lib/iptb-lib.sh b/test/sharness/lib/iptb-lib.sh index 6ee8fbd8b01..20a2ce7a68a 100644 --- a/test/sharness/lib/iptb-lib.sh +++ b/test/sharness/lib/iptb-lib.sh @@ -62,3 +62,24 @@ iptb_wait_stop() { go-sleep 10ms done } + +findprovs_empty() { + test_expect_success 'findprovs '$1' succeeds' ' + ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut + ' + + test_expect_success "findprovs $1 output is empty" ' + test_must_be_empty findprovsOut + ' +} + +findprovs_expect() { + test_expect_success 'findprovs '$1' succeeds' ' + ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut && + echo '$2' > expected + ' + + test_expect_success "findprovs $1 output looks good" ' + test_cmp findprovsOut expected + ' +} diff --git a/test/sharness/t0175-provider.sh b/test/sharness/t0175-provider.sh new file mode 100755 index 00000000000..364df60e91e --- /dev/null +++ b/test/sharness/t0175-provider.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash + +test_description="Test reprovider" + +. lib/test-lib.sh + +NUM_NODES=2 + +test_expect_success 'init iptb' ' + iptb testbed create -type localipfs -force -count $NUM_NODES -init +' + +test_expect_success 'peer ids' ' + PEERID_0=$(iptb attr get 0 id) && + PEERID_1=$(iptb attr get 1 id) +' + +test_expect_success 'use strategic providing' ' + iptb run -- ipfs config --json Experimental.StrategicProviding false +' + +startup_cluster ${NUM_NODES} + +test_expect_success 'add test object' ' + HASH_0=$(echo "foo" | ipfsi 0 add -q) +' + +findprovs_expect '$HASH_0' '$PEERID_0' + +test_expect_success 'stop node 1' ' + iptb stop +' + +test_done diff --git a/test/sharness/t0175-reprovider.sh b/test/sharness/t0175-reprovider.sh index 2df63fdfc0a..c00b25c0c3a 100755 --- a/test/sharness/t0175-reprovider.sh +++ b/test/sharness/t0175-reprovider.sh @@ -23,27 +23,6 @@ init_strategy() { startup_cluster ${NUM_NODES} } -findprovs_empty() { - test_expect_success 'findprovs '$1' succeeds' ' - ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut - ' - - test_expect_success "findprovs $1 output is empty" ' - test_must_be_empty findprovsOut - ' -} - -findprovs_expect() { - test_expect_success 'findprovs '$1' succeeds' ' - ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut && - echo '$2' > expected - ' - - test_expect_success "findprovs $1 output looks good" ' - test_cmp findprovsOut expected - ' -} - reprovide() { test_expect_success 'reprovide' ' # TODO: this hangs, though only after reprovision was done diff --git a/test/sharness/t0175-strategic-provider.sh b/test/sharness/t0175-strategic-provider.sh new file mode 100755 index 00000000000..7bad38838b6 --- /dev/null +++ b/test/sharness/t0175-strategic-provider.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash + +test_description="Test reprovider" + +. lib/test-lib.sh + +NUM_NODES=2 + +test_expect_success 'init iptb' ' + iptb testbed create -type localipfs -force -count $NUM_NODES -init +' + +test_expect_success 'peer ids' ' + PEERID_0=$(iptb attr get 0 id) && + PEERID_1=$(iptb attr get 1 id) +' + +test_expect_success 'use strategic providing' ' + iptb run -- ipfs config --json Experimental.StrategicProviding true +' + +startup_cluster ${NUM_NODES} + +test_expect_success 'add test object' ' + HASH_0=$(echo "foo" | ipfsi 0 add -q) +' + +findprovs_empty '$HASH_0' + +test_expect_success 'stop node 1' ' + iptb stop +' + +test_done