From 4c5c98b94b219d9870c3181f73a99d2f15edae7d Mon Sep 17 00:00:00 2001
From: Jorropo
Date: Fri, 2 Jun 2023 04:49:03 +0200
Subject: [PATCH] provider: revert throughput callback and related refactor

This reverts commits:
- 0ff6929cc9a1b143c10860936858d268d5a4897b: provider: add breaking changes to the changelog (#330)
- 0962ed63b07127b95a67423c3ddf820558b3d829: relocated magic numbers, updated Reprovide Interval from 24h to 22h
- ac047a5b4653955dfc522e2079a20b7e2e85e907: provider: refactor to only maintain one batched implementation and add throughput callback

This seems to have an unknown deadlock that is blocking Kubo's sharness tests. The code coverage is also pretty bad: I forgot to migrate the pinning reproviding tests because they weren't run directly against the batched implementation.

https://app.codecov.io/gh/ipfs/boxo/commit/20e2aae45ec6ecd21c905993c394e9cc767c9038/tree/provider
---
 CHANGELOG.md                                  |   9 -
 go.mod                                        |   1 +
 go.sum                                        |   2 +
 provider/README.md                            |  30 ++
 provider/{reprovider.go => batched/system.go} | 282 ++++++-----------
 provider/batched/system_test.go               | 119 ++++++++
 provider/noop.go                              |  32 --
 provider/offline.go                           |  29 ++
 provider/provider.go                          | 108 +------
 provider/{internal => }/queue/queue.go        |  22 +-
 provider/{internal => }/queue/queue_test.go   |  39 ++-
 provider/reprovider_test.go                   | 219 -------------
 provider/simple/provider.go                   | 116 +++++++
 provider/simple/provider_test.go              | 166 ++++++++++
 provider/simple/reprovide.go                  | 255 ++++++++++++++++
 provider/simple/reprovide_test.go             | 289 ++++++++++++++++++
 provider/system.go                            |  60 ++++
 17 files changed, 1198 insertions(+), 580 deletions(-)
 create mode 100644 provider/README.md
 rename provider/{reprovider.go => batched/system.go} (52%)
 create mode 100644 provider/batched/system_test.go
 delete mode 100644 provider/noop.go
 create mode 100644 provider/offline.go
 rename provider/{internal => }/queue/queue.go (86%)
 rename provider/{internal => }/queue/queue_test.go (80%)
 delete mode 100644 provider/reprovider_test.go
 create mode 100644 provider/simple/provider.go
 create mode 100644 provider/simple/provider_test.go
 create mode 100644 provider/simple/reprovide.go
 create mode 100644 provider/simple/reprovide_test.go
 create mode 100644 provider/system.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 056791e0b..efc2c6c12 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,15 +19,6 @@ The following emojis are used to highlight certain changes:
 - Updated, higher-definition icons in directory listings.
 - Customizable menu items next to "About IPFS" and "Install IPFS".
 - Valid DAG-CBOR and DAG-JSON blocks now provide a preview, where links can be followed.
-- 🛠 Provider API refactor
-  - `provider/queue` has been moved to `provider/internal/queue`.
-  - `provider/batched.New` has been moved to `provider.New` and arguments has been changed:
-    - a routing system is now passed with the `provider.Online` option, by default the system run in offline mode (push stuff onto the queue); and
-    - you do not have to pass a queue anymore, you pass a `datastore.Datastore` exclusively.
-  - `provider/simple` has been removed, now instead `provider.New` will accept non batched routing systems and use type assertion for the `ProvideMany` call, giving a single implementation.
-  - `provider.NewOfflineProvider` has been renamed to `provider.NewNoopProvider` to show more clearly that is does nothing.
-  - `provider.NewSystem` has been removed, `provider.New` now returns a `provider.System` directly. 
- - `provider.Provider` and `provider.Reprovider` has been merged under one `provider.System` ## [0.8.0] - 2023-04-05 ### Added diff --git a/go.mod b/go.mod index 2d08290fc..5be828081 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a github.com/benbjohnson/clock v1.3.0 + github.com/cenkalti/backoff v2.2.1+incompatible github.com/cespare/xxhash/v2 v2.2.0 github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 github.com/cskr/pubsub v1.0.2 diff --git a/go.sum b/go.sum index 7b4a7f276..86980317b 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= +github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/provider/README.md b/provider/README.md new file mode 100644 index 000000000..0e4f4650d --- /dev/null +++ b/provider/README.md @@ -0,0 +1,30 @@ +## Usage + +Here's how you create, start, interact with, and stop the provider system: + +```golang +import ( + "context" + "time" + + "github.com/ipfs/boxo/provider" + "github.com/ipfs/boxo/provider/queue" + "github.com/ipfs/boxo/provider/simple" +) + +rsys := (your routing system here) +dstore := (your datastore here) +cid := (your cid to provide here) + +q := queue.NewQueue(context.Background(), "example", dstore) + +reprov := simple.NewReprovider(context.Background(), time.Hour * 12, rsys, simple.NewBlockstoreProvider(dstore)) +prov := simple.NewProvider(context.Background(), q, rsys) +sys := provider.NewSystem(prov, reprov) + +sys.Run() + +sys.Provide(cid) + +sys.Close() +``` diff --git a/provider/reprovider.go b/provider/batched/system.go similarity index 52% rename from provider/reprovider.go rename to provider/batched/system.go index 2d79a62d8..e3cb0325a 100644 --- a/provider/reprovider.go +++ b/provider/batched/system.go @@ -1,44 +1,26 @@ -package provider +package batched import ( "context" "errors" "fmt" - "math" "strconv" "sync" "time" - "github.com/ipfs/boxo/provider/internal/queue" + provider "github.com/ipfs/boxo/provider" + "github.com/ipfs/boxo/provider/queue" + "github.com/ipfs/boxo/provider/simple" "github.com/ipfs/boxo/verifcid" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - namespace "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log" "github.com/multiformats/go-multihash" ) -const ( - // MAGIC: how long we wait before reproviding a key - DefaultReproviderInterval = time.Hour * 22 // https://github.com/ipfs/kubo/pull/9326 - - // MAGIC: If the reprovide ticker is larger than a minute (likely), provide - // once after we've been up a minute. Don't provide _immediately_ as we - // might be just about to stop. 
- defaultInitialReprovideDelay = time.Minute - - // MAGIC: how long we wait between the first provider we hear about and - // batching up the provides to send out - pauseDetectionThreshold = time.Millisecond * 500 - - // MAGIC: how long we are willing to collect providers for the batch after - // we receive the first one - maxCollectionDuration = time.Minute * 10 -) - var log = logging.Logger("provider.batched") -type reprovider struct { +type BatchProvidingSystem struct { ctx context.Context close context.CancelFunc closewg sync.WaitGroup @@ -47,64 +29,39 @@ type reprovider struct { initalReprovideDelay time.Duration initialReprovideDelaySet bool - rsys Provide - keyProvider KeyChanFunc + rsys provideMany + keyProvider simple.KeyChanFunc q *queue.Queue ds datastore.Batching reprovideCh chan cid.Cid - maxReprovideBatchSize uint - - statLk sync.Mutex - totalProvides, lastReprovideBatchSize uint64 + totalProvides, lastReprovideBatchSize int avgProvideDuration, lastReprovideDuration time.Duration - - throughputCallback ThroughputCallback - // throughputProvideCurrentCount counts how many provides has been done since the last call to throughputCallback - throughputProvideCurrentCount uint - // throughputDurationSum sums up durations between two calls to the throughputCallback - throughputDurationSum time.Duration - throughputMinimumProvides uint - - keyPrefix datastore.Key } -var _ System = (*reprovider)(nil) +var _ provider.System = (*BatchProvidingSystem)(nil) -type Provide interface { - Provide(context.Context, cid.Cid, bool) error -} - -type ProvideMany interface { +type provideMany interface { ProvideMany(ctx context.Context, keys []multihash.Multihash) error -} - -type Ready interface { Ready() bool } // Option defines the functional option type that can be used to configure // BatchProvidingSystem instances -type Option func(system *reprovider) error - -var lastReprovideKey = datastore.NewKey("/reprovide/lastreprovide") -var DefaultKeyPrefix = datastore.NewKey("/provider") - -// New creates a new [System]. By default it is offline, that means it will -// enqueue tasks in ds. -// To have it publish records in the network use the [Online] option. -// If provider casts to [ProvideMany] the [ProvideMany.ProvideMany] method will -// be called instead. -// -// If provider casts to [Ready], it will wait until [Ready.Ready] is true. -func New(ds datastore.Batching, opts ...Option) (System, error) { - s := &reprovider{ - reprovideInterval: DefaultReproviderInterval, - maxReprovideBatchSize: math.MaxUint, - keyPrefix: DefaultKeyPrefix, - reprovideCh: make(chan cid.Cid), +type Option func(system *BatchProvidingSystem) error + +var lastReprovideKey = datastore.NewKey("/provider/reprovide/lastreprovide") + +func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingSystem, error) { + s := &BatchProvidingSystem{ + reprovideInterval: time.Hour * 24, + rsys: provider, + keyProvider: nil, + q: q, + ds: datastore.NewMapDatastore(), + reprovideCh: make(chan cid.Cid), } for _, o := range opts { @@ -114,8 +71,13 @@ func New(ds datastore.Batching, opts ...Option) (System, error) { } // Setup default behavior for the initial reprovide delay - if !s.initialReprovideDelaySet && s.reprovideInterval > defaultInitialReprovideDelay { - s.initalReprovideDelay = defaultInitialReprovideDelay + // + // If the reprovide ticker is larger than a minute (likely), + // provide once after we've been up a minute. + // + // Don't provide _immediately_ as we might be just about to stop. 
+ if !s.initialReprovideDelaySet && s.reprovideInterval > time.Minute { + s.initalReprovideDelay = time.Minute s.initialReprovideDelaySet = true } @@ -127,87 +89,50 @@ func New(ds datastore.Batching, opts ...Option) (System, error) { } } - s.ds = namespace.Wrap(ds, s.keyPrefix) - s.q = queue.NewQueue(s.ds) - // This is after the options processing so we do not have to worry about leaking a context if there is an // initialization error processing the options ctx, cancel := context.WithCancel(context.Background()) s.ctx = ctx s.close = cancel - if s.rsys != nil { - if _, ok := s.rsys.(ProvideMany); !ok { - s.maxReprovideBatchSize = 1 - } - - s.run() - } - return s, nil } -func ReproviderInterval(duration time.Duration) Option { - return func(system *reprovider) error { - system.reprovideInterval = duration - return nil - } -} - -func KeyProvider(fn KeyChanFunc) Option { - return func(system *reprovider) error { - system.keyProvider = fn - return nil - } -} - -// DatastorePrefix sets a prefix for internal state stored in the Datastore. -// Defaults to [DefaultKeyPrefix]. -func DatastorePrefix(k datastore.Key) Option { - return func(system *reprovider) error { - system.keyPrefix = k +func Datastore(batching datastore.Batching) Option { + return func(system *BatchProvidingSystem) error { + system.ds = batching return nil } } -// ThroughputReport will fire the callback synchronously once at least limit -// multihashes have been advertised, it will then wait until a new set of at least -// limit multihashes has been advertised. -// While ThroughputReport is set batches will be at most minimumProvides big. -// If it returns false it wont ever be called again. -func ThroughputReport(f ThroughputCallback, minimumProvides uint) Option { - return func(system *reprovider) error { - system.throughputCallback = f - system.throughputMinimumProvides = minimumProvides +func ReproviderInterval(duration time.Duration) Option { + return func(system *BatchProvidingSystem) error { + system.reprovideInterval = duration return nil } } -type ThroughputCallback = func(reprovide bool, complete bool, totalKeysProvided uint, totalDuration time.Duration) (continueWatching bool) - -// Online will enable the router and make it send publishes online. -// nil can be used to turn the router offline. -// You can't register multiple providers, if this option is passed multiple times -// it will error. 
-func Online(rsys Provide) Option { - return func(system *reprovider) error { - if system.rsys != nil { - return fmt.Errorf("trying to register two provider on the same reprovider") - } - system.rsys = rsys +func KeyProvider(fn simple.KeyChanFunc) Option { + return func(system *BatchProvidingSystem) error { + system.keyProvider = fn return nil } } func initialReprovideDelay(duration time.Duration) Option { - return func(system *reprovider) error { + return func(system *BatchProvidingSystem) error { system.initialReprovideDelaySet = true system.initalReprovideDelay = duration return nil } } -func (s *reprovider) run() { +func (s *BatchProvidingSystem) Run() { + // how long we wait between the first provider we hear about and batching up the provides to send out + const pauseDetectionThreshold = time.Millisecond * 500 + // how long we are willing to collect providers for the batch after we receive the first one + const maxCollectionDuration = time.Minute * 10 + provCh := s.q.Dequeue() s.closewg.Add(1) @@ -241,16 +166,27 @@ func (s *reprovider) run() { for { performedReprovide := false - complete := false - - batchSize := s.maxReprovideBatchSize - if s.throughputCallback != nil && s.throughputMinimumProvides < batchSize { - batchSize = s.throughputMinimumProvides - } // at the start of every loop the maxCollectionDurationTimer and pauseDetectTimer should be already be // stopped and have empty channels - for uint(len(m)) < batchSize { + loop: + for { + select { + case <-maxCollectionDurationTimer.C: + // if this timer has fired then the pause timer has started so let's stop and empty it + stopAndEmptyTimer(pauseDetectTimer) + break loop + default: + } + + select { + case c := <-provCh: + resetTimersAfterReceivingProvide() + m[c] = struct{}{} + continue + default: + } + select { case c := <-provCh: resetTimersAfterReceivingProvide() @@ -262,19 +198,15 @@ func (s *reprovider) run() { case <-pauseDetectTimer.C: // if this timer has fired then the max collection timer has started so let's stop and empty it stopAndEmptyTimer(maxCollectionDurationTimer) - complete = true - goto AfterLoop + break loop case <-maxCollectionDurationTimer.C: // if this timer has fired then the pause timer has started so let's stop and empty it stopAndEmptyTimer(pauseDetectTimer) - goto AfterLoop + break loop case <-s.ctx.Done(): return } } - stopAndEmptyTimer(pauseDetectTimer) - stopAndEmptyTimer(maxCollectionDurationTimer) - AfterLoop: if len(m) == 0 { continue @@ -298,63 +230,41 @@ func (s *reprovider) run() { continue } - if r, ok := s.rsys.(Ready); ok { - ticker := time.NewTicker(time.Minute) - for !r.Ready() { - log.Debugf("reprovider system not ready") - select { - case <-ticker.C: - case <-s.ctx.Done(): - return - } + for !s.rsys.Ready() { + log.Debugf("reprovider system not ready") + select { + case <-time.After(time.Minute): + case <-s.ctx.Done(): + return } - ticker.Stop() } log.Debugf("starting provide of %d keys", len(keys)) start := time.Now() - err := doProvideMany(s.ctx, s.rsys, keys) + err := s.rsys.ProvideMany(s.ctx, keys) if err != nil { log.Debugf("providing failed %v", err) continue } dur := time.Since(start) - totalProvideTime := time.Duration(s.totalProvides) * s.avgProvideDuration - recentAvgProvideDuration := dur / time.Duration(len(keys)) - - s.statLk.Lock() - s.avgProvideDuration = time.Duration((totalProvideTime + dur) / (time.Duration(s.totalProvides) + time.Duration(len(keys)))) - s.totalProvides += uint64(len(keys)) + totalProvideTime := int64(s.totalProvides) * int64(s.avgProvideDuration) 
+ recentAvgProvideDuration := time.Duration(int64(dur) / int64(len(keys))) + s.avgProvideDuration = time.Duration((totalProvideTime + int64(dur)) / int64(s.totalProvides+len(keys))) + s.totalProvides += len(keys) log.Debugf("finished providing of %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration) if performedReprovide { - s.lastReprovideBatchSize = uint64(len(keys)) + s.lastReprovideBatchSize = len(keys) s.lastReprovideDuration = dur - s.statLk.Unlock() - - // Don't hold the lock while writing to disk, consumers don't need to wait on IO to read thoses fields. - if err := s.ds.Put(s.ctx, lastReprovideKey, storeTime(time.Now())); err != nil { log.Errorf("could not store last reprovide time: %v", err) } if err := s.ds.Sync(s.ctx, lastReprovideKey); err != nil { log.Errorf("could not perform sync of last reprovide time: %v", err) } - } else { - s.statLk.Unlock() - } - - s.throughputDurationSum += dur - s.throughputProvideCurrentCount += uint(len(keys)) - if s.throughputCallback != nil && s.throughputProvideCurrentCount >= s.throughputMinimumProvides { - if more := s.throughputCallback(performedReprovide, complete, s.throughputProvideCurrentCount, s.throughputDurationSum); !more { - s.throughputCallback = nil - } - s.throughputProvideCurrentCount = 0 - s.throughputDurationSum = 0 } } }() @@ -417,22 +327,22 @@ func parseTime(b []byte) (time.Time, error) { return time.Unix(0, tns), nil } -func (s *reprovider) Close() error { +func (s *BatchProvidingSystem) Close() error { s.close() err := s.q.Close() s.closewg.Wait() return err } -func (s *reprovider) Provide(cid cid.Cid) error { +func (s *BatchProvidingSystem) Provide(cid cid.Cid) error { return s.q.Enqueue(cid) } -func (s *reprovider) Reprovide(ctx context.Context) error { +func (s *BatchProvidingSystem) Reprovide(ctx context.Context) error { return s.reprovide(ctx, true) } -func (s *reprovider) reprovide(ctx context.Context, force bool) error { +func (s *BatchProvidingSystem) reprovide(ctx context.Context, force bool) error { if !s.shouldReprovide() && !force { return nil } @@ -463,7 +373,7 @@ reprovideCidLoop: return nil } -func (s *reprovider) getLastReprovideTime() (time.Time, error) { +func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { val, err := s.ds.Get(s.ctx, lastReprovideKey) if errors.Is(err, datastore.ErrNotFound) { return time.Time{}, nil @@ -480,45 +390,31 @@ func (s *reprovider) getLastReprovideTime() (time.Time, error) { return t, nil } -func (s *reprovider) shouldReprovide() bool { +func (s *BatchProvidingSystem) shouldReprovide() bool { t, err := s.getLastReprovideTime() if err != nil { log.Debugf("getting last reprovide time failed: %s", err) return false } - if time.Since(t) < s.reprovideInterval { + if time.Since(t) < time.Duration(float64(s.reprovideInterval)*0.5) { return false } return true } -type ReproviderStats struct { - TotalProvides, LastReprovideBatchSize uint64 +type BatchedProviderStats struct { + TotalProvides, LastReprovideBatchSize int AvgProvideDuration, LastReprovideDuration time.Duration } // Stat returns various stats about this provider system -func (s *reprovider) Stat() (ReproviderStats, error) { - s.statLk.Lock() - defer s.statLk.Unlock() - return ReproviderStats{ +func (s *BatchProvidingSystem) Stat(ctx context.Context) (BatchedProviderStats, error) { + // TODO: Does it matter that there is no locking around the total+average values? 
+ return BatchedProviderStats{ TotalProvides: s.totalProvides, LastReprovideBatchSize: s.lastReprovideBatchSize, AvgProvideDuration: s.avgProvideDuration, LastReprovideDuration: s.lastReprovideDuration, }, nil } - -func doProvideMany(ctx context.Context, r Provide, keys []multihash.Multihash) error { - if many, ok := r.(ProvideMany); ok { - return many.ProvideMany(ctx, keys) - } - - for _, k := range keys { - if err := r.Provide(ctx, cid.NewCidV1(cid.Raw, k), true); err != nil { - return err - } - } - return nil -} diff --git a/provider/batched/system_test.go b/provider/batched/system_test.go new file mode 100644 index 000000000..c8a7d7b84 --- /dev/null +++ b/provider/batched/system_test.go @@ -0,0 +1,119 @@ +package batched + +import ( + "context" + "strconv" + "sync" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + mh "github.com/multiformats/go-multihash" + + "github.com/ipfs/boxo/internal/test" + q "github.com/ipfs/boxo/provider/queue" +) + +type mockProvideMany struct { + lk sync.Mutex + keys []mh.Multihash +} + +func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { + m.lk.Lock() + defer m.lk.Unlock() + m.keys = keys + return nil +} + +func (m *mockProvideMany) Ready() bool { + return true +} + +func (m *mockProvideMany) GetKeys() []mh.Multihash { + m.lk.Lock() + defer m.lk.Unlock() + return m.keys[:] +} + +var _ provideMany = (*mockProvideMany)(nil) + +func TestBatched(t *testing.T) { + test.Flaky(t) + ctx := context.Background() + defer ctx.Done() + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + queue, err := q.NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + provider := &mockProvideMany{} + + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + const numProvides = 100 + keysToProvide := make(map[cid.Cid]int) + for i := 0; i < numProvides; i++ { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + c := cid.NewCidV1(cid.Raw, h) + keysToProvide[c] = i + } + + batchSystem, err := New(provider, queue, KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid) + go func() { + for k := range keysToProvide { + select { + case ch <- k: + case <-ctx.Done(): + return + } + } + }() + return ch, nil + }), initialReprovideDelay(0)) + if err != nil { + t.Fatal(err) + } + + batchSystem.Run() + + var keys []mh.Multihash + for { + if ctx.Err() != nil { + t.Fatal("test hung") + } + keys = provider.GetKeys() + if len(keys) != 0 { + break + } + time.Sleep(time.Millisecond * 100) + } + + if len(keys) != numProvides { + t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) + } + + provMap := make(map[string]struct{}) + for _, k := range keys { + provMap[string(k)] = struct{}{} + } + + for i := 0; i < numProvides; i++ { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + if _, found := provMap[string(h)]; !found { + t.Fatalf("could not find provider with value %d", i) + } + } +} diff --git a/provider/noop.go b/provider/noop.go deleted file mode 100644 index 5367ccb30..000000000 --- a/provider/noop.go +++ /dev/null @@ -1,32 +0,0 @@ -package provider - -import ( - "context" - - "github.com/ipfs/go-cid" -) - -type noopProvider struct{} - -var _ System = (*noopProvider)(nil) - -// NewNoopProvider creates a ProviderSystem that does nothing. 
-func NewNoopProvider() System { - return &noopProvider{} -} - -func (op *noopProvider) Close() error { - return nil -} - -func (op *noopProvider) Provide(cid.Cid) error { - return nil -} - -func (op *noopProvider) Reprovide(context.Context) error { - return nil -} - -func (op *noopProvider) Stat() (ReproviderStats, error) { - return ReproviderStats{}, nil -} diff --git a/provider/offline.go b/provider/offline.go new file mode 100644 index 000000000..030a70ab1 --- /dev/null +++ b/provider/offline.go @@ -0,0 +1,29 @@ +package provider + +import ( + "context" + + "github.com/ipfs/go-cid" +) + +type offlineProvider struct{} + +// NewOfflineProvider creates a ProviderSystem that does nothing +func NewOfflineProvider() System { + return &offlineProvider{} +} + +func (op *offlineProvider) Run() { +} + +func (op *offlineProvider) Close() error { + return nil +} + +func (op *offlineProvider) Provide(cid.Cid) error { + return nil +} + +func (op *offlineProvider) Reprovide(context.Context) error { + return nil +} diff --git a/provider/provider.go b/provider/provider.go index 8b85fe62f..3b9c6ba3e 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -3,115 +3,25 @@ package provider import ( "context" - blocks "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/boxo/fetcher" - fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" - "github.com/ipfs/boxo/pinning/pinner" "github.com/ipfs/go-cid" - "github.com/ipfs/go-cidutil" - logging "github.com/ipfs/go-log" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" ) -var logR = logging.Logger("reprovider.simple") - // Provider announces blocks to the network type Provider interface { + // Run is used to begin processing the provider work + Run() // Provide takes a cid and makes an attempt to announce it to the network Provide(cid.Cid) error + // Close stops the provider + Close() error } // Reprovider reannounces blocks to the network type Reprovider interface { - // Reprovide starts a new reprovide if one isn't running already. 
- Reprovide(context.Context) error -} - -// System defines the interface for interacting with the value -// provider system -type System interface { + // Run is used to begin processing the reprovider work and waiting for reprovide triggers + Run() + // Trigger a reprovide + Trigger(context.Context) error + // Close stops the reprovider Close() error - Stat() (ReproviderStats, error) - Provider - Reprovider -} - -// KeyChanFunc is function streaming CIDs to pass to content routing -type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) - -// NewBlockstoreProvider returns key provider using bstore.AllKeysChan -func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - return bstore.AllKeysChan(ctx) - } -} - -// NewPinnedProvider returns provider supplying pinned keys -func NewPinnedProvider(onlyRoots bool, pinning pin.Pinner, fetchConfig fetcher.Factory) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) - if err != nil { - return nil, err - } - - outCh := make(chan cid.Cid) - go func() { - defer close(outCh) - for c := range set.New { - select { - case <-ctx.Done(): - return - case outCh <- c: - } - } - - }() - - return outCh, nil - } -} - -func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { - set := cidutil.NewStreamingSet() - - go func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - defer close(set.New) - - dkeys, err := pinning.DirectKeys(ctx) - if err != nil { - logR.Errorf("reprovide direct pins: %s", err) - return - } - for _, key := range dkeys { - set.Visitor(ctx)(key) - } - - rkeys, err := pinning.RecursiveKeys(ctx) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - - session := fetchConfig.NewSession(ctx) - for _, key := range rkeys { - set.Visitor(ctx)(key) - if !onlyRoots { - err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { - clink, ok := res.LastBlockLink.(cidlink.Link) - if ok { - set.Visitor(ctx)(clink.Cid) - } - return nil - }) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - } - } - }() - - return set, nil } diff --git a/provider/internal/queue/queue.go b/provider/queue/queue.go similarity index 86% rename from provider/internal/queue/queue.go rename to provider/queue/queue.go index dada92788..618256bbe 100644 --- a/provider/internal/queue/queue.go +++ b/provider/queue/queue.go @@ -3,7 +3,6 @@ package queue import ( "context" "fmt" - cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" namespace "github.com/ipfs/go-datastore/namespace" @@ -21,6 +20,7 @@ var log = logging.Logger("provider.queue") type Queue struct { // used to differentiate queues in datastore // e.g. 
provider vs reprovider + name string ctx context.Context ds datastore.Datastore // Must be threadsafe dequeue chan cid.Cid @@ -32,10 +32,11 @@ type Queue struct { } // NewQueue creates a queue for cids -func NewQueue(ds datastore.Datastore) *Queue { - namespaced := namespace.Wrap(ds, datastore.NewKey("/queue")) - cancelCtx, cancel := context.WithCancel(context.Background()) +func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) { + namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/")) + cancelCtx, cancel := context.WithCancel(ctx) q := &Queue{ + name: name, ctx: cancelCtx, ds: namespaced, dequeue: make(chan cid.Cid), @@ -44,16 +45,13 @@ func NewQueue(ds datastore.Datastore) *Queue { closed: make(chan struct{}, 1), } q.work() - return q + return q, nil } // Close stops the queue func (q *Queue) Close() error { q.close() <-q.closed - // We don't close dequeue because the provider which consume this get caught in - // an infinite loop dequeing cid.Undef if we do that. - // The provider has it's own select on top of dequeue and will handle this by itself. return nil } @@ -81,6 +79,8 @@ func (q *Queue) work() { defer func() { // also cancels any in-progess enqueue tasks. q.close() + // unblocks anyone waiting + close(q.dequeue) // unblocks the close call close(q.closed) }() @@ -121,12 +121,6 @@ func (q *Queue) work() { q.counter++ nextKey := datastore.NewKey(keyPath) - if c == cid.Undef { - // fast path, skip rereading the datastore if we don't have anything in hand yet - c = toQueue - k = nextKey - } - if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil { log.Errorf("Failed to enqueue cid: %s", err) continue diff --git a/provider/internal/queue/queue_test.go b/provider/queue/queue_test.go similarity index 80% rename from provider/internal/queue/queue_test.go rename to provider/queue/queue_test.go index a2d9f0be4..9eacf4349 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/queue/queue_test.go @@ -43,8 +43,10 @@ func TestBasicOperation(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := NewQueue(ds) - defer queue.Close() + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } cids := makeCids(10) @@ -61,8 +63,10 @@ func TestMangledData(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := NewQueue(ds) - defer queue.Close() + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } cids := makeCids(10) for _, c := range cids { @@ -71,7 +75,7 @@ func TestMangledData(t *testing.T) { // put bad data in the queue queueKey := datastore.NewKey("/test/0") - err := queue.ds.Put(ctx, queueKey, []byte("borked")) + err = queue.ds.Put(ctx, queueKey, []byte("borked")) if err != nil { t.Fatal(err) } @@ -87,8 +91,10 @@ func TestInitialization(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := NewQueue(ds) - defer queue.Close() + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } cids := makeCids(10) for _, c := range cids { @@ -98,8 +104,10 @@ func TestInitialization(t *testing.T) { assertOrdered(cids[:5], queue, t) // make a new queue, same data - queue = NewQueue(ds) - defer queue.Close() + queue, err = NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } assertOrdered(cids[5:], queue, t) } @@ -110,18 +118,21 @@ func TestInitializationWithManyCids(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - 
queue := NewQueue(ds) + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } cids := makeCids(25) for _, c := range cids { queue.Enqueue(c) } - queue.Close() - // make a new queue, same data - queue = NewQueue(ds) - defer queue.Close() + queue, err = NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } assertOrdered(cids, queue, t) } diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go deleted file mode 100644 index bfb8fc187..000000000 --- a/provider/reprovider_test.go +++ /dev/null @@ -1,219 +0,0 @@ -package provider - -import ( - "bytes" - "context" - "runtime" - "strconv" - "sync" - "testing" - "time" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - mh "github.com/multiformats/go-multihash" - "github.com/stretchr/testify/assert" -) - -type allFeatures interface { - Provide - ProvideMany - Ready -} - -type mockProvideMany struct { - delay time.Duration - lk sync.Mutex - keys []mh.Multihash - calls uint -} - -func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { - m.lk.Lock() - defer m.lk.Unlock() - m.keys = append(m.keys, keys...) - m.calls++ - time.Sleep(time.Duration(len(keys)) * m.delay) - return nil -} - -func (m *mockProvideMany) Provide(ctx context.Context, key cid.Cid, _ bool) error { - m.lk.Lock() - defer m.lk.Unlock() - m.keys = append(m.keys, key.Hash()) - m.calls++ - time.Sleep(m.delay) - return nil -} - -func (m *mockProvideMany) Ready() bool { - return true -} - -func (m *mockProvideMany) GetKeys() (keys []mh.Multihash, calls uint) { - m.lk.Lock() - defer m.lk.Unlock() - return append([]mh.Multihash(nil), m.keys...), m.calls -} - -var _ allFeatures = (*mockProvideMany)(nil) - -type allButMany interface { - Provide - Ready -} - -type singleMockWrapper struct { - allButMany -} - -func TestReprovider(t *testing.T) { - t.Parallel() - t.Run("many", func(t *testing.T) { - t.Parallel() - testProvider(t, false) - }) - t.Run("single", func(t *testing.T) { - t.Parallel() - testProvider(t, true) - }) -} - -func testProvider(t *testing.T, singleProvide bool) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - - // It has to be so big because the combo of noisy CI runners + OSes that don't - // have scheduler as good as linux's one add a whole lot of jitter. 
- const provideDelay = time.Millisecond * 5 - orig := &mockProvideMany{ - delay: provideDelay, - } - var provider Provide = orig - if singleProvide { - provider = singleMockWrapper{orig} - } - - const numProvides = 100 - keysToProvide := make([]cid.Cid, numProvides) - for i := range keysToProvide { - h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) - if err != nil { - panic(err) - } - c := cid.NewCidV1(cid.Raw, h) - keysToProvide[i] = c - } - - var keyWait sync.Mutex - keyWait.Lock() - batchSystem, err := New(ds, Online(provider), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { - ch := make(chan cid.Cid) - go func() { - defer keyWait.Unlock() - for _, k := range keysToProvide { - select { - case ch <- k: - case <-ctx.Done(): - return - } - } - }() - return ch, nil - }), - initialReprovideDelay(0), - ThroughputReport(func(_, complete bool, n uint, d time.Duration) bool { - if !singleProvide && complete { - t.Errorf("expected an incomplete report but got a complete one") - } - - const twentyFivePercent = provideDelay / 4 - const seventyFivePercent = provideDelay - twentyFivePercent - const hundredTwentyFivePercent = provideDelay + twentyFivePercent - - avg := d / time.Duration(n) - - // windows's and darwin's schedulers and timers are too unreliable for this check - if runtime.GOOS != "windows" && runtime.GOOS != "darwin" && !(seventyFivePercent <= avg && avg <= hundredTwentyFivePercent) { - t.Errorf("average computed duration is not within bounds, expected between %v and %v but got %v.", seventyFivePercent, hundredTwentyFivePercent, avg) - } - return false - }, numProvides/2), - ) - if err != nil { - t.Fatal(err) - } - defer batchSystem.Close() - - keyWait.Lock() - time.Sleep(pauseDetectionThreshold + time.Millisecond*50) // give it time to call provider after that - - keys, calls := orig.GetKeys() - if len(keys) != numProvides { - t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) - } - if singleProvide { - if calls != 100 { - t.Fatalf("expected 100 call single provide call, got %d", calls) - } - } else { - // Two because of ThroughputReport's limit being half. - if calls != 2 { - t.Fatalf("expected 2 call batched provide call, got %d", calls) - } - } - - provMap := make(map[string]struct{}) - for _, k := range keys { - provMap[string(k)] = struct{}{} - } - - for i := 0; i < numProvides; i++ { - h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) - if err != nil { - panic(err) - } - if _, found := provMap[string(h)]; !found { - t.Fatalf("could not find provider with value %d", i) - } - } -} - -func TestOfflineRecordsThenOnlineRepublish(t *testing.T) { - // Don't run in Parallel as this test is time sensitive. - - someHash, err := mh.Sum([]byte("Vires in Numeris!"), mh.BLAKE3, -1) - assert.NoError(t, err) - c := cid.NewCidV1(cid.Raw, someHash) - - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - - // First public using an offline system to enqueue in the datastore. - sys, err := New(ds) - assert.NoError(t, err) - - err = sys.Provide(c) - assert.NoError(t, err) - - err = sys.Close() - assert.NoError(t, err) - - // Secondly restart an online datastore and we want to see this previously provided cid published. 
- prov := &mockProvideMany{} - sys, err = New(ds, Online(prov), initialReprovideDelay(0)) - assert.NoError(t, err) - - time.Sleep(pauseDetectionThreshold + time.Millisecond*10) // give it time to call provider after that - - err = sys.Close() - assert.NoError(t, err) - - prov.lk.Lock() - defer prov.lk.Unlock() - if len(prov.keys) != 1 { - t.Fatalf("expected to see 1 provide; got %d", len(prov.keys)) - } - if !bytes.Equal(prov.keys[0], someHash) { - t.Fatalf("keys are not equal expected %v, got %v", someHash, prov.keys[0]) - } -} diff --git a/provider/simple/provider.go b/provider/simple/provider.go new file mode 100644 index 000000000..63de031ad --- /dev/null +++ b/provider/simple/provider.go @@ -0,0 +1,116 @@ +// Package simple implements structures and methods to provide blocks, +// keep track of which blocks are provided, and to allow those blocks to +// be reprovided. +package simple + +import ( + "context" + "time" + + q "github.com/ipfs/boxo/provider/queue" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p/core/routing" +) + +var logP = logging.Logger("provider.simple") + +// Provider announces blocks to the network +type Provider struct { + ctx context.Context + // the CIDs for which provide announcements should be made + queue *q.Queue + // used to announce providing to the network + contentRouting routing.ContentRouting + // how long to wait for announce to complete before giving up + timeout time.Duration + // how many workers concurrently work through thhe queue + workerLimit int +} + +// Option defines the functional option type that can be used to configure +// provider instances +type Option func(*Provider) + +// WithTimeout is an option to set a timeout on a provider +func WithTimeout(timeout time.Duration) Option { + return func(p *Provider) { + p.timeout = timeout + } +} + +// MaxWorkers is an option to set the max workers on a provider +func MaxWorkers(count int) Option { + return func(p *Provider) { + p.workerLimit = count + } +} + +// NewProvider creates a provider that announces blocks to the network using a content router +func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting, options ...Option) *Provider { + p := &Provider{ + ctx: ctx, + queue: queue, + contentRouting: contentRouting, + workerLimit: 8, + } + + for _, option := range options { + option(p) + } + + return p +} + +// Close stops the provider +func (p *Provider) Close() error { + return p.queue.Close() +} + +// Run workers to handle provide requests. +func (p *Provider) Run() { + p.handleAnnouncements() +} + +// Provide the given cid using specified strategy. +func (p *Provider) Provide(root cid.Cid) error { + return p.queue.Enqueue(root) +} + +// Handle all outgoing cids by providing (announcing) them +func (p *Provider) handleAnnouncements() { + for workers := 0; workers < p.workerLimit; workers++ { + go func() { + for p.ctx.Err() == nil { + select { + case <-p.ctx.Done(): + return + case c, ok := <-p.queue.Dequeue(): + if !ok { + // queue closed. 
+ return + } + + p.doProvide(c) + } + } + }() + } +} + +func (p *Provider) doProvide(c cid.Cid) { + ctx := p.ctx + if p.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, p.timeout) + defer cancel() + } else { + ctx = p.ctx + } + + logP.Info("announce - start - ", c) + if err := p.contentRouting.Provide(ctx, c, true); err != nil { + logP.Warnf("Unable to provide entry: %s, %s", c, err) + } + logP.Info("announce - end - ", c) +} diff --git a/provider/simple/provider_test.go b/provider/simple/provider_test.go new file mode 100644 index 000000000..8734c8ff6 --- /dev/null +++ b/provider/simple/provider_test.go @@ -0,0 +1,166 @@ +package simple_test + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + blocksutil "github.com/ipfs/go-ipfs-blocksutil" + "github.com/libp2p/go-libp2p/core/peer" + + "github.com/ipfs/boxo/internal/test" + q "github.com/ipfs/boxo/provider/queue" + + . "github.com/ipfs/boxo/provider/simple" +) + +var blockGenerator = blocksutil.NewBlockGenerator() + +type mockRouting struct { + provided chan cid.Cid +} + +func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { + select { + case r.provided <- cid: + case <-ctx.Done(): + panic("context cancelled, but shouldn't have") + } + return nil +} + +func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan peer.AddrInfo { + return nil +} + +func mockContentRouting() *mockRouting { + r := mockRouting{} + r.provided = make(chan cid.Cid) + return &r +} + +func TestAnnouncement(t *testing.T) { + test.Flaky(t) + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := q.NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + r := mockContentRouting() + + prov := NewProvider(ctx, queue, r) + prov.Run() + + cids := cid.NewSet() + + for i := 0; i < 100; i++ { + c := blockGenerator.Next().Cid() + cids.Add(c) + } + + go func() { + for _, c := range cids.Keys() { + err = prov.Provide(c) + // A little goroutine stirring to exercise some different states + r := rand.Intn(10) + time.Sleep(time.Microsecond * time.Duration(r)) + } + }() + + for cids.Len() > 0 { + select { + case cp := <-r.provided: + if !cids.Has(cp) { + t.Fatal("Wrong CID provided") + } + cids.Remove(cp) + case <-time.After(time.Second * 5): + t.Fatal("Timeout waiting for cids to be provided.") + } + } + prov.Close() + + select { + case cp := <-r.provided: + t.Fatal("did not expect to provide CID: ", cp) + case <-time.After(time.Second * 1): + } +} + +func TestClose(t *testing.T) { + test.Flaky(t) + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := q.NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + r := mockContentRouting() + + prov := NewProvider(ctx, queue, r) + prov.Run() + + prov.Close() + + select { + case cp := <-r.provided: + t.Fatal("did not expect to provide anything, provided: ", cp) + case <-time.After(time.Second * 1): + } +} + +func TestAnnouncementTimeout(t *testing.T) { + test.Flaky(t) + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := q.NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + r := mockContentRouting() + + prov := NewProvider(ctx, queue, r, WithTimeout(1*time.Second)) + prov.Run() + + cids := cid.NewSet() + + 
for i := 0; i < 100; i++ { + c := blockGenerator.Next().Cid() + cids.Add(c) + } + + go func() { + for _, c := range cids.Keys() { + err = prov.Provide(c) + // A little goroutine stirring to exercise some different states + r := rand.Intn(10) + time.Sleep(time.Microsecond * time.Duration(r)) + } + }() + + for cids.Len() > 0 { + select { + case cp := <-r.provided: + if !cids.Has(cp) { + t.Fatal("Wrong CID provided") + } + cids.Remove(cp) + case <-time.After(time.Second * 5): + t.Fatal("Timeout waiting for cids to be provided.") + } + } +} diff --git a/provider/simple/reprovide.go b/provider/simple/reprovide.go new file mode 100644 index 000000000..a29b484fc --- /dev/null +++ b/provider/simple/reprovide.go @@ -0,0 +1,255 @@ +package simple + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/cenkalti/backoff" + blocks "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/fetcher" + fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" + "github.com/ipfs/boxo/verifcid" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-cidutil" + logging "github.com/ipfs/go-log" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/libp2p/go-libp2p/core/routing" +) + +var logR = logging.Logger("reprovider.simple") + +// ErrClosed is returned by Trigger when operating on a closed reprovider. +var ErrClosed = errors.New("reprovider service stopped") + +// KeyChanFunc is function streaming CIDs to pass to content routing +type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) + +// Reprovider reannounces blocks to the network +type Reprovider struct { + // Reprovider context. Cancel to stop, then wait on closedCh. + ctx context.Context + cancel context.CancelFunc + closedCh chan struct{} + + // Trigger triggers a reprovide. + trigger chan chan<- error + + // The routing system to provide values through + rsys routing.ContentRouting + + keyProvider KeyChanFunc + + tick time.Duration +} + +// NewReprovider creates new Reprovider instance. +func NewReprovider(ctx context.Context, reprovideInterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { + ctx, cancel := context.WithCancel(ctx) + return &Reprovider{ + ctx: ctx, + cancel: cancel, + closedCh: make(chan struct{}), + trigger: make(chan chan<- error), + + rsys: rsys, + keyProvider: keyProvider, + tick: reprovideInterval, + } +} + +// Close the reprovider +func (rp *Reprovider) Close() error { + rp.cancel() + <-rp.closedCh + return nil +} + +// Run re-provides keys with 'tick' interval or when triggered +func (rp *Reprovider) Run() { + defer close(rp.closedCh) + + var initialReprovideCh, reprovideCh <-chan time.Time + + // If reproviding is enabled (non-zero) + if rp.tick > 0 { + reprovideTicker := time.NewTicker(rp.tick) + defer reprovideTicker.Stop() + reprovideCh = reprovideTicker.C + + // If the reprovide ticker is larger than a minute (likely), + // provide once after we've been up a minute. + // + // Don't provide _immediately_ as we might be just about to stop. 
+ if rp.tick > time.Minute { + initialReprovideTimer := time.NewTimer(time.Minute) + defer initialReprovideTimer.Stop() + + initialReprovideCh = initialReprovideTimer.C + } + } + + var done chan<- error + for rp.ctx.Err() == nil { + select { + case <-initialReprovideCh: + case <-reprovideCh: + case done = <-rp.trigger: + case <-rp.ctx.Done(): + return + } + + err := rp.Reprovide() + + // only log if we've hit an actual error, otherwise just tell the client we're shutting down + if rp.ctx.Err() != nil { + err = ErrClosed + } else if err != nil { + logR.Errorf("failed to reprovide: %s", err) + } + + if done != nil { + if err != nil { + done <- err + } + close(done) + } + } +} + +// Reprovide registers all keys given by rp.keyProvider to libp2p content routing +func (rp *Reprovider) Reprovide() error { + keychan, err := rp.keyProvider(rp.ctx) + if err != nil { + return fmt.Errorf("failed to get key chan: %s", err) + } + for c := range keychan { + // hash security + if err := verifcid.ValidateCid(c); err != nil { + logR.Errorf("insecure hash in reprovider, %s (%s)", c, err) + continue + } + op := func() error { + err := rp.rsys.Provide(rp.ctx, c, true) + if err != nil { + logR.Debugf("Failed to provide key: %s", err) + } + return err + } + + err := backoff.Retry(op, backoff.WithContext(backoff.NewExponentialBackOff(), rp.ctx)) + if err != nil { + logR.Debugf("Providing failed after number of retries: %s", err) + return err + } + } + return nil +} + +// Trigger starts the reprovision process in rp.Run and waits for it to finish. +// +// Returns an error if a reprovide is already in progress. +func (rp *Reprovider) Trigger(ctx context.Context) error { + resultCh := make(chan error, 1) + select { + case rp.trigger <- resultCh: + default: + return fmt.Errorf("reprovider is already running") + } + + select { + case err := <-resultCh: + return err + case <-rp.ctx.Done(): + return ErrClosed + case <-ctx.Done(): + return ctx.Err() + } +} + +// Strategies + +// NewBlockstoreProvider returns key provider using bstore.AllKeysChan +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + return bstore.AllKeysChan(ctx) + } +} + +// Pinner interface defines how the simple.Reprovider wants to interact +// with a Pinning service +type Pinner interface { + DirectKeys(ctx context.Context) ([]cid.Cid, error) + RecursiveKeys(ctx context.Context) ([]cid.Cid, error) +} + +// NewPinnedProvider returns provider supplying pinned keys +func NewPinnedProvider(onlyRoots bool, pinning Pinner, fetchConfig fetcher.Factory) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) + if err != nil { + return nil, err + } + + outCh := make(chan cid.Cid) + go func() { + defer close(outCh) + for c := range set.New { + select { + case <-ctx.Done(): + return + case outCh <- c: + } + } + + }() + + return outCh, nil + } +} + +func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { + set := cidutil.NewStreamingSet() + + go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(set.New) + + dkeys, err := pinning.DirectKeys(ctx) + if err != nil { + logR.Errorf("reprovide direct pins: %s", err) + return + } + for _, key := range dkeys { + set.Visitor(ctx)(key) + } + + rkeys, err := pinning.RecursiveKeys(ctx) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } + + 
session := fetchConfig.NewSession(ctx) + for _, key := range rkeys { + set.Visitor(ctx)(key) + if !onlyRoots { + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { + clink, ok := res.LastBlockLink.(cidlink.Link) + if ok { + set.Visitor(ctx)(clink.Cid) + } + return nil + }) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } + } + } + }() + + return set, nil +} diff --git a/provider/simple/reprovide_test.go b/provider/simple/reprovide_test.go new file mode 100644 index 000000000..8b521ae56 --- /dev/null +++ b/provider/simple/reprovide_test.go @@ -0,0 +1,289 @@ +package simple_test + +import ( + "bytes" + "context" + "testing" + "time" + + bsrv "github.com/ipfs/boxo/blockservice" + blockstore "github.com/ipfs/boxo/blockstore" + offline "github.com/ipfs/boxo/exchange/offline" + bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" + "github.com/ipfs/boxo/internal/test" + mock "github.com/ipfs/boxo/routing/mock" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/fluent/qp" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + testutil "github.com/libp2p/go-libp2p-testing/net" + "github.com/libp2p/go-libp2p/core/peer" + mh "github.com/multiformats/go-multihash" + + . "github.com/ipfs/boxo/provider/simple" +) + +func setupRouting(t *testing.T) (clA, clB mock.Client, idA, idB peer.ID) { + mrserv := mock.NewServer() + + iidA := testutil.RandIdentityOrFatal(t) + iidB := testutil.RandIdentityOrFatal(t) + + clA = mrserv.Client(iidA) + clB = mrserv.Client(iidB) + + return clA, clB, iidA.ID(), iidB.ID() +} + +func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) { + bstore = blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + for _, data := range []string{"foo", "bar"} { + nb := basicnode.Prototype.Any.NewBuilder() + err := nb.AssignString(data) + if err != nil { + t.Fatal(err) + } + blk := toBlock(t, nb.Build()) + err = bstore.Put(context.Background(), blk) + if err != nil { + t.Fatal(err) + } + nodes = append(nodes, blk.Cid()) + nd, err := qp.BuildMap(basicnode.Prototype.Map, 1, func(ma ipld.MapAssembler) { + qp.MapEntry(ma, "child", qp.Link(cidlink.Link{Cid: blk.Cid()})) + }) + if err != nil { + t.Fatal(err) + } + blk = toBlock(t, nd) + err = bstore.Put(context.Background(), blk) + if err != nil { + t.Fatal(err) + } + nodes = append(nodes, blk.Cid()) + } + + return nodes, bstore +} + +func toBlock(t *testing.T, nd ipld.Node) blocks.Block { + buf := new(bytes.Buffer) + err := dagcbor.Encode(nd, buf) + if err != nil { + t.Fatal(err) + } + c, err := cid.Prefix{ + Version: 1, + Codec: cid.DagCBOR, + MhType: mh.SHA2_256, + MhLength: -1, + }.Sum(buf.Bytes()) + if err != nil { + t.Fatal(err) + } + blk, err := blocks.NewBlockWithCid(buf.Bytes(), c) + if err != nil { + t.Fatal(err) + } + return blk +} + +func TestReprovide(t *testing.T) { + test.Flaky(t) + testReprovide(t, func(r *Reprovider, ctx context.Context) error { + return r.Reprovide() + }) +} + +func TestTrigger(t *testing.T) { + test.Flaky(t) + testReprovide(t, func(r *Reprovider, ctx context.Context) error { + go r.Run() + time.Sleep(1 * time.Second) + defer r.Close() + err := r.Trigger(ctx) + return err + }) +} + +func testReprovide(t *testing.T, trigger 
func(r *Reprovider, ctx context.Context) error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clA, clB, idA, _ := setupRouting(t) + nodes, bstore := setupDag(t) + + keyProvider := NewBlockstoreProvider(bstore) + reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) + reprov.Trigger(context.Background()) + err := trigger(reprov, ctx) + if err != nil { + t.Fatal(err) + } + + var providers []peer.AddrInfo + maxProvs := 100 + + for _, c := range nodes { + // We provide raw cids because of the multihash keying + // FIXME(@Jorropo): I think this change should be done in the DHT layer, probably an issue with our routing mock. + b := c.Bytes() + b[1] = 0x55 // rewrite the cid to raw + _, c, err := cid.CidFromBytes(b) + if err != nil { + t.Fatal(err) + } + provChan := clB.FindProvidersAsync(ctx, c, maxProvs) + for p := range provChan { + providers = append(providers, p) + } + + if len(providers) == 0 { + t.Fatal("Should have gotten a provider") + } + + if providers[0].ID != idA { + t.Fatal("Somehow got the wrong peer back as a provider.") + } + } +} + +func TestTriggerTwice(t *testing.T) { + test.Flaky(t) + // Ensure we can only trigger once at a time. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clA, _, _, _ := setupRouting(t) + + keyCh := make(chan cid.Cid) + startCh := make(chan struct{}) + keyFunc := func(ctx context.Context) (<-chan cid.Cid, error) { + <-startCh + return keyCh, nil + } + + reprov := NewReprovider(ctx, time.Hour, clA, keyFunc) + go reprov.Run() + defer reprov.Close() + + // Wait for the reprovider to start, otherwise, the reprovider will + // think a concurrent reprovide is running. + // + // We _could_ fix this race... but that would be complexity for nothing. + // 1. We start a reprovide 1 minute after startup anyways. + // 2. The window is really narrow. + time.Sleep(1 * time.Second) + + errCh := make(chan error, 2) + + // Trigger in the background + go func() { + errCh <- reprov.Trigger(ctx) + }() + + // Wait for the trigger to really start. + startCh <- struct{}{} + + start := time.Now() + // Try to trigger again, this should fail immediately. + if err := reprov.Trigger(ctx); err == nil { + t.Fatal("expected an error") + } + if time.Since(start) > 10*time.Millisecond { + t.Fatal("expected reprovide to fail instantly") + } + + // Let the trigger progress. + close(keyCh) + + // Check the result. + err := <-errCh + if err != nil { + t.Fatal(err) + } + + // Try to trigger again, this should work. 
+ go func() { + errCh <- reprov.Trigger(ctx) + }() + startCh <- struct{}{} + err = <-errCh + if err != nil { + t.Fatal(err) + } +} + +type mockPinner struct { + recursive []cid.Cid + direct []cid.Cid +} + +func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { + return mp.direct, nil +} + +func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { + return mp.recursive, nil +} + +func TestReprovidePinned(t *testing.T) { + test.Flaky(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + nodes, bstore := setupDag(t) + + fetchConfig := bsfetcher.NewFetcherConfig(bsrv.New(bstore, offline.Exchange(bstore))) + + for i := 0; i < 2; i++ { + clA, clB, idA, _ := setupRouting(t) + + onlyRoots := i == 0 + t.Logf("only roots: %v", onlyRoots) + + var provide, dont []cid.Cid + if onlyRoots { + provide = []cid.Cid{nodes[1], nodes[3]} + dont = []cid.Cid{nodes[0], nodes[2]} + } else { + provide = []cid.Cid{nodes[0], nodes[1], nodes[3]} + dont = []cid.Cid{nodes[2]} + } + + keyProvider := NewPinnedProvider(onlyRoots, &mockPinner{ + recursive: []cid.Cid{nodes[1]}, + direct: []cid.Cid{nodes[3]}, + }, fetchConfig) + + reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) + err := reprov.Reprovide() + if err != nil { + t.Fatal(err) + } + + for i, c := range provide { + prov, ok := <-clB.FindProvidersAsync(ctx, c, 1) + if !ok { + t.Errorf("Should have gotten a provider for %d", i) + continue + } + + if prov.ID != idA { + t.Errorf("Somehow got the wrong peer back as a provider.") + continue + } + } + for i, c := range dont { + prov, ok := <-clB.FindProvidersAsync(ctx, c, 1) + if ok { + t.Fatalf("found provider %s for %d, expected none", prov.ID, i) + } + } + } +} diff --git a/provider/system.go b/provider/system.go new file mode 100644 index 000000000..9fc3e8879 --- /dev/null +++ b/provider/system.go @@ -0,0 +1,60 @@ +package provider + +import ( + "context" + + "github.com/ipfs/go-cid" +) + +// System defines the interface for interacting with the value +// provider system +type System interface { + Run() + Close() error + Provide(cid.Cid) error + Reprovide(context.Context) error +} + +type system struct { + provider Provider + reprovider Reprovider +} + +// NewSystem constructs a new provider system from a provider and reprovider +func NewSystem(provider Provider, reprovider Reprovider) System { + return &system{provider, reprovider} +} + +// Run the provider system by running the provider and reprovider +func (s *system) Run() { + go s.provider.Run() + go s.reprovider.Run() +} + +// Close the provider and reprovider +func (s *system) Close() error { + var errs []error + + if err := s.provider.Close(); err != nil { + errs = append(errs, err) + } + + if err := s.reprovider.Close(); err != nil { + errs = append(errs, err) + } + + if len(errs) > 0 { + return errs[0] + } + return nil +} + +// Provide a value +func (s *system) Provide(cid cid.Cid) error { + return s.provider.Provide(cid) +} + +// Reprovide all the previously provided values +func (s *system) Reprovide(ctx context.Context) error { + return s.reprovider.Trigger(ctx) +}