From 0be0cbc50e462d576a62436fdf9b6cf677f1c6ce Mon Sep 17 00:00:00 2001
From: Steven Allen
Date: Tue, 10 Dec 2019 15:33:13 +0100
Subject: [PATCH 1/2] feat: refresh and wait

We'd like to be able to refresh then _wait_ for the refresh to finish in
the testground DHT tests. That way, we can:

1. Start and disable _auto_ refresh.
2. Bootstrap.
3. Refresh a couple of times till we're stable.
4. Wait to _stop_ refreshing.
5. Disconnect from and forget about all peers _not_ in our routing tables.
6. Run the actual tests without interference from the bootstrapping logic.
---
 dht.go           |  4 +--
 dht_bootstrap.go | 77 ++++++++++++++++++++++++++++++++++++++++--------
 dht_test.go      | 17 ++++++-----
 go.mod           |  1 +
 go.sum           |  4 +++
 notif.go         |  4 +--
 6 files changed, 83 insertions(+), 24 deletions(-)

diff --git a/dht.go b/dht.go
index 7494bd453..c52bcb476 100644
--- a/dht.go
+++ b/dht.go
@@ -70,7 +70,7 @@ type IpfsDHT struct {
 	autoRefresh           bool
 	rtRefreshQueryTimeout time.Duration
 	rtRefreshPeriod       time.Duration
-	triggerRtRefresh      chan struct{}
+	triggerRtRefresh      chan chan<- error
 
 	maxRecordAge time.Duration
 }
@@ -167,7 +167,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
 		routingTable:     rt,
 		protocols:        protocols,
 		bucketSize:       bucketSize,
-		triggerRtRefresh: make(chan struct{}),
+		triggerRtRefresh: make(chan chan<- error),
 	}
 
 	dht.ctx = dht.newContextWithLocalTags(ctx)
diff --git a/dht_bootstrap.go b/dht_bootstrap.go
index c17407c16..b6f82bdfb 100644
--- a/dht_bootstrap.go
+++ b/dht_bootstrap.go
@@ -2,8 +2,10 @@ package dht
 
 import (
 	"context"
+	"fmt"
 	"time"
 
+	multierror "github.com/hashicorp/go-multierror"
 	process "github.com/jbenet/goprocess"
 	processctx "github.com/jbenet/goprocess/context"
 	"github.com/libp2p/go-libp2p-core/routing"
@@ -59,27 +61,54 @@ func (dht *IpfsDHT) startRefreshing() error {
 		}
 
 		for {
+			var waiting []chan<- error
 			select {
 			case <-refreshTicker.C:
-			case <-dht.triggerRtRefresh:
-				logger.Infof("triggering a refresh: RT has %d peers", dht.routingTable.Size())
+			case res := <-dht.triggerRtRefresh:
+				if res != nil {
+					waiting = append(waiting, res)
+				}
 			case <-ctx.Done():
 				return
 			}
-			dht.doRefresh(ctx)
+
+		collectWaiting:
+			for {
+				select {
+				case res := <-dht.triggerRtRefresh:
+					if res != nil {
+						waiting = append(waiting, res)
+					}
+				default:
+					break collectWaiting
+				}
+			}
+			err := dht.doRefresh(ctx)
+			for _, w := range waiting {
+				w <- err
+			}
+			if err != nil {
+				logger.Warning(err)
+			}
 		}
 	})
 
 	return nil
 }
 
-func (dht *IpfsDHT) doRefresh(ctx context.Context) {
-	dht.selfWalk(ctx)
-	dht.refreshBuckets(ctx)
+func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
+	var merr error
+	if err := dht.selfWalk(ctx); err != nil {
+		merr = multierror.Append(merr, err)
+	}
+	if err := dht.refreshBuckets(ctx); err != nil {
+		merr = multierror.Append(merr, err)
+	}
+	return merr
 }
 
 // refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
-func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
+func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error {
 	doQuery := func(bucketId int, target string, f func(context.Context) error) error {
 		logger.Infof("starting refreshing bucket %d to %s (routing table size was %d)",
 			bucketId, target, dht.routingTable.Size())
@@ -103,6 +132,8 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
 		// 16 bits specified anyways.
 		buckets = buckets[:16]
 	}
+
+	var merr error
 	for bucketID, bucket := range buckets {
 		if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod {
 			continue
@@ -120,20 +151,24 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
 		}
 
 		if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
-			logger.Warningf("failed to do a random walk on bucket %d: %s", bucketID, err)
+			merr = multierror.Append(
+				merr,
+				fmt.Errorf("failed to do a random walk on bucket %d: %s", bucketID, err),
+			)
 		}
 	}
+	return merr
 }
 
 // Traverse the DHT toward the self ID
-func (dht *IpfsDHT) selfWalk(ctx context.Context) {
+func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
 	queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
 	defer cancel()
 	_, err := dht.FindPeer(queryCtx, dht.self)
 	if err == routing.ErrNotFound {
-		return
+		return nil
 	}
-	logger.Warningf("failed to query self during routing table refresh: %s", err)
+	return fmt.Errorf("failed to query self during routing table refresh: %s", err)
 }
 
 // Bootstrap tells the DHT to get into a bootstrapped state satisfying the
@@ -148,7 +183,25 @@ func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
 // RefreshRoutingTable tells the DHT to refresh its routing tables.
 func (dht *IpfsDHT) RefreshRoutingTable() {
 	select {
-	case dht.triggerRtRefresh <- struct{}{}:
+	case dht.triggerRtRefresh <- nil:
 	default:
 	}
 }
+
+// RefreshRoutingTableWait tells the DHT to refresh its routing tables and
+// waits for it to finish.
+func (dht *IpfsDHT) RefreshRoutingTableWait(ctx context.Context) error {
+	res := make(chan error, 1)
+	select {
+	case dht.triggerRtRefresh <- res:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	select {
+	case err := <-res:
+		return err
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
diff --git a/dht_test.go b/dht_test.go
index b15e5b0cf..806f9d89c 100644
--- a/dht_test.go
+++ b/dht_test.go
@@ -201,7 +201,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
 	start := rand.Intn(len(dhts)) // randomize to decrease bias.
 	for i := range dhts {
 		dht := dhts[(start+i)%len(dhts)]
-		dht.RefreshRoutingTable()
+		dht.RefreshRoutingTableWait(ctx)
 	}
 }
 
@@ -663,25 +663,26 @@ func TestRefresh(t *testing.T) {
 
 	<-time.After(100 * time.Millisecond)
 	// bootstrap a few times until we get good tables.
-	stop := make(chan struct{})
+	t.Logf("bootstrapping them so they find each other %d", nDHTs)
+	ctxT, cancelT := context.WithTimeout(ctx, 5*time.Second)
+	defer cancelT()
+
 	go func() {
-		for {
-			t.Logf("bootstrapping them so they find each other %d", nDHTs)
-			ctxT, cancel := context.WithTimeout(ctx, 5*time.Second)
-			defer cancel()
+		for ctxT.Err() == nil {
 			bootstrap(t, ctxT, dhts)
+
 			// wait a bit.
 			select {
 			case <-time.After(50 * time.Millisecond):
 				continue // being explicit
-			case <-stop:
+			case <-ctxT.Done():
 				return
 			}
 		}
 	}()
 
 	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
-	close(stop)
+	cancelT()
 
 	if u.Debug {
 		// the routing tables should be full now. let's inspect them.
diff --git a/go.mod b/go.mod
index d5ad5585e..a49dc83bd 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.12
 
 require (
 	github.com/gogo/protobuf v1.3.1
+	github.com/hashicorp/go-multierror v1.0.0
 	github.com/hashicorp/golang-lru v0.5.3
 	github.com/ipfs/go-cid v0.0.3
 	github.com/ipfs/go-datastore v0.3.1
diff --git a/go.sum b/go.sum
index 13a314667..ce55cc047 100644
--- a/go.sum
+++ b/go.sum
@@ -69,6 +69,10 @@ github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyF
 github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
 github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
 github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
+github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
+github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
diff --git a/notif.go b/notif.go
index 87ba1a300..bafcbcfe4 100644
--- a/notif.go
+++ b/notif.go
@@ -36,7 +36,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
 		dht.Update(dht.Context(), p)
 		if refresh && dht.autoRefresh {
 			select {
-			case dht.triggerRtRefresh <- struct{}{}:
+			case dht.triggerRtRefresh <- nil:
 			default:
 			}
 		}
@@ -82,7 +82,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
 		dht.Update(dht.Context(), p)
 		if refresh && dht.autoRefresh {
 			select {
-			case dht.triggerRtRefresh <- struct{}{}:
+			case dht.triggerRtRefresh <- nil:
 			default:
 			}
 		}

From 50a9858ef6c14a87bc0d54d116bb9117b86b5411 Mon Sep 17 00:00:00 2001
From: Steven Allen
Date: Tue, 10 Dec 2019 17:25:12 +0100
Subject: [PATCH 2/2] feat(dht): switch to a single RefreshRoutingTable
 function returning a channel

---
 dht_bootstrap.go | 28 +++++++++-------------------
 dht_test.go      |  9 ++++++++-
 2 files changed, 17 insertions(+), 20 deletions(-)

diff --git a/dht_bootstrap.go b/dht_bootstrap.go
index b6f82bdfb..7078765e0 100644
--- a/dht_bootstrap.go
+++ b/dht_bootstrap.go
@@ -72,6 +72,7 @@ func (dht *IpfsDHT) startRefreshing() error {
 				return
 			}
 
+			// Batch multiple refresh requests if they're all waiting at the same time.
 		collectWaiting:
 			for {
 				select {
@@ -83,9 +84,11 @@ func (dht *IpfsDHT) startRefreshing() error {
 					break collectWaiting
 				}
 			}
+
 			err := dht.doRefresh(ctx)
 			for _, w := range waiting {
 				w <- err
+				close(w)
 			}
 			if err != nil {
 				logger.Warning(err)
@@ -181,27 +184,14 @@ func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
 }
 
 // RefreshRoutingTable tells the DHT to refresh its routing tables.
-func (dht *IpfsDHT) RefreshRoutingTable() {
-	select {
-	case dht.triggerRtRefresh <- nil:
-	default:
-	}
-}
-
-// RefreshRoutingTableWait tells the DHT to refresh its routing tables and
-// waits for it to finish.
-func (dht *IpfsDHT) RefreshRoutingTableWait(ctx context.Context) error {
+//
+// Reading from the returned channel blocks until the refresh finishes; the
+// channel then yields the error and closes. It is buffered and safe to ignore.
+func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
 	res := make(chan error, 1)
 	select {
 	case dht.triggerRtRefresh <- res:
-	case <-ctx.Done():
-		return ctx.Err()
-	}
-
-	select {
-	case err := <-res:
-		return err
-	case <-ctx.Done():
-		return ctx.Err()
+	default:
 	}
+	return res
 }
diff --git a/dht_test.go b/dht_test.go
index 806f9d89c..8c0f19e07 100644
--- a/dht_test.go
+++ b/dht_test.go
@@ -201,7 +201,14 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
 	start := rand.Intn(len(dhts)) // randomize to decrease bias.
 	for i := range dhts {
 		dht := dhts[(start+i)%len(dhts)]
-		dht.RefreshRoutingTableWait(ctx)
+		select {
+		case err := <-dht.RefreshRoutingTable():
+			if err != nil {
+				t.Error(err)
+			}
+		case <-ctx.Done():
+			return
+		}
 	}
 }
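
Usage note: after patch 2, a single RefreshRoutingTable call both triggers the
refresh and hands back the completion channel. Because the trigger is sent with
a non-blocking select that has a default branch, a request can be silently
dropped when the refresh goroutine isn't there to pick it up, and in that case
the returned channel never yields; waiting callers should therefore guard the
receive with a context, as the updated bootstrap test above does. Below is a
minimal caller-side sketch under that assumption; refreshAndWait is a
hypothetical helper, not part of the patch.

	package example

	import (
		"context"

		dht "github.com/libp2p/go-libp2p-kad-dht"
	)

	// refreshAndWait triggers a routing table refresh and waits for it to
	// complete. The returned channel is buffered, so callers that don't
	// care about the result can also just drop it.
	func refreshAndWait(ctx context.Context, d *dht.IpfsDHT) error {
		select {
		case err := <-d.RefreshRoutingTable():
			return err // nil on success; the channel is closed afterwards
		case <-ctx.Done():
			// Covers both caller cancellation and the dropped-trigger
			// case, where the channel would never yield.
			return ctx.Err()
		}
	}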