From c707e9903e620baf8098a38252b4297cf3b7b14a Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 9 Apr 2021 11:24:03 -0400 Subject: [PATCH 01/29] fix: in queue sync datastore to disk --- queue/queue.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/queue/queue.go b/queue/queue.go index 2c33502..21dcdd3 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -102,6 +102,12 @@ func (q *Queue) work() { log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) return } + + if err := q.ds.Sync(k); err != nil { + log.Errorf("error syncing deletion of queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) + continue + } + continue } } else { @@ -124,6 +130,12 @@ func (q *Queue) work() { log.Errorf("Failed to enqueue cid: %s", err) continue } + + if err := q.ds.Sync(k); err != nil { + log.Errorf("Failed to sync enqueuing cid: %s", err) + continue + } + case dequeue <- c: err := q.ds.Delete(k) @@ -131,6 +143,12 @@ func (q *Queue) work() { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) continue } + + if err := q.ds.Sync(k); err != nil { + log.Errorf("Failed to sync deleted queued cid %s with key %s: %s", c, k, err) + continue + } + c = cid.Undef case <-q.ctx.Done(): return From c193919f18c18d284691853b3b427c67d932f07b Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 9 Apr 2021 11:24:23 -0400 Subject: [PATCH 02/29] feat: add a batching provider/reprovider system --- batched/system.go | 472 ++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 7 +- go.sum | 49 +++-- 3 files changed, 514 insertions(+), 14 deletions(-) create mode 100644 batched/system.go diff --git a/batched/system.go b/batched/system.go new file mode 100644 index 0000000..b6d35f7 --- /dev/null +++ b/batched/system.go @@ -0,0 +1,472 @@ +package batched + +import ( + "context" + "fmt" + "github.com/ipfs/go-ipfs-provider/queue" + "strconv" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" + dshelp "github.com/ipfs/go-ipfs-ds-help" + provider "github.com/ipfs/go-ipfs-provider" + "github.com/ipfs/go-ipfs-provider/simple" + logging "github.com/ipfs/go-log" + "github.com/ipfs/go-verifcid" + "github.com/multiformats/go-multihash" +) + +var log = logging.Logger("provider.batched") + +type BatchProvidingSystem struct { + ctx context.Context + close context.CancelFunc + + reprovideInterval time.Duration + rsys provideMany + keyProvider simple.KeyChanFunc + + q *queue.Queue + ds, managedDS, timeDS datastore.Batching + + provch, managedCh, dynamicCh chan cid.Cid + + totalProvides int + avgProvideTime time.Duration +} + +var _ provider.System = (*BatchProvidingSystem)(nil) + +type provideMany interface { + ProvideMany(ctx context.Context, keys []multihash.Multihash) error +} + +// Option defines the functional option type that can be used to configure +// BatchProvidingSystem instances +type Option func(system *BatchProvidingSystem) error + +var managedKey = datastore.NewKey("/provider/reprovide/managed") +var timeKey = datastore.NewKey("/provider/reprovide/time") + +func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingSystem, error) { + ctx, cancel := context.WithCancel(context.Background()) + s := &BatchProvidingSystem{ + ctx: ctx, + close: cancel, + + reprovideInterval: time.Hour * 24, + rsys: provider, + keyProvider: nil, + q: q, + ds: datastore.NewMapDatastore(), + provch: make(chan cid.Cid, 1), + managedCh: make(chan cid.Cid, 1), + dynamicCh: make(chan cid.Cid, 1), + } + + for _, o := range opts { + if err := o(s); err != nil { + return nil, err + } + } + + s.managedDS = namespace.Wrap(s.ds, managedKey) + s.timeDS = namespace.Wrap(s.ds, timeKey) + + return s, nil +} + +func Datastore(batching datastore.Batching) Option { + return func(system *BatchProvidingSystem) error { + system.ds = batching + return nil + } +} + +func ReproviderInterval(duration time.Duration) Option { + return func(system *BatchProvidingSystem) error { + system.reprovideInterval = duration + return nil + } +} + +func KeyProvider(fn simple.KeyChanFunc) Option { + return func(system *BatchProvidingSystem) error { + system.keyProvider = fn + return nil + } +} + +func (s *BatchProvidingSystem) Run() { + go func() { + m := make(map[cid.Cid]struct{}) + for { + pauseDetectTimer := time.NewTimer(time.Hour) + maxDurationCollectionTimer := time.NewTimer(time.Minute * 10) + loop: + for { + select { + case c := <-s.provch: + m[c] = struct{}{} + pauseDetectTimer.Reset(time.Millisecond * 500) + continue + default: + } + + select { + case c := <-s.provch: + m[c] = struct{}{} + pauseDetectTimer.Reset(time.Millisecond * 500) + continue + case c := <-s.managedCh: + m[c] = struct{}{} + pauseDetectTimer.Reset(time.Millisecond * 500) + continue + default: + } + + select { + case c := <-s.provch: + m[c] = struct{}{} + pauseDetectTimer.Reset(time.Millisecond * 500) + case c := <-s.managedCh: + m[c] = struct{}{} + pauseDetectTimer.Reset(time.Millisecond * 500) + case c := <-s.dynamicCh: + m[c] = struct{}{} + pauseDetectTimer.Reset(time.Millisecond * 500) + case <-pauseDetectTimer.C: + break loop + case <-maxDurationCollectionTimer.C: + break loop + case <-s.ctx.Done(): + return + } + } + keys := make([]multihash.Multihash, 0, len(m)) + for c := range m { + // hash security + if err := verifcid.ValidateCid(c); err != nil { + log.Errorf("insecure hash in reprovider, %s (%s)", c, err) + continue + } + + keys = append(keys, c.Hash()) + } + + start := time.Now() + err := s.rsys.ProvideMany(s.ctx, keys) + if err != nil { + log.Debugf("providing failed %v", err) + continue + } + dur := time.Since(start) + + totalProvideTime := int64(s.totalProvides) * int64(s.avgProvideTime) + s.avgProvideTime = time.Duration((totalProvideTime + int64(dur)) / int64(s.totalProvides+len(keys))) + s.totalProvides += len(keys) + + for c := range m { + s.timeDS.Put(dshelp.CidToDsKey(c), storeTime(time.Now())) + delete(m, c) + } + s.timeDS.Sync(datastore.NewKey("")) + } + }() + + go func() { + ch := s.q.Dequeue() + for { + select { + case c := <-ch: + s.provch <- c + case <-s.ctx.Done(): + return + } + } + }() + + go func() { + var initialReprovideCh, reprovideCh <-chan time.Time + + // If reproviding is enabled (non-zero) + if s.reprovideInterval > 0 { + reprovideTicker := time.NewTicker(s.reprovideInterval) + defer reprovideTicker.Stop() + reprovideCh = reprovideTicker.C + + // If the reprovide ticker is larger than a minute (likely), + // provide once after we've been up a minute. + // + // Don't provide _immediately_ as we might be just about to stop. + if s.reprovideInterval > time.Minute { + initialReprovideTimer := time.NewTimer(time.Minute) + defer initialReprovideTimer.Stop() + + initialReprovideCh = initialReprovideTimer.C + } + } + + for s.ctx.Err() == nil { + select { + case <-initialReprovideCh: + case <-reprovideCh: + case <-s.ctx.Done(): + return + } + + err := s.reprovide(s.ctx, false) + + // only log if we've hit an actual error, otherwise just tell the client we're shutting down + if s.ctx.Err() == nil && err != nil { + log.Errorf("failed to reprovide: %s", err) + } + } + }() +} + +func storeTime(t time.Time) []byte { + val := []byte(fmt.Sprintf("%d", t.UnixNano())) + return val +} + +func getTime(b []byte) (time.Time, error) { + tns, err := strconv.ParseInt(string(b), 10, 64) + if err != nil { + return time.Time{}, err + } + return time.Unix(0, tns), nil +} + +func (s *BatchProvidingSystem) Close() error { + s.close() + return s.q.Close() +} + +func (s *BatchProvidingSystem) Provide(cid cid.Cid) error { + return s.q.Enqueue(cid) +} + +func (s *BatchProvidingSystem) Reprovide(ctx context.Context) error { + return s.reprovide(ctx, true) +} + +func (s *BatchProvidingSystem) reprovide(ctx context.Context, force bool) error { + qres, err := s.managedDS.Query(query.Query{}) + if err != nil { + return err + } + + nextCh := qres.Next() +managedCidLoop: + for { + select { + case r, ok := <-nextCh: + if !ok { + break managedCidLoop + } + c, err := dshelp.DsKeyToCid(datastore.NewKey(r.Key)) + if err != nil { + log.Debugf("could not decode key %v as CID", r.Key) + continue + } + + if !s.shouldReprovide(c) && !force { + continue + } + + select { + case s.managedCh <- c: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + } + + kch, err := s.keyProvider(ctx) + if err != nil { + return err + } + +dynamicCidLoop: + for { + select { + case c, ok := <-kch: + if !ok { + break dynamicCidLoop + } + if !s.shouldReprovide(c) && !force { + continue + } + + select { + case s.dynamicCh <- c: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil +} + +func (s *BatchProvidingSystem) getLastReprovideTime(c cid.Cid) (time.Time, error) { + k := dshelp.CidToDsKey(c) + val, err := s.timeDS.Get(k) + if err != nil { + return time.Time{}, fmt.Errorf("could not get time for %v", k) + } + + t, err := getTime(val) + if err != nil { + return time.Time{}, fmt.Errorf("could not decode time for %v, got %q", k, string(val)) + } + + return t, nil +} + +func (s *BatchProvidingSystem) shouldReprovide(c cid.Cid) bool { + t, err := s.getLastReprovideTime(c) + if err != nil { + log.Debugf(err.Error()) + return false + } + + if time.Since(t) < time.Duration(float64(s.reprovideInterval)*0.5) { + return false + } + return true +} + +// Stat returns the total number of provides we are responsible for, +// the number that have been recently provided, the total number of provides we have done +// since starting the system and the average time per provide +func (s *BatchProvidingSystem) Stat(ctx context.Context) (int, int, int, time.Duration, error) { + // TODO: Overlap between managed + dynamic lists + total := 0 + recentlyProvided := 0 + + qres, err := s.managedDS.Query(query.Query{}) + if err != nil { + return 0, 0, 0, 0, err + } + + nextCh := qres.Next() +managedCidLoop: + for { + select { + case r, ok := <-nextCh: + if !ok { + break managedCidLoop + } + total++ + c, err := dshelp.DsKeyToCid(datastore.NewKey(r.Key)) + if err != nil { + log.Debugf("could not decode key %v as CID", r.Key) + continue + } + + t, err := s.getLastReprovideTime(c) + if err != nil { + log.Debugf(err.Error()) + continue + } + + if time.Since(t) < s.reprovideInterval { + recentlyProvided++ + } + case <-ctx.Done(): + return 0, 0, 0, 0, ctx.Err() + } + } + + kch, err := s.keyProvider(ctx) + if err != nil { + return 0, 0, 0, 0, err + } + +dynamicCidLoop: + for { + select { + case c, ok := <-kch: + if !ok { + break dynamicCidLoop + } + total++ + t, err := s.getLastReprovideTime(c) + if err != nil { + log.Debugf(err.Error()) + continue + } + + if time.Since(t) < s.reprovideInterval { + recentlyProvided++ + } + case <-ctx.Done(): + return 0, 0, 0, 0, ctx.Err() + } + } + + // TODO: Does it matter that there is no locking around the total+average values? + return total, recentlyProvided, s.totalProvides, s.avgProvideTime, nil +} + +func (s *BatchProvidingSystem) ProvideLongterm(cids ...cid.Cid) error { + for _, c := range cids { + k := dshelp.CidToDsKey(c) + if err := s.managedDS.Put(k, []byte{}); err != nil { + return err + } + } + if err := s.managedDS.Sync(datastore.NewKey("")); err != nil { + return err + } + return nil +} + +func (s *BatchProvidingSystem) RemoveLongterm(cids ...cid.Cid) error { + for _, c := range cids { + k := dshelp.CidToDsKey(c) + if err := s.managedDS.Delete(k); err != nil { + return err + } + } + if err := s.managedDS.Sync(datastore.NewKey("")); err != nil { + return err + } + return nil +} + +func (s *BatchProvidingSystem) GetLongtermProvides(ctx context.Context) (<-chan cid.Cid, error) { + qres, err := s.managedDS.Query(query.Query{ + KeysOnly: true, + }) + if err != nil { + return nil, err + } + + ch := make(chan cid.Cid, 1) + + go func() { + for r := range qres.Next() { + c, err := dshelp.DsKeyToCid(datastore.NewKey(r.Key)) + if err != nil { + log.Debugf("could not decode key %v as CID", r.Key) + } + select { + case ch <- c: + case <-ctx.Done(): + return + } + } + }() + + return ch, nil +} diff --git a/go.mod b/go.mod index 4440fa2..351074c 100644 --- a/go.mod +++ b/go.mod @@ -7,11 +7,12 @@ retract [v1.0.0, v1.0.1] require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/ipfs/go-blockservice v0.1.2 - github.com/ipfs/go-cid v0.0.3 + github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-cidutil v0.0.2 - github.com/ipfs/go-datastore v0.1.0 + github.com/ipfs/go-datastore v0.4.5 github.com/ipfs/go-ipfs-blockstore v0.1.0 github.com/ipfs/go-ipfs-blocksutil v0.0.1 + github.com/ipfs/go-ipfs-ds-help v0.0.1 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipld-cbor v0.0.3 @@ -21,5 +22,5 @@ require ( github.com/ipfs/go-verifcid v0.0.1 github.com/libp2p/go-libp2p-core v0.2.2 github.com/libp2p/go-libp2p-testing v0.1.0 - github.com/multiformats/go-multihash v0.0.8 + github.com/multiformats/go-multihash v0.0.13 ) diff --git a/go.sum b/go.sum index bc0646b..828c48d 100644 --- a/go.sum +++ b/go.sum @@ -30,7 +30,6 @@ github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzA github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -41,6 +40,7 @@ github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= @@ -69,14 +69,15 @@ github.com/ipfs/go-blockservice v0.1.2 h1:fqFeeu1EG0lGVrqUo+BVJv7LZV31I4ZsyNthCO github.com/ipfs/go-blockservice v0.1.2/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= -github.com/ipfs/go-cid v0.0.3 h1:UIAh32wymBpStoe83YCzwVQQ5Oy/H0FdxvUS6DJDzms= -github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= +github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY= +github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I= github.com/ipfs/go-cidutil v0.0.2 h1:CNOboQf1t7Qp0nuNh8QMmhJs0+Q//bRL1axtCnIB1Yo= github.com/ipfs/go-cidutil v0.0.2/go.mod h1:ewllrvrxG6AMYStla3GD7Cqn+XYSLqjK0vc+086tB6s= github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= github.com/ipfs/go-datastore v0.0.5/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= -github.com/ipfs/go-datastore v0.1.0 h1:TOxI04l8CmO4zGtesENhzm4PwkFwJXY3rKiYaaMf9fI= github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= +github.com/ipfs/go-datastore v0.4.5 h1:cwOUcGMLdLPWgu3SlrCckCMznaGADbPqE0r8h768/Dg= +github.com/ipfs/go-datastore v0.4.5/go.mod h1:eXTcaaiN6uOlVCLS9GjJUJtlvJfM3xk23w3fyfrmmJs= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= @@ -125,8 +126,9 @@ github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec/go.mod h1:rGaEvXB4 github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod h1:8GXXJV31xl8whumTzdZsTt3RnUIiPqzkyf7mxToRCMs= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= -github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= +github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= +github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= @@ -139,8 +141,9 @@ github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/koron/go-ssdp v0.0.0-20180514024734-4a0ed625a78b h1:wxtKgYHEncAU00muMD06dzLiahtGM1eouRNOzVV7tdQ= github.com/koron/go-ssdp v0.0.0-20180514024734-4a0ed625a78b/go.mod h1:5Ky9EC2xfoUKUor0Hjgi2BJhCSXJfMOFlmyYrVKGQMk= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -227,10 +230,13 @@ github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771 h1:MHkK1uRtFbV github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= -github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc= +github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= +github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4= +github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM= github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr v0.0.4 h1:WgMSI84/eRLdbptXMkMWDXPjPq7SPLIgGUVm2eroyU4= @@ -241,14 +247,17 @@ github.com/multiformats/go-multiaddr-dns v0.0.2/go.mod h1:9kWcqw/Pj6FwxAwW38n/94 github.com/multiformats/go-multiaddr-fmt v0.0.1/go.mod h1:aBYjqL4T/7j4Qx+R73XSv/8JsgnRFlf0w2KGLCmXl3Q= github.com/multiformats/go-multiaddr-net v0.0.1 h1:76O59E3FavvHqNg7jvzWzsPSW5JSi/ek0E4eiDVbg9g= github.com/multiformats/go-multiaddr-net v0.0.1/go.mod h1:nw6HSxNmCIQH27XPGBuX+d1tnvM7ihcFwHMSstNAVUU= -github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= +github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk= +github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po= -github.com/multiformats/go-multihash v0.0.8 h1:wrYcW5yxSi3dU07n5jnuS5PrNwyHy0zRHGVoUugWvXg= -github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= +github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc= +github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= github.com/multiformats/go-multistream v0.1.0 h1:UpO6jrsjqs46mqAK3n6wKRYFhugss9ArzbyUzU+4wkQ= github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg= +github.com/multiformats/go-varint v0.0.5 h1:XVZwSo04Cs3j/jS0uAEPpT3JY6DzMcVLLoWOSnCxOjg= +github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -262,6 +271,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992 h1:bzMe+2coZJYHnhGgVlcQKuRy4FSny4ds8dLQjw5P1XE= github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= @@ -293,12 +303,16 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -307,6 +321,8 @@ golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8U golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -316,12 +332,14 @@ golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= -golang.org/x/net v0.0.0-20190611141213-3f473d35a33a h1:+KkCgOMgnKSgenxTBoiwkMqTiouMIy/3o8RLdmSbGoY= golang.org/x/net v0.0.0-20190611141213-3f473d35a33a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -342,6 +360,11 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -350,6 +373,9 @@ google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRn google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8= gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE= @@ -357,3 +383,4 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= From 829bc0993a43f04802ecd3ed0b78c6c399230d3f Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 22 Apr 2021 09:50:54 -0400 Subject: [PATCH 03/29] simplify batch provider to remove the managed provider list, remove tracking of individual CID reprovide times, and have a very simple and fast stats function --- batched/system.go | 240 ++++++++-------------------------------------- 1 file changed, 42 insertions(+), 198 deletions(-) diff --git a/batched/system.go b/batched/system.go index b6d35f7..2b3c37c 100644 --- a/batched/system.go +++ b/batched/system.go @@ -9,9 +9,6 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-datastore/query" - dshelp "github.com/ipfs/go-ipfs-ds-help" provider "github.com/ipfs/go-ipfs-provider" "github.com/ipfs/go-ipfs-provider/simple" logging "github.com/ipfs/go-log" @@ -29,13 +26,13 @@ type BatchProvidingSystem struct { rsys provideMany keyProvider simple.KeyChanFunc - q *queue.Queue - ds, managedDS, timeDS datastore.Batching + q *queue.Queue + ds datastore.Batching - provch, managedCh, dynamicCh chan cid.Cid + provch, dynamicCh chan cid.Cid - totalProvides int - avgProvideTime time.Duration + totalProvides, lastReprovideBatchSize int + avgProvideDuration, lastReprovideDuration time.Duration } var _ provider.System = (*BatchProvidingSystem)(nil) @@ -48,8 +45,7 @@ type provideMany interface { // BatchProvidingSystem instances type Option func(system *BatchProvidingSystem) error -var managedKey = datastore.NewKey("/provider/reprovide/managed") -var timeKey = datastore.NewKey("/provider/reprovide/time") +var lastReprovideKey = datastore.NewKey("/provider/reprovide/lastreprovide") func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingSystem, error) { ctx, cancel := context.WithCancel(context.Background()) @@ -63,7 +59,6 @@ func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingS q: q, ds: datastore.NewMapDatastore(), provch: make(chan cid.Cid, 1), - managedCh: make(chan cid.Cid, 1), dynamicCh: make(chan cid.Cid, 1), } @@ -73,9 +68,6 @@ func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingS } } - s.managedDS = namespace.Wrap(s.ds, managedKey) - s.timeDS = namespace.Wrap(s.ds, timeKey) - return s, nil } @@ -106,6 +98,8 @@ func (s *BatchProvidingSystem) Run() { for { pauseDetectTimer := time.NewTimer(time.Hour) maxDurationCollectionTimer := time.NewTimer(time.Minute * 10) + + performedReprovide := false loop: for { select { @@ -120,24 +114,10 @@ func (s *BatchProvidingSystem) Run() { case c := <-s.provch: m[c] = struct{}{} pauseDetectTimer.Reset(time.Millisecond * 500) - continue - case c := <-s.managedCh: - m[c] = struct{}{} - pauseDetectTimer.Reset(time.Millisecond * 500) - continue - default: - } - - select { - case c := <-s.provch: - m[c] = struct{}{} - pauseDetectTimer.Reset(time.Millisecond * 500) - case c := <-s.managedCh: - m[c] = struct{}{} - pauseDetectTimer.Reset(time.Millisecond * 500) case c := <-s.dynamicCh: m[c] = struct{}{} pauseDetectTimer.Reset(time.Millisecond * 500) + performedReprovide = true case <-pauseDetectTimer.C: break loop case <-maxDurationCollectionTimer.C: @@ -155,6 +135,7 @@ func (s *BatchProvidingSystem) Run() { } keys = append(keys, c.Hash()) + delete(m, c) } start := time.Now() @@ -165,15 +146,21 @@ func (s *BatchProvidingSystem) Run() { } dur := time.Since(start) - totalProvideTime := int64(s.totalProvides) * int64(s.avgProvideTime) - s.avgProvideTime = time.Duration((totalProvideTime + int64(dur)) / int64(s.totalProvides+len(keys))) + totalProvideTime := int64(s.totalProvides) * int64(s.avgProvideDuration) + s.avgProvideDuration = time.Duration((totalProvideTime + int64(dur)) / int64(s.totalProvides+len(keys))) s.totalProvides += len(keys) - for c := range m { - s.timeDS.Put(dshelp.CidToDsKey(c), storeTime(time.Now())) - delete(m, c) + if performedReprovide { + s.lastReprovideBatchSize = len(keys) + s.lastReprovideDuration = dur + + if err := s.ds.Put(lastReprovideKey, storeTime(time.Now())); err != nil { + log.Errorf("could not store last reprovide time: %v", err) + } + if err := s.ds.Sync(lastReprovideKey); err != nil { + log.Errorf("could not perform sync of last reprovide time: %v", err) + } } - s.timeDS.Sync(datastore.NewKey("")) } }() @@ -255,37 +242,8 @@ func (s *BatchProvidingSystem) Reprovide(ctx context.Context) error { } func (s *BatchProvidingSystem) reprovide(ctx context.Context, force bool) error { - qres, err := s.managedDS.Query(query.Query{}) - if err != nil { - return err - } - - nextCh := qres.Next() -managedCidLoop: - for { - select { - case r, ok := <-nextCh: - if !ok { - break managedCidLoop - } - c, err := dshelp.DsKeyToCid(datastore.NewKey(r.Key)) - if err != nil { - log.Debugf("could not decode key %v as CID", r.Key) - continue - } - - if !s.shouldReprovide(c) && !force { - continue - } - - select { - case s.managedCh <- c: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() - } + if !s.shouldReprovide() && !force { + return nil } kch, err := s.keyProvider(ctx) @@ -300,9 +258,6 @@ dynamicCidLoop: if !ok { break dynamicCidLoop } - if !s.shouldReprovide(c) && !force { - continue - } select { case s.dynamicCh <- c: @@ -317,23 +272,22 @@ dynamicCidLoop: return nil } -func (s *BatchProvidingSystem) getLastReprovideTime(c cid.Cid) (time.Time, error) { - k := dshelp.CidToDsKey(c) - val, err := s.timeDS.Get(k) +func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { + val, err := s.ds.Get(lastReprovideKey) if err != nil { - return time.Time{}, fmt.Errorf("could not get time for %v", k) + return time.Time{}, fmt.Errorf("could not get last reprovide time") } t, err := getTime(val) if err != nil { - return time.Time{}, fmt.Errorf("could not decode time for %v, got %q", k, string(val)) + return time.Time{}, fmt.Errorf("could not decode last reprovide time, got %q", string(val)) } return t, nil } -func (s *BatchProvidingSystem) shouldReprovide(c cid.Cid) bool { - t, err := s.getLastReprovideTime(c) +func (s *BatchProvidingSystem) shouldReprovide() bool { + t, err := s.getLastReprovideTime() if err != nil { log.Debugf(err.Error()) return false @@ -345,128 +299,18 @@ func (s *BatchProvidingSystem) shouldReprovide(c cid.Cid) bool { return true } -// Stat returns the total number of provides we are responsible for, -// the number that have been recently provided, the total number of provides we have done -// since starting the system and the average time per provide -func (s *BatchProvidingSystem) Stat(ctx context.Context) (int, int, int, time.Duration, error) { - // TODO: Overlap between managed + dynamic lists - total := 0 - recentlyProvided := 0 - - qres, err := s.managedDS.Query(query.Query{}) - if err != nil { - return 0, 0, 0, 0, err - } - - nextCh := qres.Next() -managedCidLoop: - for { - select { - case r, ok := <-nextCh: - if !ok { - break managedCidLoop - } - total++ - c, err := dshelp.DsKeyToCid(datastore.NewKey(r.Key)) - if err != nil { - log.Debugf("could not decode key %v as CID", r.Key) - continue - } - - t, err := s.getLastReprovideTime(c) - if err != nil { - log.Debugf(err.Error()) - continue - } - - if time.Since(t) < s.reprovideInterval { - recentlyProvided++ - } - case <-ctx.Done(): - return 0, 0, 0, 0, ctx.Err() - } - } - - kch, err := s.keyProvider(ctx) - if err != nil { - return 0, 0, 0, 0, err - } - -dynamicCidLoop: - for { - select { - case c, ok := <-kch: - if !ok { - break dynamicCidLoop - } - total++ - t, err := s.getLastReprovideTime(c) - if err != nil { - log.Debugf(err.Error()) - continue - } - - if time.Since(t) < s.reprovideInterval { - recentlyProvided++ - } - case <-ctx.Done(): - return 0, 0, 0, 0, ctx.Err() - } - } - - // TODO: Does it matter that there is no locking around the total+average values? - return total, recentlyProvided, s.totalProvides, s.avgProvideTime, nil +type BatchedProviderStats struct { + TotalProvides, LastReprovideBatchSize int + AvgProvideDuration, LastReprovideDuration time.Duration } -func (s *BatchProvidingSystem) ProvideLongterm(cids ...cid.Cid) error { - for _, c := range cids { - k := dshelp.CidToDsKey(c) - if err := s.managedDS.Put(k, []byte{}); err != nil { - return err - } - } - if err := s.managedDS.Sync(datastore.NewKey("")); err != nil { - return err - } - return nil -} - -func (s *BatchProvidingSystem) RemoveLongterm(cids ...cid.Cid) error { - for _, c := range cids { - k := dshelp.CidToDsKey(c) - if err := s.managedDS.Delete(k); err != nil { - return err - } - } - if err := s.managedDS.Sync(datastore.NewKey("")); err != nil { - return err - } - return nil -} - -func (s *BatchProvidingSystem) GetLongtermProvides(ctx context.Context) (<-chan cid.Cid, error) { - qres, err := s.managedDS.Query(query.Query{ - KeysOnly: true, - }) - if err != nil { - return nil, err - } - - ch := make(chan cid.Cid, 1) - - go func() { - for r := range qres.Next() { - c, err := dshelp.DsKeyToCid(datastore.NewKey(r.Key)) - if err != nil { - log.Debugf("could not decode key %v as CID", r.Key) - } - select { - case ch <- c: - case <-ctx.Done(): - return - } - } - }() - - return ch, nil +// Stat returns various stats about this provider system +func (s *BatchProvidingSystem) Stat(ctx context.Context) (BatchedProviderStats, error) { + // TODO: Does it matter that there is no locking around the total+average values? + return BatchedProviderStats{ + TotalProvides: s.totalProvides, + LastReprovideBatchSize: s.lastReprovideBatchSize, + AvgProvideDuration: s.avgProvideDuration, + LastReprovideDuration: s.lastReprovideDuration, + }, nil } From 0fdf1d6eb09d5c800360cbeb52ad29d132b473f0 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 22 Apr 2021 15:03:50 -0400 Subject: [PATCH 04/29] batch provider waits for provider system to be ready before attempting to provide --- batched/system.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/batched/system.go b/batched/system.go index 2b3c37c..2064962 100644 --- a/batched/system.go +++ b/batched/system.go @@ -39,6 +39,7 @@ var _ provider.System = (*BatchProvidingSystem)(nil) type provideMany interface { ProvideMany(ctx context.Context, keys []multihash.Multihash) error + Ready() bool } // Option defines the functional option type that can be used to configure @@ -138,6 +139,15 @@ func (s *BatchProvidingSystem) Run() { delete(m, c) } + for !s.rsys.Ready() { + log.Debugf("reprovider system not ready") + select { + case <-time.After(time.Minute): + case <-s.ctx.Done(): + return + } + } + start := time.Now() err := s.rsys.ProvideMany(s.ctx, keys) if err != nil { From 93ff8661d06e62f00ace066d72ecef73355062a9 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 23 Apr 2021 02:04:57 -0400 Subject: [PATCH 05/29] fix: handle periods with no providing, and handle the case of a missing LastReprovide entry in the datastore --- batched/system.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/batched/system.go b/batched/system.go index 2064962..4b12ebf 100644 --- a/batched/system.go +++ b/batched/system.go @@ -2,6 +2,7 @@ package batched import ( "context" + "errors" "fmt" "github.com/ipfs/go-ipfs-provider/queue" "strconv" @@ -127,6 +128,11 @@ func (s *BatchProvidingSystem) Run() { return } } + + if len(m) == 0 { + continue + } + keys := make([]multihash.Multihash, 0, len(m)) for c := range m { // hash security @@ -284,6 +290,9 @@ dynamicCidLoop: func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { val, err := s.ds.Get(lastReprovideKey) + if errors.Is(err, datastore.ErrNotFound) { + return time.Time{}, nil + } if err != nil { return time.Time{}, fmt.Errorf("could not get last reprovide time") } From fb01b266d920117a83c30407b65cce06e5cd6752 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 13:11:07 -0400 Subject: [PATCH 06/29] moved import --- batched/system.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batched/system.go b/batched/system.go index 4b12ebf..488507b 100644 --- a/batched/system.go +++ b/batched/system.go @@ -4,13 +4,13 @@ import ( "context" "errors" "fmt" - "github.com/ipfs/go-ipfs-provider/queue" "strconv" "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" provider "github.com/ipfs/go-ipfs-provider" + "github.com/ipfs/go-ipfs-provider/queue" "github.com/ipfs/go-ipfs-provider/simple" logging "github.com/ipfs/go-log" "github.com/ipfs/go-verifcid" From bf3cf566a2f83758c4f0ccb742d4e5ae1d498986 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 13:15:25 -0400 Subject: [PATCH 07/29] batched: moved context creation to prevent leakage --- batched/system.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/batched/system.go b/batched/system.go index 488507b..e2e0797 100644 --- a/batched/system.go +++ b/batched/system.go @@ -50,11 +50,7 @@ type Option func(system *BatchProvidingSystem) error var lastReprovideKey = datastore.NewKey("/provider/reprovide/lastreprovide") func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingSystem, error) { - ctx, cancel := context.WithCancel(context.Background()) s := &BatchProvidingSystem{ - ctx: ctx, - close: cancel, - reprovideInterval: time.Hour * 24, rsys: provider, keyProvider: nil, @@ -70,6 +66,12 @@ func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingS } } + // This is after the options processing so we do not have to worry about leaking a context if there is an + // initialization error processing the options + ctx, cancel := context.WithCancel(context.Background()) + s.ctx = ctx + s.close = cancel + return s, nil } From 00230408d09dcdc2dfaceee5ea59cea06273b8e2 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 13:50:46 -0400 Subject: [PATCH 08/29] cleanup code a bit and add more logging --- batched/system.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/batched/system.go b/batched/system.go index e2e0797..068394f 100644 --- a/batched/system.go +++ b/batched/system.go @@ -97,6 +97,8 @@ func KeyProvider(fn simple.KeyChanFunc) Option { } func (s *BatchProvidingSystem) Run() { + const pauseDetectionThreshold = time.Millisecond * 500 + go func() { m := make(map[cid.Cid]struct{}) for { @@ -109,7 +111,7 @@ func (s *BatchProvidingSystem) Run() { select { case c := <-s.provch: m[c] = struct{}{} - pauseDetectTimer.Reset(time.Millisecond * 500) + pauseDetectTimer.Reset(pauseDetectionThreshold) continue default: } @@ -117,10 +119,10 @@ func (s *BatchProvidingSystem) Run() { select { case c := <-s.provch: m[c] = struct{}{} - pauseDetectTimer.Reset(time.Millisecond * 500) + pauseDetectTimer.Reset(pauseDetectionThreshold) case c := <-s.dynamicCh: m[c] = struct{}{} - pauseDetectTimer.Reset(time.Millisecond * 500) + pauseDetectTimer.Reset(pauseDetectionThreshold) performedReprovide = true case <-pauseDetectTimer.C: break loop @@ -156,6 +158,7 @@ func (s *BatchProvidingSystem) Run() { } } + log.Debugf("starting provide of %d keys", len(keys)) start := time.Now() err := s.rsys.ProvideMany(s.ctx, keys) if err != nil { @@ -165,9 +168,12 @@ func (s *BatchProvidingSystem) Run() { dur := time.Since(start) totalProvideTime := int64(s.totalProvides) * int64(s.avgProvideDuration) + recentAvgProvideDuration := time.Duration(int64(dur) / int64(len(keys))) s.avgProvideDuration = time.Duration((totalProvideTime + int64(dur)) / int64(s.totalProvides+len(keys))) s.totalProvides += len(keys) + log.Debugf("finished providing of %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration) + if performedReprovide { s.lastReprovideBatchSize = len(keys) s.lastReprovideDuration = dur From 8bbdb3b9fa462eb17e83e34d8995f4ac9b93ed50 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 13:54:04 -0400 Subject: [PATCH 09/29] remove unnecessary goroutine --- batched/system.go | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/batched/system.go b/batched/system.go index 068394f..ce8541b 100644 --- a/batched/system.go +++ b/batched/system.go @@ -30,7 +30,7 @@ type BatchProvidingSystem struct { q *queue.Queue ds datastore.Batching - provch, dynamicCh chan cid.Cid + dynamicCh chan cid.Cid totalProvides, lastReprovideBatchSize int avgProvideDuration, lastReprovideDuration time.Duration @@ -56,7 +56,6 @@ func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingS keyProvider: nil, q: q, ds: datastore.NewMapDatastore(), - provch: make(chan cid.Cid, 1), dynamicCh: make(chan cid.Cid, 1), } @@ -99,6 +98,8 @@ func KeyProvider(fn simple.KeyChanFunc) Option { func (s *BatchProvidingSystem) Run() { const pauseDetectionThreshold = time.Millisecond * 500 + provCh := s.q.Dequeue() + go func() { m := make(map[cid.Cid]struct{}) for { @@ -109,7 +110,7 @@ func (s *BatchProvidingSystem) Run() { loop: for { select { - case c := <-s.provch: + case c := <-provCh: m[c] = struct{}{} pauseDetectTimer.Reset(pauseDetectionThreshold) continue @@ -117,7 +118,7 @@ func (s *BatchProvidingSystem) Run() { } select { - case c := <-s.provch: + case c := <-provCh: m[c] = struct{}{} pauseDetectTimer.Reset(pauseDetectionThreshold) case c := <-s.dynamicCh: @@ -188,18 +189,6 @@ func (s *BatchProvidingSystem) Run() { } }() - go func() { - ch := s.q.Dequeue() - for { - select { - case c := <-ch: - s.provch <- c - case <-s.ctx.Done(): - return - } - } - }() - go func() { var initialReprovideCh, reprovideCh <-chan time.Time From 9e61fa137d0e3e0a1bcd4478494640e49c05c9cd Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 13:55:38 -0400 Subject: [PATCH 10/29] rename getTime to parseTime --- batched/system.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batched/system.go b/batched/system.go index ce8541b..7478e6b 100644 --- a/batched/system.go +++ b/batched/system.go @@ -233,7 +233,7 @@ func storeTime(t time.Time) []byte { return val } -func getTime(b []byte) (time.Time, error) { +func parseTime(b []byte) (time.Time, error) { tns, err := strconv.ParseInt(string(b), 10, 64) if err != nil { return time.Time{}, err @@ -294,7 +294,7 @@ func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { return time.Time{}, fmt.Errorf("could not get last reprovide time") } - t, err := getTime(val) + t, err := parseTime(val) if err != nil { return time.Time{}, fmt.Errorf("could not decode last reprovide time, got %q", string(val)) } From c3fd90baf982cc3c7de44717f9f0435d4db562d4 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 14:04:36 -0400 Subject: [PATCH 11/29] removed newly added syncs from the provider queue and updated its documentation --- queue/queue.go | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/queue/queue.go b/queue/queue.go index 21dcdd3..e81e341 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -14,11 +14,11 @@ import ( var log = logging.Logger("provider.queue") -// Queue provides a durable, FIFO interface to the datastore for storing cids +// Queue provides a best-effort durability, FIFO interface to the datastore for storing cids // -// Durability just means that cids in the process of being provided when a -// crash or shutdown occurs will still be in the queue when the node is -// brought back online. +// Best-effort durability just means that cids in the process of being provided when a +// crash or shutdown occurs may be in the queue when the node is brought back online +// depending on whether the underlying datastore has synchronous or asynchronous writes. type Queue struct { // used to differentiate queues in datastore // e.g. provider vs reprovider @@ -102,12 +102,6 @@ func (q *Queue) work() { log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) return } - - if err := q.ds.Sync(k); err != nil { - log.Errorf("error syncing deletion of queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) - continue - } - continue } } else { @@ -130,12 +124,6 @@ func (q *Queue) work() { log.Errorf("Failed to enqueue cid: %s", err) continue } - - if err := q.ds.Sync(k); err != nil { - log.Errorf("Failed to sync enqueuing cid: %s", err) - continue - } - case dequeue <- c: err := q.ds.Delete(k) @@ -143,12 +131,6 @@ func (q *Queue) work() { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) continue } - - if err := q.ds.Sync(k); err != nil { - log.Errorf("Failed to sync deleted queued cid %s with key %s: %s", c, k, err) - continue - } - c = cid.Undef case <-q.ctx.Done(): return From c52c0b00e5c82a65d5c6a7877a3204566719b0fa Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 14:13:02 -0400 Subject: [PATCH 12/29] batched: wait for goroutines to close before returning from Close --- batched/system.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/batched/system.go b/batched/system.go index 7478e6b..5bcdbaa 100644 --- a/batched/system.go +++ b/batched/system.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strconv" + "sync" "time" "github.com/ipfs/go-cid" @@ -20,8 +21,9 @@ import ( var log = logging.Logger("provider.batched") type BatchProvidingSystem struct { - ctx context.Context - close context.CancelFunc + ctx context.Context + close context.CancelFunc + closewg sync.WaitGroup reprovideInterval time.Duration rsys provideMany @@ -101,6 +103,9 @@ func (s *BatchProvidingSystem) Run() { provCh := s.q.Dequeue() go func() { + s.closewg.Add(1) + defer s.closewg.Done() + m := make(map[cid.Cid]struct{}) for { pauseDetectTimer := time.NewTimer(time.Hour) @@ -190,6 +195,9 @@ func (s *BatchProvidingSystem) Run() { }() go func() { + s.closewg.Add(1) + defer s.closewg.Done() + var initialReprovideCh, reprovideCh <-chan time.Time // If reproviding is enabled (non-zero) @@ -243,7 +251,9 @@ func parseTime(b []byte) (time.Time, error) { func (s *BatchProvidingSystem) Close() error { s.close() - return s.q.Close() + err := s.q.Close() + s.closewg.Wait() + return err } func (s *BatchProvidingSystem) Provide(cid cid.Cid) error { From 34b4e405aeb3f0b254bead820aa2a4a2234cd093 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 14:26:18 -0400 Subject: [PATCH 13/29] batched: reuse timers in the Run function --- batched/system.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/batched/system.go b/batched/system.go index 5bcdbaa..08d2085 100644 --- a/batched/system.go +++ b/batched/system.go @@ -99,6 +99,8 @@ func KeyProvider(fn simple.KeyChanFunc) Option { func (s *BatchProvidingSystem) Run() { const pauseDetectionThreshold = time.Millisecond * 500 + const maxCollectionDuration = time.Minute * 10 + const maxTimeWaitingForProvides = time.Hour provCh := s.q.Dequeue() @@ -107,9 +109,23 @@ func (s *BatchProvidingSystem) Run() { defer s.closewg.Done() m := make(map[cid.Cid]struct{}) + + maxDurationCollectionTimer := time.NewTimer(maxCollectionDuration) + pauseDetectTimer := time.NewTimer(maxTimeWaitingForProvides) + defer maxDurationCollectionTimer.Stop() + defer pauseDetectTimer.Stop() + for { - pauseDetectTimer := time.NewTimer(time.Hour) - maxDurationCollectionTimer := time.NewTimer(time.Minute * 10) + // Reset timers + if !pauseDetectTimer.Stop() { + <-pauseDetectTimer.C + } + pauseDetectTimer.Reset(maxTimeWaitingForProvides) + + if !maxDurationCollectionTimer.Stop() { + <-maxDurationCollectionTimer.C + } + maxDurationCollectionTimer.Reset(maxCollectionDuration) performedReprovide := false loop: From 4407f46f30765343b77de9541a5a6f2ba20e739e Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 14:43:08 -0400 Subject: [PATCH 14/29] renamed variable --- batched/system.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/batched/system.go b/batched/system.go index 08d2085..751cfe2 100644 --- a/batched/system.go +++ b/batched/system.go @@ -110,9 +110,9 @@ func (s *BatchProvidingSystem) Run() { m := make(map[cid.Cid]struct{}) - maxDurationCollectionTimer := time.NewTimer(maxCollectionDuration) + maxCollectionDurationTimer := time.NewTimer(maxCollectionDuration) pauseDetectTimer := time.NewTimer(maxTimeWaitingForProvides) - defer maxDurationCollectionTimer.Stop() + defer maxCollectionDurationTimer.Stop() defer pauseDetectTimer.Stop() for { @@ -122,10 +122,10 @@ func (s *BatchProvidingSystem) Run() { } pauseDetectTimer.Reset(maxTimeWaitingForProvides) - if !maxDurationCollectionTimer.Stop() { - <-maxDurationCollectionTimer.C + if !maxCollectionDurationTimer.Stop() { + <-maxCollectionDurationTimer.C } - maxDurationCollectionTimer.Reset(maxCollectionDuration) + maxCollectionDurationTimer.Reset(maxCollectionDuration) performedReprovide := false loop: @@ -148,7 +148,7 @@ func (s *BatchProvidingSystem) Run() { performedReprovide = true case <-pauseDetectTimer.C: break loop - case <-maxDurationCollectionTimer.C: + case <-maxCollectionDurationTimer.C: break loop case <-s.ctx.Done(): return From f498246e5b922c2c8a04e6c0d282b12293daa917 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 15:54:07 -0400 Subject: [PATCH 15/29] some go timer fixes --- batched/system.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/batched/system.go b/batched/system.go index 751cfe2..0968fb3 100644 --- a/batched/system.go +++ b/batched/system.go @@ -116,15 +116,7 @@ func (s *BatchProvidingSystem) Run() { defer pauseDetectTimer.Stop() for { - // Reset timers - if !pauseDetectTimer.Stop() { - <-pauseDetectTimer.C - } pauseDetectTimer.Reset(maxTimeWaitingForProvides) - - if !maxCollectionDurationTimer.Stop() { - <-maxCollectionDurationTimer.C - } maxCollectionDurationTimer.Reset(maxCollectionDuration) performedReprovide := false @@ -133,7 +125,7 @@ func (s *BatchProvidingSystem) Run() { select { case c := <-provCh: m[c] = struct{}{} - pauseDetectTimer.Reset(pauseDetectionThreshold) + resetTimer(pauseDetectTimer, pauseDetectionThreshold) continue default: } @@ -141,14 +133,16 @@ func (s *BatchProvidingSystem) Run() { select { case c := <-provCh: m[c] = struct{}{} - pauseDetectTimer.Reset(pauseDetectionThreshold) + resetTimer(pauseDetectTimer, pauseDetectionThreshold) case c := <-s.dynamicCh: m[c] = struct{}{} - pauseDetectTimer.Reset(pauseDetectionThreshold) + resetTimer(pauseDetectTimer, pauseDetectionThreshold) performedReprovide = true case <-pauseDetectTimer.C: + emptyTimer(maxCollectionDurationTimer) break loop case <-maxCollectionDurationTimer.C: + emptyTimer(pauseDetectTimer) break loop case <-s.ctx.Done(): return @@ -252,6 +246,19 @@ func (s *BatchProvidingSystem) Run() { }() } +func emptyTimer(t *time.Timer) { + if !t.Stop() { + <-t.C + } +} + +func resetTimer(t *time.Timer, dur time.Duration) { + if !t.Stop() { + <-t.C + } + t.Reset(dur) +} + func storeTime(t time.Time) []byte { val := []byte(fmt.Sprintf("%d", t.UnixNano())) return val From cffff34971c055228a5d4e699410da05c3396d1a Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 15:56:07 -0400 Subject: [PATCH 16/29] have a max collection threshold even when only processing regular provides --- batched/system.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/batched/system.go b/batched/system.go index 0968fb3..62df100 100644 --- a/batched/system.go +++ b/batched/system.go @@ -122,6 +122,13 @@ func (s *BatchProvidingSystem) Run() { performedReprovide := false loop: for { + select { + case <-maxCollectionDurationTimer.C: + emptyTimer(pauseDetectTimer) + break loop + default: + } + select { case c := <-provCh: m[c] = struct{}{} From 6d08b5dd337fa85c0cb458bf28c8e2a4022962a2 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 16:34:54 -0400 Subject: [PATCH 17/29] max collection timer only starts once we have our first provide --- batched/system.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/batched/system.go b/batched/system.go index 62df100..7b088f0 100644 --- a/batched/system.go +++ b/batched/system.go @@ -100,7 +100,6 @@ func KeyProvider(fn simple.KeyChanFunc) Option { func (s *BatchProvidingSystem) Run() { const pauseDetectionThreshold = time.Millisecond * 500 const maxCollectionDuration = time.Minute * 10 - const maxTimeWaitingForProvides = time.Hour provCh := s.q.Dequeue() @@ -110,15 +109,14 @@ func (s *BatchProvidingSystem) Run() { m := make(map[cid.Cid]struct{}) - maxCollectionDurationTimer := time.NewTimer(maxCollectionDuration) - pauseDetectTimer := time.NewTimer(maxTimeWaitingForProvides) + maxCollectionDurationTimer := time.NewTimer(time.Hour) + pauseDetectTimer := time.NewTimer(time.Hour) + emptyTimer(maxCollectionDurationTimer) + emptyTimer(pauseDetectTimer) defer maxCollectionDurationTimer.Stop() defer pauseDetectTimer.Stop() for { - pauseDetectTimer.Reset(maxTimeWaitingForProvides) - maxCollectionDurationTimer.Reset(maxCollectionDuration) - performedReprovide := false loop: for { @@ -133,6 +131,9 @@ func (s *BatchProvidingSystem) Run() { case c := <-provCh: m[c] = struct{}{} resetTimer(pauseDetectTimer, pauseDetectionThreshold) + if len(m) == 1 { + resetTimer(maxCollectionDurationTimer, maxCollectionDuration) + } continue default: } @@ -141,9 +142,15 @@ func (s *BatchProvidingSystem) Run() { case c := <-provCh: m[c] = struct{}{} resetTimer(pauseDetectTimer, pauseDetectionThreshold) + if len(m) == 1 { + resetTimer(maxCollectionDurationTimer, maxCollectionDuration) + } case c := <-s.dynamicCh: m[c] = struct{}{} resetTimer(pauseDetectTimer, pauseDetectionThreshold) + if len(m) == 1 { + resetTimer(maxCollectionDurationTimer, maxCollectionDuration) + } performedReprovide = true case <-pauseDetectTimer.C: emptyTimer(maxCollectionDurationTimer) From 3a94d3a43345ea8e94ad4c95f14714082aa78a0e Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 16:35:41 -0400 Subject: [PATCH 18/29] remove cid from provide queue if its invalid --- batched/system.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/batched/system.go b/batched/system.go index 7b088f0..604ace3 100644 --- a/batched/system.go +++ b/batched/system.go @@ -169,6 +169,8 @@ func (s *BatchProvidingSystem) Run() { keys := make([]multihash.Multihash, 0, len(m)) for c := range m { + delete(m, c) + // hash security if err := verifcid.ValidateCid(c); err != nil { log.Errorf("insecure hash in reprovider, %s (%s)", c, err) @@ -176,7 +178,6 @@ func (s *BatchProvidingSystem) Run() { } keys = append(keys, c.Hash()) - delete(m, c) } for !s.rsys.Ready() { From 866d9797c9d7df203e3d262406e5c584c7d67d50 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 17:02:32 -0400 Subject: [PATCH 19/29] switch reproviding channel to be unbuffered --- batched/system.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batched/system.go b/batched/system.go index 604ace3..9e6ea70 100644 --- a/batched/system.go +++ b/batched/system.go @@ -58,7 +58,7 @@ func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingS keyProvider: nil, q: q, ds: datastore.NewMapDatastore(), - dynamicCh: make(chan cid.Cid, 1), + dynamicCh: make(chan cid.Cid), } for _, o := range opts { From f84c7d98c0cbaee5af38088dd08cdfdd78b310a6 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 17:06:21 -0400 Subject: [PATCH 20/29] batched: do not panic if no key provider is set, instead default to returning a closed channel --- batched/system.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/batched/system.go b/batched/system.go index 9e6ea70..02dc9c9 100644 --- a/batched/system.go +++ b/batched/system.go @@ -67,6 +67,14 @@ func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingS } } + if s.keyProvider == nil { + s.keyProvider = func(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid) + close(ch) + return ch, nil + } + } + // This is after the options processing so we do not have to worry about leaking a context if there is an // initialization error processing the options ctx, cancel := context.WithCancel(context.Background()) From 8c0765896872534b34ae50790ea50e909906846c Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 17:08:21 -0400 Subject: [PATCH 21/29] batched: renamed dynamicCh and dynamicCidLoop to reprovideCh and reprovideCidLoop since there is no static + dynamic reprovide channels anymore --- batched/system.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/batched/system.go b/batched/system.go index 02dc9c9..016a3c3 100644 --- a/batched/system.go +++ b/batched/system.go @@ -32,7 +32,7 @@ type BatchProvidingSystem struct { q *queue.Queue ds datastore.Batching - dynamicCh chan cid.Cid + reprovideCh chan cid.Cid totalProvides, lastReprovideBatchSize int avgProvideDuration, lastReprovideDuration time.Duration @@ -58,7 +58,7 @@ func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingS keyProvider: nil, q: q, ds: datastore.NewMapDatastore(), - dynamicCh: make(chan cid.Cid), + reprovideCh: make(chan cid.Cid), } for _, o := range opts { @@ -153,7 +153,7 @@ func (s *BatchProvidingSystem) Run() { if len(m) == 1 { resetTimer(maxCollectionDurationTimer, maxCollectionDuration) } - case c := <-s.dynamicCh: + case c := <-s.reprovideCh: m[c] = struct{}{} resetTimer(pauseDetectTimer, pauseDetectionThreshold) if len(m) == 1 { @@ -320,16 +320,16 @@ func (s *BatchProvidingSystem) reprovide(ctx context.Context, force bool) error return err } -dynamicCidLoop: +reprovideCidLoop: for { select { case c, ok := <-kch: if !ok { - break dynamicCidLoop + break reprovideCidLoop } select { - case s.dynamicCh <- c: + case s.reprovideCh <- c: case <-ctx.Done(): return ctx.Err() } From 71c656434d58209505001d779fdf372c62bbd73a Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 19:23:03 -0400 Subject: [PATCH 22/29] batched: add test --- batched/system_test.go | 104 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 batched/system_test.go diff --git a/batched/system_test.go b/batched/system_test.go new file mode 100644 index 0000000..e2cdfc3 --- /dev/null +++ b/batched/system_test.go @@ -0,0 +1,104 @@ +package batched + +import ( + "context" + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + "strconv" + "testing" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + q "github.com/ipfs/go-ipfs-provider/queue" +) + +type mockProvideMany struct { + keys []mh.Multihash +} + +func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { + m.keys = keys + return nil +} + +func (m *mockProvideMany) Ready() bool { + return true +} + +var _ provideMany = (*mockProvideMany)(nil) + +func TestBatched(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := q.NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + provider := &mockProvideMany{} + + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + const numProvides = 100 + keysToProvide := make(map[cid.Cid]int) + for i := 0; i < numProvides; i++ { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + c := cid.NewCidV1(cid.Raw, h) + keysToProvide[c] = i + } + + batchSystem, err := New(provider, queue, KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid) + go func() { + for k := range keysToProvide { + select { + case ch <- k: + case <-ctx.Done(): + return + } + } + }() + return ch, nil + })) + if err != nil { + t.Fatal(err) + } + + batchSystem.Run() + + for { + if ctx.Err() != nil { + t.Fatal("test hung") + } + if len(provider.keys) != 0 { + break + } + time.Sleep(time.Millisecond * 100) + } + + if len(provider.keys) != numProvides { + t.Fatalf("expected %d provider keys, got %d", numProvides, len(provider.keys)) + } + + provMap := make(map[string]struct{}) + for _, k := range provider.keys { + provMap[string(k)] = struct{}{} + } + + for i := 0; i < numProvides; i++ { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + if _, found := provMap[string(h)]; !found { + t.Fatalf("could not find provider with value %d", i) + } + } +} From b017f2687786f8c9621efcaa09d57d2a082cca79 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 19:25:18 -0400 Subject: [PATCH 23/29] fix max collection check --- batched/system.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/batched/system.go b/batched/system.go index 016a3c3..8de9940 100644 --- a/batched/system.go +++ b/batched/system.go @@ -137,28 +137,28 @@ func (s *BatchProvidingSystem) Run() { select { case c := <-provCh: - m[c] = struct{}{} - resetTimer(pauseDetectTimer, pauseDetectionThreshold) - if len(m) == 1 { + if len(m) == 0 { resetTimer(maxCollectionDurationTimer, maxCollectionDuration) } + m[c] = struct{}{} + resetTimer(pauseDetectTimer, pauseDetectionThreshold) continue default: } select { case c := <-provCh: - m[c] = struct{}{} - resetTimer(pauseDetectTimer, pauseDetectionThreshold) - if len(m) == 1 { + if len(m) == 0 { resetTimer(maxCollectionDurationTimer, maxCollectionDuration) } - case c := <-s.reprovideCh: m[c] = struct{}{} resetTimer(pauseDetectTimer, pauseDetectionThreshold) - if len(m) == 1 { + case c := <-s.reprovideCh: + if len(m) == 0 { resetTimer(maxCollectionDurationTimer, maxCollectionDuration) } + m[c] = struct{}{} + resetTimer(pauseDetectTimer, pauseDetectionThreshold) performedReprovide = true case <-pauseDetectTimer.C: emptyTimer(maxCollectionDurationTimer) From d366b610f59cce73653175e27af9575aa1cdfc05 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 19:26:26 -0400 Subject: [PATCH 24/29] fixed waitgroup usage bug --- batched/system.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batched/system.go b/batched/system.go index 8de9940..e797279 100644 --- a/batched/system.go +++ b/batched/system.go @@ -111,8 +111,8 @@ func (s *BatchProvidingSystem) Run() { provCh := s.q.Dequeue() + s.closewg.Add(1) go func() { - s.closewg.Add(1) defer s.closewg.Done() m := make(map[cid.Cid]struct{}) @@ -227,8 +227,8 @@ func (s *BatchProvidingSystem) Run() { } }() + s.closewg.Add(1) go func() { - s.closewg.Add(1) defer s.closewg.Done() var initialReprovideCh, reprovideCh <-chan time.Time From 843e75c9fa7aa9c875bf4d7864df2f406cffdfb1 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 21:16:24 -0400 Subject: [PATCH 25/29] batched: internally configurable initial reprovide --- batched/system.go | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/batched/system.go b/batched/system.go index e797279..afbeffc 100644 --- a/batched/system.go +++ b/batched/system.go @@ -25,9 +25,12 @@ type BatchProvidingSystem struct { close context.CancelFunc closewg sync.WaitGroup - reprovideInterval time.Duration - rsys provideMany - keyProvider simple.KeyChanFunc + reprovideInterval time.Duration + initalReprovideDelay time.Duration + initialReprovideDelaySet bool + + rsys provideMany + keyProvider simple.KeyChanFunc q *queue.Queue ds datastore.Batching @@ -67,6 +70,17 @@ func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingS } } + // Setup default behavior for the initial reprovide delay + // + // If the reprovide ticker is larger than a minute (likely), + // provide once after we've been up a minute. + // + // Don't provide _immediately_ as we might be just about to stop. + if !s.initialReprovideDelaySet && s.reprovideInterval > time.Minute { + s.initalReprovideDelay = time.Minute + s.initialReprovideDelaySet = true + } + if s.keyProvider == nil { s.keyProvider = func(ctx context.Context) (<-chan cid.Cid, error) { ch := make(chan cid.Cid) @@ -105,6 +119,14 @@ func KeyProvider(fn simple.KeyChanFunc) Option { } } +func initialReprovideDelay(duration time.Duration) Option { + return func(system *BatchProvidingSystem) error { + system.initialReprovideDelaySet = true + system.initalReprovideDelay = duration + return nil + } +} + func (s *BatchProvidingSystem) Run() { const pauseDetectionThreshold = time.Millisecond * 500 const maxCollectionDuration = time.Minute * 10 @@ -239,12 +261,9 @@ func (s *BatchProvidingSystem) Run() { defer reprovideTicker.Stop() reprovideCh = reprovideTicker.C - // If the reprovide ticker is larger than a minute (likely), - // provide once after we've been up a minute. - // - // Don't provide _immediately_ as we might be just about to stop. - if s.reprovideInterval > time.Minute { - initialReprovideTimer := time.NewTimer(time.Minute) + // if there is a non-zero initial reprovide time that was set in the initializer or if the fallback has been + if s.initialReprovideDelaySet { + initialReprovideTimer := time.NewTimer(s.initalReprovideDelay) defer initialReprovideTimer.Stop() initialReprovideCh = initialReprovideTimer.C From 04452c4bf5dcb6fc9a30a563cba30e66ff5fdd28 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 21:09:47 -0400 Subject: [PATCH 26/29] batched: add basic test --- batched/system_test.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/batched/system_test.go b/batched/system_test.go index e2cdfc3..b2b3120 100644 --- a/batched/system_test.go +++ b/batched/system_test.go @@ -2,22 +2,27 @@ package batched import ( "context" - "github.com/ipfs/go-cid" - mh "github.com/multiformats/go-multihash" "strconv" + "sync" "testing" "time" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/sync" + dssync "github.com/ipfs/go-datastore/sync" + mh "github.com/multiformats/go-multihash" + q "github.com/ipfs/go-ipfs-provider/queue" ) type mockProvideMany struct { + lk sync.Mutex keys []mh.Multihash } func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { + m.lk.Lock() + defer m.lk.Unlock() m.keys = keys return nil } @@ -26,13 +31,19 @@ func (m *mockProvideMany) Ready() bool { return true } +func (m *mockProvideMany) GetKeys() []mh.Multihash { + m.lk.Lock() + defer m.lk.Unlock() + return m.keys[:] +} + var _ provideMany = (*mockProvideMany)(nil) func TestBatched(t *testing.T) { ctx := context.Background() defer ctx.Done() - ds := sync.MutexWrap(datastore.NewMapDatastore()) + ds := dssync.MutexWrap(datastore.NewMapDatastore()) queue, err := q.NewQueue(ctx, "test", ds) if err != nil { t.Fatal(err) @@ -66,29 +77,31 @@ func TestBatched(t *testing.T) { } }() return ch, nil - })) + }), initialReprovideDelay(0)) if err != nil { t.Fatal(err) } batchSystem.Run() + var keys []mh.Multihash for { if ctx.Err() != nil { t.Fatal("test hung") } - if len(provider.keys) != 0 { + keys = provider.GetKeys() + if len(keys) != 0 { break } time.Sleep(time.Millisecond * 100) } - if len(provider.keys) != numProvides { - t.Fatalf("expected %d provider keys, got %d", numProvides, len(provider.keys)) + if len(keys) != numProvides { + t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) } provMap := make(map[string]struct{}) - for _, k := range provider.keys { + for _, k := range keys { provMap[string(k)] = struct{}{} } From edb6021269dc6c4a0a8f6dbce6e33eed27b388c6 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 21:11:25 -0400 Subject: [PATCH 27/29] batched: rework timer usage --- batched/system.go | 57 ++++++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/batched/system.go b/batched/system.go index afbeffc..5d82cf3 100644 --- a/batched/system.go +++ b/batched/system.go @@ -128,7 +128,9 @@ func initialReprovideDelay(duration time.Duration) Option { } func (s *BatchProvidingSystem) Run() { + // how long we wait between the first provider we hear about and batching up the provides to send out const pauseDetectionThreshold = time.Millisecond * 500 + // how long we are willing to collect providers for the batch after we receive the first one const maxCollectionDuration = time.Minute * 10 provCh := s.q.Dequeue() @@ -139,54 +141,66 @@ func (s *BatchProvidingSystem) Run() { m := make(map[cid.Cid]struct{}) + // setup stopped timers maxCollectionDurationTimer := time.NewTimer(time.Hour) pauseDetectTimer := time.NewTimer(time.Hour) - emptyTimer(maxCollectionDurationTimer) - emptyTimer(pauseDetectTimer) + stopAndEmptyTimer(maxCollectionDurationTimer) + stopAndEmptyTimer(pauseDetectTimer) + + // make sure timers are cleaned up defer maxCollectionDurationTimer.Stop() defer pauseDetectTimer.Stop() + resetTimersAfterReceivingProvide := func(firstProvide bool) { + if firstProvide { + // after receiving the first provider start up the timers + maxCollectionDurationTimer.Reset(maxCollectionDuration) + pauseDetectTimer.Reset(pauseDetectionThreshold) + } else { + // otherwise just do a full restart of the pause timer + stopAndEmptyTimer(pauseDetectTimer) + pauseDetectTimer.Reset(pauseDetectionThreshold) + } + } + for { performedReprovide := false + + // at the start of every loop the maxCollectionDurationTimer and pauseDetectTimer should be already be + // stopped and have empty channels loop: for { select { case <-maxCollectionDurationTimer.C: - emptyTimer(pauseDetectTimer) + // if this timer has fired then the pause timer has started so let's stop and empty it + stopAndEmptyTimer(pauseDetectTimer) break loop default: } select { case c := <-provCh: - if len(m) == 0 { - resetTimer(maxCollectionDurationTimer, maxCollectionDuration) - } + resetTimersAfterReceivingProvide(len(m) == 0) m[c] = struct{}{} - resetTimer(pauseDetectTimer, pauseDetectionThreshold) continue default: } select { case c := <-provCh: - if len(m) == 0 { - resetTimer(maxCollectionDurationTimer, maxCollectionDuration) - } + resetTimersAfterReceivingProvide(len(m) == 0) m[c] = struct{}{} - resetTimer(pauseDetectTimer, pauseDetectionThreshold) case c := <-s.reprovideCh: - if len(m) == 0 { - resetTimer(maxCollectionDurationTimer, maxCollectionDuration) - } + resetTimersAfterReceivingProvide(len(m) == 0) m[c] = struct{}{} - resetTimer(pauseDetectTimer, pauseDetectionThreshold) performedReprovide = true case <-pauseDetectTimer.C: - emptyTimer(maxCollectionDurationTimer) + // if this timer has fired then the max collection timer has started so let's stop and empty it + stopAndEmptyTimer(maxCollectionDurationTimer) break loop case <-maxCollectionDurationTimer.C: - emptyTimer(pauseDetectTimer) + // if this timer has fired then the pause timer has started so let's stop and empty it + stopAndEmptyTimer(pauseDetectTimer) break loop case <-s.ctx.Done(): return @@ -288,17 +302,10 @@ func (s *BatchProvidingSystem) Run() { }() } -func emptyTimer(t *time.Timer) { - if !t.Stop() { - <-t.C - } -} - -func resetTimer(t *time.Timer, dur time.Duration) { +func stopAndEmptyTimer(t *time.Timer) { if !t.Stop() { <-t.C } - t.Reset(dur) } func storeTime(t time.Time) []byte { From 1c19caa050243b954edd32c381239cd60bb9434d Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 21:46:26 -0400 Subject: [PATCH 28/29] use closure --- batched/system.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/batched/system.go b/batched/system.go index 5d82cf3..46a696a 100644 --- a/batched/system.go +++ b/batched/system.go @@ -151,7 +151,8 @@ func (s *BatchProvidingSystem) Run() { defer maxCollectionDurationTimer.Stop() defer pauseDetectTimer.Stop() - resetTimersAfterReceivingProvide := func(firstProvide bool) { + resetTimersAfterReceivingProvide := func() { + firstProvide := len(m) == 0 if firstProvide { // after receiving the first provider start up the timers maxCollectionDurationTimer.Reset(maxCollectionDuration) @@ -180,7 +181,7 @@ func (s *BatchProvidingSystem) Run() { select { case c := <-provCh: - resetTimersAfterReceivingProvide(len(m) == 0) + resetTimersAfterReceivingProvide() m[c] = struct{}{} continue default: @@ -188,10 +189,10 @@ func (s *BatchProvidingSystem) Run() { select { case c := <-provCh: - resetTimersAfterReceivingProvide(len(m) == 0) + resetTimersAfterReceivingProvide() m[c] = struct{}{} case c := <-s.reprovideCh: - resetTimersAfterReceivingProvide(len(m) == 0) + resetTimersAfterReceivingProvide() m[c] = struct{}{} performedReprovide = true case <-pauseDetectTimer.C: From c86f3debeefa5eba224078e760ca913b0048f321 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 14 May 2021 03:56:52 -0400 Subject: [PATCH 29/29] fix debug log usage --- batched/system.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batched/system.go b/batched/system.go index 46a696a..5637e55 100644 --- a/batched/system.go +++ b/batched/system.go @@ -388,7 +388,7 @@ func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { func (s *BatchProvidingSystem) shouldReprovide() bool { t, err := s.getLastReprovideTime() if err != nil { - log.Debugf(err.Error()) + log.Debugf("getting last reprovide time failed: %s", err) return false }