diff --git a/CHANGELOG.md b/CHANGELOG.md index 056791e0b..efc2c6c12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,15 +19,6 @@ The following emojis are used to highlight certain changes: - Updated, higher-definition icons in directory listings. - Customizable menu items next to "About IPFS" and "Install IPFS". - Valid DAG-CBOR and DAG-JSON blocks now provide a preview, where links can be followed. -- 🛠 Provider API refactor - - `provider/queue` has been moved to `provider/internal/queue`. - - `provider/batched.New` has been moved to `provider.New` and arguments has been changed: - - a routing system is now passed with the `provider.Online` option, by default the system run in offline mode (push stuff onto the queue); and - - you do not have to pass a queue anymore, you pass a `datastore.Datastore` exclusively. - - `provider/simple` has been removed, now instead `provider.New` will accept non batched routing systems and use type assertion for the `ProvideMany` call, giving a single implementation. - - `provider.NewOfflineProvider` has been renamed to `provider.NewNoopProvider` to show more clearly that is does nothing. - - `provider.NewSystem` has been removed, `provider.New` now returns a `provider.System` directly. - - `provider.Provider` and `provider.Reprovider` has been merged under one `provider.System` ## [0.8.0] - 2023-04-05 ### Added diff --git a/go.mod b/go.mod index 2d08290fc..5be828081 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a github.com/benbjohnson/clock v1.3.0 + github.com/cenkalti/backoff v2.2.1+incompatible github.com/cespare/xxhash/v2 v2.2.0 github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 github.com/cskr/pubsub v1.0.2 diff --git a/go.sum b/go.sum index 7b4a7f276..86980317b 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= +github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/provider/README.md b/provider/README.md new file mode 100644 index 000000000..0e4f4650d --- /dev/null +++ b/provider/README.md @@ -0,0 +1,30 @@ +## Usage + +Here's how you create, start, interact with, and stop the provider system: + +```golang +import ( + "context" + "time" + + "github.com/ipfs/boxo/provider" + "github.com/ipfs/boxo/provider/queue" + "github.com/ipfs/boxo/provider/simple" +) + +rsys := (your routing system here) +dstore := (your datastore here) +cid := (your cid to provide here) + +q := queue.NewQueue(context.Background(), "example", dstore) + +reprov := simple.NewReprovider(context.Background(), time.Hour * 12, rsys, simple.NewBlockstoreProvider(dstore)) +prov := simple.NewProvider(context.Background(), q, rsys) +sys := 
provider.NewSystem(prov, reprov) + +sys.Run() + +sys.Provide(cid) + +sys.Close() +``` diff --git a/provider/reprovider.go b/provider/batched/system.go similarity index 52% rename from provider/reprovider.go rename to provider/batched/system.go index 2d79a62d8..e3cb0325a 100644 --- a/provider/reprovider.go +++ b/provider/batched/system.go @@ -1,44 +1,26 @@ -package provider +package batched import ( "context" "errors" "fmt" - "math" "strconv" "sync" "time" - "github.com/ipfs/boxo/provider/internal/queue" + provider "github.com/ipfs/boxo/provider" + "github.com/ipfs/boxo/provider/queue" + "github.com/ipfs/boxo/provider/simple" "github.com/ipfs/boxo/verifcid" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - namespace "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log" "github.com/multiformats/go-multihash" ) -const ( - // MAGIC: how long we wait before reproviding a key - DefaultReproviderInterval = time.Hour * 22 // https://github.com/ipfs/kubo/pull/9326 - - // MAGIC: If the reprovide ticker is larger than a minute (likely), provide - // once after we've been up a minute. Don't provide _immediately_ as we - // might be just about to stop. - defaultInitialReprovideDelay = time.Minute - - // MAGIC: how long we wait between the first provider we hear about and - // batching up the provides to send out - pauseDetectionThreshold = time.Millisecond * 500 - - // MAGIC: how long we are willing to collect providers for the batch after - // we receive the first one - maxCollectionDuration = time.Minute * 10 -) - var log = logging.Logger("provider.batched") -type reprovider struct { +type BatchProvidingSystem struct { ctx context.Context close context.CancelFunc closewg sync.WaitGroup @@ -47,64 +29,39 @@ type reprovider struct { initalReprovideDelay time.Duration initialReprovideDelaySet bool - rsys Provide - keyProvider KeyChanFunc + rsys provideMany + keyProvider simple.KeyChanFunc q *queue.Queue ds datastore.Batching reprovideCh chan cid.Cid - maxReprovideBatchSize uint - - statLk sync.Mutex - totalProvides, lastReprovideBatchSize uint64 + totalProvides, lastReprovideBatchSize int avgProvideDuration, lastReprovideDuration time.Duration - - throughputCallback ThroughputCallback - // throughputProvideCurrentCount counts how many provides has been done since the last call to throughputCallback - throughputProvideCurrentCount uint - // throughputDurationSum sums up durations between two calls to the throughputCallback - throughputDurationSum time.Duration - throughputMinimumProvides uint - - keyPrefix datastore.Key } -var _ System = (*reprovider)(nil) +var _ provider.System = (*BatchProvidingSystem)(nil) -type Provide interface { - Provide(context.Context, cid.Cid, bool) error -} - -type ProvideMany interface { +type provideMany interface { ProvideMany(ctx context.Context, keys []multihash.Multihash) error -} - -type Ready interface { Ready() bool } // Option defines the functional option type that can be used to configure // BatchProvidingSystem instances -type Option func(system *reprovider) error - -var lastReprovideKey = datastore.NewKey("/reprovide/lastreprovide") -var DefaultKeyPrefix = datastore.NewKey("/provider") - -// New creates a new [System]. By default it is offline, that means it will -// enqueue tasks in ds. -// To have it publish records in the network use the [Online] option. -// If provider casts to [ProvideMany] the [ProvideMany.ProvideMany] method will -// be called instead. 
-// -// If provider casts to [Ready], it will wait until [Ready.Ready] is true. -func New(ds datastore.Batching, opts ...Option) (System, error) { - s := &reprovider{ - reprovideInterval: DefaultReproviderInterval, - maxReprovideBatchSize: math.MaxUint, - keyPrefix: DefaultKeyPrefix, - reprovideCh: make(chan cid.Cid), +type Option func(system *BatchProvidingSystem) error + +var lastReprovideKey = datastore.NewKey("/provider/reprovide/lastreprovide") + +func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingSystem, error) { + s := &BatchProvidingSystem{ + reprovideInterval: time.Hour * 24, + rsys: provider, + keyProvider: nil, + q: q, + ds: datastore.NewMapDatastore(), + reprovideCh: make(chan cid.Cid), } for _, o := range opts { @@ -114,8 +71,13 @@ func New(ds datastore.Batching, opts ...Option) (System, error) { } // Setup default behavior for the initial reprovide delay - if !s.initialReprovideDelaySet && s.reprovideInterval > defaultInitialReprovideDelay { - s.initalReprovideDelay = defaultInitialReprovideDelay + // + // If the reprovide ticker is larger than a minute (likely), + // provide once after we've been up a minute. + // + // Don't provide _immediately_ as we might be just about to stop. + if !s.initialReprovideDelaySet && s.reprovideInterval > time.Minute { + s.initalReprovideDelay = time.Minute s.initialReprovideDelaySet = true } @@ -127,87 +89,50 @@ func New(ds datastore.Batching, opts ...Option) (System, error) { } } - s.ds = namespace.Wrap(ds, s.keyPrefix) - s.q = queue.NewQueue(s.ds) - // This is after the options processing so we do not have to worry about leaking a context if there is an // initialization error processing the options ctx, cancel := context.WithCancel(context.Background()) s.ctx = ctx s.close = cancel - if s.rsys != nil { - if _, ok := s.rsys.(ProvideMany); !ok { - s.maxReprovideBatchSize = 1 - } - - s.run() - } - return s, nil } -func ReproviderInterval(duration time.Duration) Option { - return func(system *reprovider) error { - system.reprovideInterval = duration - return nil - } -} - -func KeyProvider(fn KeyChanFunc) Option { - return func(system *reprovider) error { - system.keyProvider = fn - return nil - } -} - -// DatastorePrefix sets a prefix for internal state stored in the Datastore. -// Defaults to [DefaultKeyPrefix]. -func DatastorePrefix(k datastore.Key) Option { - return func(system *reprovider) error { - system.keyPrefix = k +func Datastore(batching datastore.Batching) Option { + return func(system *BatchProvidingSystem) error { + system.ds = batching return nil } } -// ThroughputReport will fire the callback synchronously once at least limit -// multihashes have been advertised, it will then wait until a new set of at least -// limit multihashes has been advertised. -// While ThroughputReport is set batches will be at most minimumProvides big. -// If it returns false it wont ever be called again. -func ThroughputReport(f ThroughputCallback, minimumProvides uint) Option { - return func(system *reprovider) error { - system.throughputCallback = f - system.throughputMinimumProvides = minimumProvides +func ReproviderInterval(duration time.Duration) Option { + return func(system *BatchProvidingSystem) error { + system.reprovideInterval = duration return nil } } -type ThroughputCallback = func(reprovide bool, complete bool, totalKeysProvided uint, totalDuration time.Duration) (continueWatching bool) - -// Online will enable the router and make it send publishes online. -// nil can be used to turn the router offline. 
-// You can't register multiple providers, if this option is passed multiple times -// it will error. -func Online(rsys Provide) Option { - return func(system *reprovider) error { - if system.rsys != nil { - return fmt.Errorf("trying to register two provider on the same reprovider") - } - system.rsys = rsys +func KeyProvider(fn simple.KeyChanFunc) Option { + return func(system *BatchProvidingSystem) error { + system.keyProvider = fn return nil } } func initialReprovideDelay(duration time.Duration) Option { - return func(system *reprovider) error { + return func(system *BatchProvidingSystem) error { system.initialReprovideDelaySet = true system.initalReprovideDelay = duration return nil } } -func (s *reprovider) run() { +func (s *BatchProvidingSystem) Run() { + // how long we wait between the first provider we hear about and batching up the provides to send out + const pauseDetectionThreshold = time.Millisecond * 500 + // how long we are willing to collect providers for the batch after we receive the first one + const maxCollectionDuration = time.Minute * 10 + provCh := s.q.Dequeue() s.closewg.Add(1) @@ -241,16 +166,27 @@ func (s *reprovider) run() { for { performedReprovide := false - complete := false - - batchSize := s.maxReprovideBatchSize - if s.throughputCallback != nil && s.throughputMinimumProvides < batchSize { - batchSize = s.throughputMinimumProvides - } // at the start of every loop the maxCollectionDurationTimer and pauseDetectTimer should be already be // stopped and have empty channels - for uint(len(m)) < batchSize { + loop: + for { + select { + case <-maxCollectionDurationTimer.C: + // if this timer has fired then the pause timer has started so let's stop and empty it + stopAndEmptyTimer(pauseDetectTimer) + break loop + default: + } + + select { + case c := <-provCh: + resetTimersAfterReceivingProvide() + m[c] = struct{}{} + continue + default: + } + select { case c := <-provCh: resetTimersAfterReceivingProvide() @@ -262,19 +198,15 @@ func (s *reprovider) run() { case <-pauseDetectTimer.C: // if this timer has fired then the max collection timer has started so let's stop and empty it stopAndEmptyTimer(maxCollectionDurationTimer) - complete = true - goto AfterLoop + break loop case <-maxCollectionDurationTimer.C: // if this timer has fired then the pause timer has started so let's stop and empty it stopAndEmptyTimer(pauseDetectTimer) - goto AfterLoop + break loop case <-s.ctx.Done(): return } } - stopAndEmptyTimer(pauseDetectTimer) - stopAndEmptyTimer(maxCollectionDurationTimer) - AfterLoop: if len(m) == 0 { continue @@ -298,63 +230,41 @@ func (s *reprovider) run() { continue } - if r, ok := s.rsys.(Ready); ok { - ticker := time.NewTicker(time.Minute) - for !r.Ready() { - log.Debugf("reprovider system not ready") - select { - case <-ticker.C: - case <-s.ctx.Done(): - return - } + for !s.rsys.Ready() { + log.Debugf("reprovider system not ready") + select { + case <-time.After(time.Minute): + case <-s.ctx.Done(): + return } - ticker.Stop() } log.Debugf("starting provide of %d keys", len(keys)) start := time.Now() - err := doProvideMany(s.ctx, s.rsys, keys) + err := s.rsys.ProvideMany(s.ctx, keys) if err != nil { log.Debugf("providing failed %v", err) continue } dur := time.Since(start) - totalProvideTime := time.Duration(s.totalProvides) * s.avgProvideDuration - recentAvgProvideDuration := dur / time.Duration(len(keys)) - - s.statLk.Lock() - s.avgProvideDuration = time.Duration((totalProvideTime + dur) / (time.Duration(s.totalProvides) + time.Duration(len(keys)))) - 
s.totalProvides += uint64(len(keys)) + totalProvideTime := int64(s.totalProvides) * int64(s.avgProvideDuration) + recentAvgProvideDuration := time.Duration(int64(dur) / int64(len(keys))) + s.avgProvideDuration = time.Duration((totalProvideTime + int64(dur)) / int64(s.totalProvides+len(keys))) + s.totalProvides += len(keys) log.Debugf("finished providing of %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration) if performedReprovide { - s.lastReprovideBatchSize = uint64(len(keys)) + s.lastReprovideBatchSize = len(keys) s.lastReprovideDuration = dur - s.statLk.Unlock() - - // Don't hold the lock while writing to disk, consumers don't need to wait on IO to read thoses fields. - if err := s.ds.Put(s.ctx, lastReprovideKey, storeTime(time.Now())); err != nil { log.Errorf("could not store last reprovide time: %v", err) } if err := s.ds.Sync(s.ctx, lastReprovideKey); err != nil { log.Errorf("could not perform sync of last reprovide time: %v", err) } - } else { - s.statLk.Unlock() - } - - s.throughputDurationSum += dur - s.throughputProvideCurrentCount += uint(len(keys)) - if s.throughputCallback != nil && s.throughputProvideCurrentCount >= s.throughputMinimumProvides { - if more := s.throughputCallback(performedReprovide, complete, s.throughputProvideCurrentCount, s.throughputDurationSum); !more { - s.throughputCallback = nil - } - s.throughputProvideCurrentCount = 0 - s.throughputDurationSum = 0 } } }() @@ -417,22 +327,22 @@ func parseTime(b []byte) (time.Time, error) { return time.Unix(0, tns), nil } -func (s *reprovider) Close() error { +func (s *BatchProvidingSystem) Close() error { s.close() err := s.q.Close() s.closewg.Wait() return err } -func (s *reprovider) Provide(cid cid.Cid) error { +func (s *BatchProvidingSystem) Provide(cid cid.Cid) error { return s.q.Enqueue(cid) } -func (s *reprovider) Reprovide(ctx context.Context) error { +func (s *BatchProvidingSystem) Reprovide(ctx context.Context) error { return s.reprovide(ctx, true) } -func (s *reprovider) reprovide(ctx context.Context, force bool) error { +func (s *BatchProvidingSystem) reprovide(ctx context.Context, force bool) error { if !s.shouldReprovide() && !force { return nil } @@ -463,7 +373,7 @@ reprovideCidLoop: return nil } -func (s *reprovider) getLastReprovideTime() (time.Time, error) { +func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { val, err := s.ds.Get(s.ctx, lastReprovideKey) if errors.Is(err, datastore.ErrNotFound) { return time.Time{}, nil @@ -480,45 +390,31 @@ func (s *reprovider) getLastReprovideTime() (time.Time, error) { return t, nil } -func (s *reprovider) shouldReprovide() bool { +func (s *BatchProvidingSystem) shouldReprovide() bool { t, err := s.getLastReprovideTime() if err != nil { log.Debugf("getting last reprovide time failed: %s", err) return false } - if time.Since(t) < s.reprovideInterval { + if time.Since(t) < time.Duration(float64(s.reprovideInterval)*0.5) { return false } return true } -type ReproviderStats struct { - TotalProvides, LastReprovideBatchSize uint64 +type BatchedProviderStats struct { + TotalProvides, LastReprovideBatchSize int AvgProvideDuration, LastReprovideDuration time.Duration } // Stat returns various stats about this provider system -func (s *reprovider) Stat() (ReproviderStats, error) { - s.statLk.Lock() - defer s.statLk.Unlock() - return ReproviderStats{ +func (s *BatchProvidingSystem) Stat(ctx context.Context) (BatchedProviderStats, error) { + // TODO: Does it matter that there is no locking around the 
total+average values? + return BatchedProviderStats{ TotalProvides: s.totalProvides, LastReprovideBatchSize: s.lastReprovideBatchSize, AvgProvideDuration: s.avgProvideDuration, LastReprovideDuration: s.lastReprovideDuration, }, nil } - -func doProvideMany(ctx context.Context, r Provide, keys []multihash.Multihash) error { - if many, ok := r.(ProvideMany); ok { - return many.ProvideMany(ctx, keys) - } - - for _, k := range keys { - if err := r.Provide(ctx, cid.NewCidV1(cid.Raw, k), true); err != nil { - return err - } - } - return nil -} diff --git a/provider/batched/system_test.go b/provider/batched/system_test.go new file mode 100644 index 000000000..c8a7d7b84 --- /dev/null +++ b/provider/batched/system_test.go @@ -0,0 +1,119 @@ +package batched + +import ( + "context" + "strconv" + "sync" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + mh "github.com/multiformats/go-multihash" + + "github.com/ipfs/boxo/internal/test" + q "github.com/ipfs/boxo/provider/queue" +) + +type mockProvideMany struct { + lk sync.Mutex + keys []mh.Multihash +} + +func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { + m.lk.Lock() + defer m.lk.Unlock() + m.keys = keys + return nil +} + +func (m *mockProvideMany) Ready() bool { + return true +} + +func (m *mockProvideMany) GetKeys() []mh.Multihash { + m.lk.Lock() + defer m.lk.Unlock() + return m.keys[:] +} + +var _ provideMany = (*mockProvideMany)(nil) + +func TestBatched(t *testing.T) { + test.Flaky(t) + ctx := context.Background() + defer ctx.Done() + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + queue, err := q.NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + provider := &mockProvideMany{} + + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + const numProvides = 100 + keysToProvide := make(map[cid.Cid]int) + for i := 0; i < numProvides; i++ { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + c := cid.NewCidV1(cid.Raw, h) + keysToProvide[c] = i + } + + batchSystem, err := New(provider, queue, KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid) + go func() { + for k := range keysToProvide { + select { + case ch <- k: + case <-ctx.Done(): + return + } + } + }() + return ch, nil + }), initialReprovideDelay(0)) + if err != nil { + t.Fatal(err) + } + + batchSystem.Run() + + var keys []mh.Multihash + for { + if ctx.Err() != nil { + t.Fatal("test hung") + } + keys = provider.GetKeys() + if len(keys) != 0 { + break + } + time.Sleep(time.Millisecond * 100) + } + + if len(keys) != numProvides { + t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) + } + + provMap := make(map[string]struct{}) + for _, k := range keys { + provMap[string(k)] = struct{}{} + } + + for i := 0; i < numProvides; i++ { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + if _, found := provMap[string(h)]; !found { + t.Fatalf("could not find provider with value %d", i) + } + } +} diff --git a/provider/noop.go b/provider/noop.go deleted file mode 100644 index 5367ccb30..000000000 --- a/provider/noop.go +++ /dev/null @@ -1,32 +0,0 @@ -package provider - -import ( - "context" - - "github.com/ipfs/go-cid" -) - -type noopProvider struct{} - -var _ System = (*noopProvider)(nil) - -// NewNoopProvider creates a ProviderSystem that does nothing. 
-func NewNoopProvider() System { - return &noopProvider{} -} - -func (op *noopProvider) Close() error { - return nil -} - -func (op *noopProvider) Provide(cid.Cid) error { - return nil -} - -func (op *noopProvider) Reprovide(context.Context) error { - return nil -} - -func (op *noopProvider) Stat() (ReproviderStats, error) { - return ReproviderStats{}, nil -} diff --git a/provider/offline.go b/provider/offline.go new file mode 100644 index 000000000..030a70ab1 --- /dev/null +++ b/provider/offline.go @@ -0,0 +1,29 @@ +package provider + +import ( + "context" + + "github.com/ipfs/go-cid" +) + +type offlineProvider struct{} + +// NewOfflineProvider creates a ProviderSystem that does nothing +func NewOfflineProvider() System { + return &offlineProvider{} +} + +func (op *offlineProvider) Run() { +} + +func (op *offlineProvider) Close() error { + return nil +} + +func (op *offlineProvider) Provide(cid.Cid) error { + return nil +} + +func (op *offlineProvider) Reprovide(context.Context) error { + return nil +} diff --git a/provider/provider.go b/provider/provider.go index 8b85fe62f..3b9c6ba3e 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -3,115 +3,25 @@ package provider import ( "context" - blocks "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/boxo/fetcher" - fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" - "github.com/ipfs/boxo/pinning/pinner" "github.com/ipfs/go-cid" - "github.com/ipfs/go-cidutil" - logging "github.com/ipfs/go-log" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" ) -var logR = logging.Logger("reprovider.simple") - // Provider announces blocks to the network type Provider interface { + // Run is used to begin processing the provider work + Run() // Provide takes a cid and makes an attempt to announce it to the network Provide(cid.Cid) error + // Close stops the provider + Close() error } // Reprovider reannounces blocks to the network type Reprovider interface { - // Reprovide starts a new reprovide if one isn't running already. 
- Reprovide(context.Context) error -} - -// System defines the interface for interacting with the value -// provider system -type System interface { + // Run is used to begin processing the reprovider work and waiting for reprovide triggers + Run() + // Trigger a reprovide + Trigger(context.Context) error + // Close stops the reprovider Close() error - Stat() (ReproviderStats, error) - Provider - Reprovider -} - -// KeyChanFunc is function streaming CIDs to pass to content routing -type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) - -// NewBlockstoreProvider returns key provider using bstore.AllKeysChan -func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - return bstore.AllKeysChan(ctx) - } -} - -// NewPinnedProvider returns provider supplying pinned keys -func NewPinnedProvider(onlyRoots bool, pinning pin.Pinner, fetchConfig fetcher.Factory) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) - if err != nil { - return nil, err - } - - outCh := make(chan cid.Cid) - go func() { - defer close(outCh) - for c := range set.New { - select { - case <-ctx.Done(): - return - case outCh <- c: - } - } - - }() - - return outCh, nil - } -} - -func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { - set := cidutil.NewStreamingSet() - - go func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - defer close(set.New) - - dkeys, err := pinning.DirectKeys(ctx) - if err != nil { - logR.Errorf("reprovide direct pins: %s", err) - return - } - for _, key := range dkeys { - set.Visitor(ctx)(key) - } - - rkeys, err := pinning.RecursiveKeys(ctx) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - - session := fetchConfig.NewSession(ctx) - for _, key := range rkeys { - set.Visitor(ctx)(key) - if !onlyRoots { - err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { - clink, ok := res.LastBlockLink.(cidlink.Link) - if ok { - set.Visitor(ctx)(clink.Cid) - } - return nil - }) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - } - } - }() - - return set, nil } diff --git a/provider/internal/queue/queue.go b/provider/queue/queue.go similarity index 86% rename from provider/internal/queue/queue.go rename to provider/queue/queue.go index dada92788..618256bbe 100644 --- a/provider/internal/queue/queue.go +++ b/provider/queue/queue.go @@ -3,7 +3,6 @@ package queue import ( "context" "fmt" - cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" namespace "github.com/ipfs/go-datastore/namespace" @@ -21,6 +20,7 @@ var log = logging.Logger("provider.queue") type Queue struct { // used to differentiate queues in datastore // e.g. 
provider vs reprovider + name string ctx context.Context ds datastore.Datastore // Must be threadsafe dequeue chan cid.Cid @@ -32,10 +32,11 @@ type Queue struct { } // NewQueue creates a queue for cids -func NewQueue(ds datastore.Datastore) *Queue { - namespaced := namespace.Wrap(ds, datastore.NewKey("/queue")) - cancelCtx, cancel := context.WithCancel(context.Background()) +func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) { + namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/")) + cancelCtx, cancel := context.WithCancel(ctx) q := &Queue{ + name: name, ctx: cancelCtx, ds: namespaced, dequeue: make(chan cid.Cid), @@ -44,16 +45,13 @@ func NewQueue(ds datastore.Datastore) *Queue { closed: make(chan struct{}, 1), } q.work() - return q + return q, nil } // Close stops the queue func (q *Queue) Close() error { q.close() <-q.closed - // We don't close dequeue because the provider which consume this get caught in - // an infinite loop dequeing cid.Undef if we do that. - // The provider has it's own select on top of dequeue and will handle this by itself. return nil } @@ -81,6 +79,8 @@ func (q *Queue) work() { defer func() { // also cancels any in-progess enqueue tasks. q.close() + // unblocks anyone waiting + close(q.dequeue) // unblocks the close call close(q.closed) }() @@ -121,12 +121,6 @@ func (q *Queue) work() { q.counter++ nextKey := datastore.NewKey(keyPath) - if c == cid.Undef { - // fast path, skip rereading the datastore if we don't have anything in hand yet - c = toQueue - k = nextKey - } - if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil { log.Errorf("Failed to enqueue cid: %s", err) continue diff --git a/provider/internal/queue/queue_test.go b/provider/queue/queue_test.go similarity index 80% rename from provider/internal/queue/queue_test.go rename to provider/queue/queue_test.go index a2d9f0be4..9eacf4349 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/queue/queue_test.go @@ -43,8 +43,10 @@ func TestBasicOperation(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := NewQueue(ds) - defer queue.Close() + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } cids := makeCids(10) @@ -61,8 +63,10 @@ func TestMangledData(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := NewQueue(ds) - defer queue.Close() + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } cids := makeCids(10) for _, c := range cids { @@ -71,7 +75,7 @@ func TestMangledData(t *testing.T) { // put bad data in the queue queueKey := datastore.NewKey("/test/0") - err := queue.ds.Put(ctx, queueKey, []byte("borked")) + err = queue.ds.Put(ctx, queueKey, []byte("borked")) if err != nil { t.Fatal(err) } @@ -87,8 +91,10 @@ func TestInitialization(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := NewQueue(ds) - defer queue.Close() + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } cids := makeCids(10) for _, c := range cids { @@ -98,8 +104,10 @@ func TestInitialization(t *testing.T) { assertOrdered(cids[:5], queue, t) // make a new queue, same data - queue = NewQueue(ds) - defer queue.Close() + queue, err = NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } assertOrdered(cids[5:], queue, t) } @@ -110,18 +118,21 @@ func TestInitializationWithManyCids(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - 
queue := NewQueue(ds) + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } cids := makeCids(25) for _, c := range cids { queue.Enqueue(c) } - queue.Close() - // make a new queue, same data - queue = NewQueue(ds) - defer queue.Close() + queue, err = NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } assertOrdered(cids, queue, t) } diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go deleted file mode 100644 index bfb8fc187..000000000 --- a/provider/reprovider_test.go +++ /dev/null @@ -1,219 +0,0 @@ -package provider - -import ( - "bytes" - "context" - "runtime" - "strconv" - "sync" - "testing" - "time" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - mh "github.com/multiformats/go-multihash" - "github.com/stretchr/testify/assert" -) - -type allFeatures interface { - Provide - ProvideMany - Ready -} - -type mockProvideMany struct { - delay time.Duration - lk sync.Mutex - keys []mh.Multihash - calls uint -} - -func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { - m.lk.Lock() - defer m.lk.Unlock() - m.keys = append(m.keys, keys...) - m.calls++ - time.Sleep(time.Duration(len(keys)) * m.delay) - return nil -} - -func (m *mockProvideMany) Provide(ctx context.Context, key cid.Cid, _ bool) error { - m.lk.Lock() - defer m.lk.Unlock() - m.keys = append(m.keys, key.Hash()) - m.calls++ - time.Sleep(m.delay) - return nil -} - -func (m *mockProvideMany) Ready() bool { - return true -} - -func (m *mockProvideMany) GetKeys() (keys []mh.Multihash, calls uint) { - m.lk.Lock() - defer m.lk.Unlock() - return append([]mh.Multihash(nil), m.keys...), m.calls -} - -var _ allFeatures = (*mockProvideMany)(nil) - -type allButMany interface { - Provide - Ready -} - -type singleMockWrapper struct { - allButMany -} - -func TestReprovider(t *testing.T) { - t.Parallel() - t.Run("many", func(t *testing.T) { - t.Parallel() - testProvider(t, false) - }) - t.Run("single", func(t *testing.T) { - t.Parallel() - testProvider(t, true) - }) -} - -func testProvider(t *testing.T, singleProvide bool) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - - // It has to be so big because the combo of noisy CI runners + OSes that don't - // have scheduler as good as linux's one add a whole lot of jitter. 
- const provideDelay = time.Millisecond * 5 - orig := &mockProvideMany{ - delay: provideDelay, - } - var provider Provide = orig - if singleProvide { - provider = singleMockWrapper{orig} - } - - const numProvides = 100 - keysToProvide := make([]cid.Cid, numProvides) - for i := range keysToProvide { - h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) - if err != nil { - panic(err) - } - c := cid.NewCidV1(cid.Raw, h) - keysToProvide[i] = c - } - - var keyWait sync.Mutex - keyWait.Lock() - batchSystem, err := New(ds, Online(provider), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { - ch := make(chan cid.Cid) - go func() { - defer keyWait.Unlock() - for _, k := range keysToProvide { - select { - case ch <- k: - case <-ctx.Done(): - return - } - } - }() - return ch, nil - }), - initialReprovideDelay(0), - ThroughputReport(func(_, complete bool, n uint, d time.Duration) bool { - if !singleProvide && complete { - t.Errorf("expected an incomplete report but got a complete one") - } - - const twentyFivePercent = provideDelay / 4 - const seventyFivePercent = provideDelay - twentyFivePercent - const hundredTwentyFivePercent = provideDelay + twentyFivePercent - - avg := d / time.Duration(n) - - // windows's and darwin's schedulers and timers are too unreliable for this check - if runtime.GOOS != "windows" && runtime.GOOS != "darwin" && !(seventyFivePercent <= avg && avg <= hundredTwentyFivePercent) { - t.Errorf("average computed duration is not within bounds, expected between %v and %v but got %v.", seventyFivePercent, hundredTwentyFivePercent, avg) - } - return false - }, numProvides/2), - ) - if err != nil { - t.Fatal(err) - } - defer batchSystem.Close() - - keyWait.Lock() - time.Sleep(pauseDetectionThreshold + time.Millisecond*50) // give it time to call provider after that - - keys, calls := orig.GetKeys() - if len(keys) != numProvides { - t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) - } - if singleProvide { - if calls != 100 { - t.Fatalf("expected 100 call single provide call, got %d", calls) - } - } else { - // Two because of ThroughputReport's limit being half. - if calls != 2 { - t.Fatalf("expected 2 call batched provide call, got %d", calls) - } - } - - provMap := make(map[string]struct{}) - for _, k := range keys { - provMap[string(k)] = struct{}{} - } - - for i := 0; i < numProvides; i++ { - h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) - if err != nil { - panic(err) - } - if _, found := provMap[string(h)]; !found { - t.Fatalf("could not find provider with value %d", i) - } - } -} - -func TestOfflineRecordsThenOnlineRepublish(t *testing.T) { - // Don't run in Parallel as this test is time sensitive. - - someHash, err := mh.Sum([]byte("Vires in Numeris!"), mh.BLAKE3, -1) - assert.NoError(t, err) - c := cid.NewCidV1(cid.Raw, someHash) - - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - - // First public using an offline system to enqueue in the datastore. - sys, err := New(ds) - assert.NoError(t, err) - - err = sys.Provide(c) - assert.NoError(t, err) - - err = sys.Close() - assert.NoError(t, err) - - // Secondly restart an online datastore and we want to see this previously provided cid published. 
- prov := &mockProvideMany{} - sys, err = New(ds, Online(prov), initialReprovideDelay(0)) - assert.NoError(t, err) - - time.Sleep(pauseDetectionThreshold + time.Millisecond*10) // give it time to call provider after that - - err = sys.Close() - assert.NoError(t, err) - - prov.lk.Lock() - defer prov.lk.Unlock() - if len(prov.keys) != 1 { - t.Fatalf("expected to see 1 provide; got %d", len(prov.keys)) - } - if !bytes.Equal(prov.keys[0], someHash) { - t.Fatalf("keys are not equal expected %v, got %v", someHash, prov.keys[0]) - } -} diff --git a/provider/simple/provider.go b/provider/simple/provider.go new file mode 100644 index 000000000..63de031ad --- /dev/null +++ b/provider/simple/provider.go @@ -0,0 +1,116 @@ +// Package simple implements structures and methods to provide blocks, +// keep track of which blocks are provided, and to allow those blocks to +// be reprovided. +package simple + +import ( + "context" + "time" + + q "github.com/ipfs/boxo/provider/queue" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p/core/routing" +) + +var logP = logging.Logger("provider.simple") + +// Provider announces blocks to the network +type Provider struct { + ctx context.Context + // the CIDs for which provide announcements should be made + queue *q.Queue + // used to announce providing to the network + contentRouting routing.ContentRouting + // how long to wait for announce to complete before giving up + timeout time.Duration + // how many workers concurrently work through thhe queue + workerLimit int +} + +// Option defines the functional option type that can be used to configure +// provider instances +type Option func(*Provider) + +// WithTimeout is an option to set a timeout on a provider +func WithTimeout(timeout time.Duration) Option { + return func(p *Provider) { + p.timeout = timeout + } +} + +// MaxWorkers is an option to set the max workers on a provider +func MaxWorkers(count int) Option { + return func(p *Provider) { + p.workerLimit = count + } +} + +// NewProvider creates a provider that announces blocks to the network using a content router +func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting, options ...Option) *Provider { + p := &Provider{ + ctx: ctx, + queue: queue, + contentRouting: contentRouting, + workerLimit: 8, + } + + for _, option := range options { + option(p) + } + + return p +} + +// Close stops the provider +func (p *Provider) Close() error { + return p.queue.Close() +} + +// Run workers to handle provide requests. +func (p *Provider) Run() { + p.handleAnnouncements() +} + +// Provide the given cid using specified strategy. +func (p *Provider) Provide(root cid.Cid) error { + return p.queue.Enqueue(root) +} + +// Handle all outgoing cids by providing (announcing) them +func (p *Provider) handleAnnouncements() { + for workers := 0; workers < p.workerLimit; workers++ { + go func() { + for p.ctx.Err() == nil { + select { + case <-p.ctx.Done(): + return + case c, ok := <-p.queue.Dequeue(): + if !ok { + // queue closed. 
+ return + } + + p.doProvide(c) + } + } + }() + } +} + +func (p *Provider) doProvide(c cid.Cid) { + ctx := p.ctx + if p.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, p.timeout) + defer cancel() + } else { + ctx = p.ctx + } + + logP.Info("announce - start - ", c) + if err := p.contentRouting.Provide(ctx, c, true); err != nil { + logP.Warnf("Unable to provide entry: %s, %s", c, err) + } + logP.Info("announce - end - ", c) +} diff --git a/provider/simple/provider_test.go b/provider/simple/provider_test.go new file mode 100644 index 000000000..8734c8ff6 --- /dev/null +++ b/provider/simple/provider_test.go @@ -0,0 +1,166 @@ +package simple_test + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + blocksutil "github.com/ipfs/go-ipfs-blocksutil" + "github.com/libp2p/go-libp2p/core/peer" + + "github.com/ipfs/boxo/internal/test" + q "github.com/ipfs/boxo/provider/queue" + + . "github.com/ipfs/boxo/provider/simple" +) + +var blockGenerator = blocksutil.NewBlockGenerator() + +type mockRouting struct { + provided chan cid.Cid +} + +func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { + select { + case r.provided <- cid: + case <-ctx.Done(): + panic("context cancelled, but shouldn't have") + } + return nil +} + +func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan peer.AddrInfo { + return nil +} + +func mockContentRouting() *mockRouting { + r := mockRouting{} + r.provided = make(chan cid.Cid) + return &r +} + +func TestAnnouncement(t *testing.T) { + test.Flaky(t) + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := q.NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + r := mockContentRouting() + + prov := NewProvider(ctx, queue, r) + prov.Run() + + cids := cid.NewSet() + + for i := 0; i < 100; i++ { + c := blockGenerator.Next().Cid() + cids.Add(c) + } + + go func() { + for _, c := range cids.Keys() { + err = prov.Provide(c) + // A little goroutine stirring to exercise some different states + r := rand.Intn(10) + time.Sleep(time.Microsecond * time.Duration(r)) + } + }() + + for cids.Len() > 0 { + select { + case cp := <-r.provided: + if !cids.Has(cp) { + t.Fatal("Wrong CID provided") + } + cids.Remove(cp) + case <-time.After(time.Second * 5): + t.Fatal("Timeout waiting for cids to be provided.") + } + } + prov.Close() + + select { + case cp := <-r.provided: + t.Fatal("did not expect to provide CID: ", cp) + case <-time.After(time.Second * 1): + } +} + +func TestClose(t *testing.T) { + test.Flaky(t) + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := q.NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + r := mockContentRouting() + + prov := NewProvider(ctx, queue, r) + prov.Run() + + prov.Close() + + select { + case cp := <-r.provided: + t.Fatal("did not expect to provide anything, provided: ", cp) + case <-time.After(time.Second * 1): + } +} + +func TestAnnouncementTimeout(t *testing.T) { + test.Flaky(t) + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := q.NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + r := mockContentRouting() + + prov := NewProvider(ctx, queue, r, WithTimeout(1*time.Second)) + prov.Run() + + cids := cid.NewSet() + + 
for i := 0; i < 100; i++ { + c := blockGenerator.Next().Cid() + cids.Add(c) + } + + go func() { + for _, c := range cids.Keys() { + err = prov.Provide(c) + // A little goroutine stirring to exercise some different states + r := rand.Intn(10) + time.Sleep(time.Microsecond * time.Duration(r)) + } + }() + + for cids.Len() > 0 { + select { + case cp := <-r.provided: + if !cids.Has(cp) { + t.Fatal("Wrong CID provided") + } + cids.Remove(cp) + case <-time.After(time.Second * 5): + t.Fatal("Timeout waiting for cids to be provided.") + } + } +} diff --git a/provider/simple/reprovide.go b/provider/simple/reprovide.go new file mode 100644 index 000000000..a29b484fc --- /dev/null +++ b/provider/simple/reprovide.go @@ -0,0 +1,255 @@ +package simple + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/cenkalti/backoff" + blocks "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/fetcher" + fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" + "github.com/ipfs/boxo/verifcid" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-cidutil" + logging "github.com/ipfs/go-log" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/libp2p/go-libp2p/core/routing" +) + +var logR = logging.Logger("reprovider.simple") + +// ErrClosed is returned by Trigger when operating on a closed reprovider. +var ErrClosed = errors.New("reprovider service stopped") + +// KeyChanFunc is function streaming CIDs to pass to content routing +type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) + +// Reprovider reannounces blocks to the network +type Reprovider struct { + // Reprovider context. Cancel to stop, then wait on closedCh. + ctx context.Context + cancel context.CancelFunc + closedCh chan struct{} + + // Trigger triggers a reprovide. + trigger chan chan<- error + + // The routing system to provide values through + rsys routing.ContentRouting + + keyProvider KeyChanFunc + + tick time.Duration +} + +// NewReprovider creates new Reprovider instance. +func NewReprovider(ctx context.Context, reprovideInterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { + ctx, cancel := context.WithCancel(ctx) + return &Reprovider{ + ctx: ctx, + cancel: cancel, + closedCh: make(chan struct{}), + trigger: make(chan chan<- error), + + rsys: rsys, + keyProvider: keyProvider, + tick: reprovideInterval, + } +} + +// Close the reprovider +func (rp *Reprovider) Close() error { + rp.cancel() + <-rp.closedCh + return nil +} + +// Run re-provides keys with 'tick' interval or when triggered +func (rp *Reprovider) Run() { + defer close(rp.closedCh) + + var initialReprovideCh, reprovideCh <-chan time.Time + + // If reproviding is enabled (non-zero) + if rp.tick > 0 { + reprovideTicker := time.NewTicker(rp.tick) + defer reprovideTicker.Stop() + reprovideCh = reprovideTicker.C + + // If the reprovide ticker is larger than a minute (likely), + // provide once after we've been up a minute. + // + // Don't provide _immediately_ as we might be just about to stop. 
+ if rp.tick > time.Minute { + initialReprovideTimer := time.NewTimer(time.Minute) + defer initialReprovideTimer.Stop() + + initialReprovideCh = initialReprovideTimer.C + } + } + + var done chan<- error + for rp.ctx.Err() == nil { + select { + case <-initialReprovideCh: + case <-reprovideCh: + case done = <-rp.trigger: + case <-rp.ctx.Done(): + return + } + + err := rp.Reprovide() + + // only log if we've hit an actual error, otherwise just tell the client we're shutting down + if rp.ctx.Err() != nil { + err = ErrClosed + } else if err != nil { + logR.Errorf("failed to reprovide: %s", err) + } + + if done != nil { + if err != nil { + done <- err + } + close(done) + } + } +} + +// Reprovide registers all keys given by rp.keyProvider to libp2p content routing +func (rp *Reprovider) Reprovide() error { + keychan, err := rp.keyProvider(rp.ctx) + if err != nil { + return fmt.Errorf("failed to get key chan: %s", err) + } + for c := range keychan { + // hash security + if err := verifcid.ValidateCid(c); err != nil { + logR.Errorf("insecure hash in reprovider, %s (%s)", c, err) + continue + } + op := func() error { + err := rp.rsys.Provide(rp.ctx, c, true) + if err != nil { + logR.Debugf("Failed to provide key: %s", err) + } + return err + } + + err := backoff.Retry(op, backoff.WithContext(backoff.NewExponentialBackOff(), rp.ctx)) + if err != nil { + logR.Debugf("Providing failed after number of retries: %s", err) + return err + } + } + return nil +} + +// Trigger starts the reprovision process in rp.Run and waits for it to finish. +// +// Returns an error if a reprovide is already in progress. +func (rp *Reprovider) Trigger(ctx context.Context) error { + resultCh := make(chan error, 1) + select { + case rp.trigger <- resultCh: + default: + return fmt.Errorf("reprovider is already running") + } + + select { + case err := <-resultCh: + return err + case <-rp.ctx.Done(): + return ErrClosed + case <-ctx.Done(): + return ctx.Err() + } +} + +// Strategies + +// NewBlockstoreProvider returns key provider using bstore.AllKeysChan +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + return bstore.AllKeysChan(ctx) + } +} + +// Pinner interface defines how the simple.Reprovider wants to interact +// with a Pinning service +type Pinner interface { + DirectKeys(ctx context.Context) ([]cid.Cid, error) + RecursiveKeys(ctx context.Context) ([]cid.Cid, error) +} + +// NewPinnedProvider returns provider supplying pinned keys +func NewPinnedProvider(onlyRoots bool, pinning Pinner, fetchConfig fetcher.Factory) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) + if err != nil { + return nil, err + } + + outCh := make(chan cid.Cid) + go func() { + defer close(outCh) + for c := range set.New { + select { + case <-ctx.Done(): + return + case outCh <- c: + } + } + + }() + + return outCh, nil + } +} + +func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { + set := cidutil.NewStreamingSet() + + go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(set.New) + + dkeys, err := pinning.DirectKeys(ctx) + if err != nil { + logR.Errorf("reprovide direct pins: %s", err) + return + } + for _, key := range dkeys { + set.Visitor(ctx)(key) + } + + rkeys, err := pinning.RecursiveKeys(ctx) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } + + 
session := fetchConfig.NewSession(ctx) + for _, key := range rkeys { + set.Visitor(ctx)(key) + if !onlyRoots { + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { + clink, ok := res.LastBlockLink.(cidlink.Link) + if ok { + set.Visitor(ctx)(clink.Cid) + } + return nil + }) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } + } + } + }() + + return set, nil +} diff --git a/provider/simple/reprovide_test.go b/provider/simple/reprovide_test.go new file mode 100644 index 000000000..8b521ae56 --- /dev/null +++ b/provider/simple/reprovide_test.go @@ -0,0 +1,289 @@ +package simple_test + +import ( + "bytes" + "context" + "testing" + "time" + + bsrv "github.com/ipfs/boxo/blockservice" + blockstore "github.com/ipfs/boxo/blockstore" + offline "github.com/ipfs/boxo/exchange/offline" + bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" + "github.com/ipfs/boxo/internal/test" + mock "github.com/ipfs/boxo/routing/mock" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/fluent/qp" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + testutil "github.com/libp2p/go-libp2p-testing/net" + "github.com/libp2p/go-libp2p/core/peer" + mh "github.com/multiformats/go-multihash" + + . "github.com/ipfs/boxo/provider/simple" +) + +func setupRouting(t *testing.T) (clA, clB mock.Client, idA, idB peer.ID) { + mrserv := mock.NewServer() + + iidA := testutil.RandIdentityOrFatal(t) + iidB := testutil.RandIdentityOrFatal(t) + + clA = mrserv.Client(iidA) + clB = mrserv.Client(iidB) + + return clA, clB, iidA.ID(), iidB.ID() +} + +func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) { + bstore = blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + for _, data := range []string{"foo", "bar"} { + nb := basicnode.Prototype.Any.NewBuilder() + err := nb.AssignString(data) + if err != nil { + t.Fatal(err) + } + blk := toBlock(t, nb.Build()) + err = bstore.Put(context.Background(), blk) + if err != nil { + t.Fatal(err) + } + nodes = append(nodes, blk.Cid()) + nd, err := qp.BuildMap(basicnode.Prototype.Map, 1, func(ma ipld.MapAssembler) { + qp.MapEntry(ma, "child", qp.Link(cidlink.Link{Cid: blk.Cid()})) + }) + if err != nil { + t.Fatal(err) + } + blk = toBlock(t, nd) + err = bstore.Put(context.Background(), blk) + if err != nil { + t.Fatal(err) + } + nodes = append(nodes, blk.Cid()) + } + + return nodes, bstore +} + +func toBlock(t *testing.T, nd ipld.Node) blocks.Block { + buf := new(bytes.Buffer) + err := dagcbor.Encode(nd, buf) + if err != nil { + t.Fatal(err) + } + c, err := cid.Prefix{ + Version: 1, + Codec: cid.DagCBOR, + MhType: mh.SHA2_256, + MhLength: -1, + }.Sum(buf.Bytes()) + if err != nil { + t.Fatal(err) + } + blk, err := blocks.NewBlockWithCid(buf.Bytes(), c) + if err != nil { + t.Fatal(err) + } + return blk +} + +func TestReprovide(t *testing.T) { + test.Flaky(t) + testReprovide(t, func(r *Reprovider, ctx context.Context) error { + return r.Reprovide() + }) +} + +func TestTrigger(t *testing.T) { + test.Flaky(t) + testReprovide(t, func(r *Reprovider, ctx context.Context) error { + go r.Run() + time.Sleep(1 * time.Second) + defer r.Close() + err := r.Trigger(ctx) + return err + }) +} + +func testReprovide(t *testing.T, trigger 
func(r *Reprovider, ctx context.Context) error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clA, clB, idA, _ := setupRouting(t) + nodes, bstore := setupDag(t) + + keyProvider := NewBlockstoreProvider(bstore) + reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) + reprov.Trigger(context.Background()) + err := trigger(reprov, ctx) + if err != nil { + t.Fatal(err) + } + + var providers []peer.AddrInfo + maxProvs := 100 + + for _, c := range nodes { + // We provide raw cids because of the multihash keying + // FIXME(@Jorropo): I think this change should be done in the DHT layer, probably an issue with our routing mock. + b := c.Bytes() + b[1] = 0x55 // rewrite the cid to raw + _, c, err := cid.CidFromBytes(b) + if err != nil { + t.Fatal(err) + } + provChan := clB.FindProvidersAsync(ctx, c, maxProvs) + for p := range provChan { + providers = append(providers, p) + } + + if len(providers) == 0 { + t.Fatal("Should have gotten a provider") + } + + if providers[0].ID != idA { + t.Fatal("Somehow got the wrong peer back as a provider.") + } + } +} + +func TestTriggerTwice(t *testing.T) { + test.Flaky(t) + // Ensure we can only trigger once at a time. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clA, _, _, _ := setupRouting(t) + + keyCh := make(chan cid.Cid) + startCh := make(chan struct{}) + keyFunc := func(ctx context.Context) (<-chan cid.Cid, error) { + <-startCh + return keyCh, nil + } + + reprov := NewReprovider(ctx, time.Hour, clA, keyFunc) + go reprov.Run() + defer reprov.Close() + + // Wait for the reprovider to start, otherwise, the reprovider will + // think a concurrent reprovide is running. + // + // We _could_ fix this race... but that would be complexity for nothing. + // 1. We start a reprovide 1 minute after startup anyways. + // 2. The window is really narrow. + time.Sleep(1 * time.Second) + + errCh := make(chan error, 2) + + // Trigger in the background + go func() { + errCh <- reprov.Trigger(ctx) + }() + + // Wait for the trigger to really start. + startCh <- struct{}{} + + start := time.Now() + // Try to trigger again, this should fail immediately. + if err := reprov.Trigger(ctx); err == nil { + t.Fatal("expected an error") + } + if time.Since(start) > 10*time.Millisecond { + t.Fatal("expected reprovide to fail instantly") + } + + // Let the trigger progress. + close(keyCh) + + // Check the result. + err := <-errCh + if err != nil { + t.Fatal(err) + } + + // Try to trigger again, this should work. 
+ go func() { + errCh <- reprov.Trigger(ctx) + }() + startCh <- struct{}{} + err = <-errCh + if err != nil { + t.Fatal(err) + } +} + +type mockPinner struct { + recursive []cid.Cid + direct []cid.Cid +} + +func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { + return mp.direct, nil +} + +func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { + return mp.recursive, nil +} + +func TestReprovidePinned(t *testing.T) { + test.Flaky(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + nodes, bstore := setupDag(t) + + fetchConfig := bsfetcher.NewFetcherConfig(bsrv.New(bstore, offline.Exchange(bstore))) + + for i := 0; i < 2; i++ { + clA, clB, idA, _ := setupRouting(t) + + onlyRoots := i == 0 + t.Logf("only roots: %v", onlyRoots) + + var provide, dont []cid.Cid + if onlyRoots { + provide = []cid.Cid{nodes[1], nodes[3]} + dont = []cid.Cid{nodes[0], nodes[2]} + } else { + provide = []cid.Cid{nodes[0], nodes[1], nodes[3]} + dont = []cid.Cid{nodes[2]} + } + + keyProvider := NewPinnedProvider(onlyRoots, &mockPinner{ + recursive: []cid.Cid{nodes[1]}, + direct: []cid.Cid{nodes[3]}, + }, fetchConfig) + + reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) + err := reprov.Reprovide() + if err != nil { + t.Fatal(err) + } + + for i, c := range provide { + prov, ok := <-clB.FindProvidersAsync(ctx, c, 1) + if !ok { + t.Errorf("Should have gotten a provider for %d", i) + continue + } + + if prov.ID != idA { + t.Errorf("Somehow got the wrong peer back as a provider.") + continue + } + } + for i, c := range dont { + prov, ok := <-clB.FindProvidersAsync(ctx, c, 1) + if ok { + t.Fatalf("found provider %s for %d, expected none", prov.ID, i) + } + } + } +} diff --git a/provider/system.go b/provider/system.go new file mode 100644 index 000000000..9fc3e8879 --- /dev/null +++ b/provider/system.go @@ -0,0 +1,60 @@ +package provider + +import ( + "context" + + "github.com/ipfs/go-cid" +) + +// System defines the interface for interacting with the value +// provider system +type System interface { + Run() + Close() error + Provide(cid.Cid) error + Reprovide(context.Context) error +} + +type system struct { + provider Provider + reprovider Reprovider +} + +// NewSystem constructs a new provider system from a provider and reprovider +func NewSystem(provider Provider, reprovider Reprovider) System { + return &system{provider, reprovider} +} + +// Run the provider system by running the provider and reprovider +func (s *system) Run() { + go s.provider.Run() + go s.reprovider.Run() +} + +// Close the provider and reprovider +func (s *system) Close() error { + var errs []error + + if err := s.provider.Close(); err != nil { + errs = append(errs, err) + } + + if err := s.reprovider.Close(); err != nil { + errs = append(errs, err) + } + + if len(errs) > 0 { + return errs[0] + } + return nil +} + +// Provide a value +func (s *system) Provide(cid cid.Cid) error { + return s.provider.Provide(cid) +} + +// Reprovide all the previously provided values +func (s *system) Reprovide(ctx context.Context) error { + return s.reprovider.Trigger(ctx) +}
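
A rough sketch of how the restored simple provider, reprovider, and queue are meant to fit together, following the usage shown in the new `provider/README.md` and the constructors in this diff. The helper names `newProviderSystem` and `provideOne` and the queue name `"example"` are illustrative only; `rsys`, `bstore`, and `ds` stand in for whatever routing client, blockstore, and datastore your node already has.

```golang
package example

import (
	"context"
	"time"

	blockstore "github.com/ipfs/boxo/blockstore"
	provider "github.com/ipfs/boxo/provider"
	"github.com/ipfs/boxo/provider/queue"
	"github.com/ipfs/boxo/provider/simple"
	"github.com/ipfs/go-cid"
	datastore "github.com/ipfs/go-datastore"
	"github.com/libp2p/go-libp2p/core/routing"
)

// newProviderSystem wires the restored pieces together: a datastore-backed
// queue, a simple.Provider that announces queued CIDs, and a
// simple.Reprovider that re-announces the blockstore on an interval.
//
// ds must be thread-safe (the queue keeps its state there), and rsys is any
// routing.ContentRouting implementation, e.g. a DHT client.
func newProviderSystem(ctx context.Context, ds datastore.Datastore, bstore blockstore.Blockstore, rsys routing.ContentRouting) (provider.System, error) {
	q, err := queue.NewQueue(ctx, "example", ds)
	if err != nil {
		return nil, err
	}

	// Give up on a single announce after a minute rather than blocking a worker forever.
	prov := simple.NewProvider(ctx, q, rsys, simple.WithTimeout(time.Minute))

	// Walk the blockstore and re-announce everything on a 12 hour cycle.
	reprov := simple.NewReprovider(ctx, 12*time.Hour, rsys, simple.NewBlockstoreProvider(bstore))

	sys := provider.NewSystem(prov, reprov)
	sys.Run()
	return sys, nil
}

// provideOne enqueues a CID; a provider worker announces it asynchronously.
func provideOne(sys provider.System, c cid.Cid) error {
	return sys.Provide(c)
}
```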
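
For routers that support bulk announcements, `provider/batched` is the entry point instead of `provider/simple`. A sketch under the same assumptions: `bulkRouter` and `newBatchedSystem` are illustrative names, and the interface below simply restates the `ProvideMany`/`Ready` pair that `batched.New` expects (the accelerated DHT client is the usual concrete type behind it).

```golang
package example

import (
	"context"
	"time"

	blockstore "github.com/ipfs/boxo/blockstore"
	"github.com/ipfs/boxo/provider/batched"
	"github.com/ipfs/boxo/provider/queue"
	"github.com/ipfs/boxo/provider/simple"
	datastore "github.com/ipfs/go-datastore"
	"github.com/multiformats/go-multihash"
)

// bulkRouter captures the two methods batched.New actually needs.
type bulkRouter interface {
	ProvideMany(ctx context.Context, keys []multihash.Multihash) error
	Ready() bool
}

func newBatchedSystem(ctx context.Context, ds datastore.Batching, bstore blockstore.Blockstore, rt bulkRouter) (*batched.BatchProvidingSystem, error) {
	q, err := queue.NewQueue(ctx, "example", ds)
	if err != nil {
		return nil, err
	}

	sys, err := batched.New(rt, q,
		batched.Datastore(ds),                    // persists the last-reprovide timestamp
		batched.ReproviderInterval(24*time.Hour), // same as the package default
		batched.KeyProvider(simple.NewBlockstoreProvider(bstore)),
	)
	if err != nil {
		return nil, err
	}

	sys.Run() // batches queued CIDs and calls ProvideMany once the router is Ready
	return sys, nil
}
```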
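
The `github.com/cenkalti/backoff` dependency added to `go.mod` exists for the per-key retry in `simple.(*Reprovider).Reprovide`. Roughly, each announce is wrapped as below; `provideWithRetry` is an illustrative helper, not part of the package API.

```golang
package example

import (
	"context"

	"github.com/cenkalti/backoff"
	"github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p/core/routing"
)

// provideWithRetry mirrors the per-key loop in Reprovider.Reprovide: keep
// retrying the announce with exponential backoff until it succeeds, the
// backoff gives up, or ctx is cancelled.
func provideWithRetry(ctx context.Context, rsys routing.ContentRouting, c cid.Cid) error {
	op := func() error {
		return rsys.Provide(ctx, c, true)
	}
	return backoff.Retry(op, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
}
```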