diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 31364927f..378e5a196 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -45,7 +45,7 @@ func init() { // Start the refresh worker. func (dht *IpfsDHT) startRefreshing() error { - // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period + // scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period dht.proc.Go(func(proc process.Process) { ctx := processctx.OnClosingContext(proc) @@ -104,20 +104,20 @@ func (dht *IpfsDHT) doRefresh(ctx context.Context) error { if err := dht.selfWalk(ctx); err != nil { merr = multierror.Append(merr, err) } - if err := dht.refreshBuckets(ctx); err != nil { + if err := dht.refreshCpls(ctx); err != nil { merr = multierror.Append(merr, err) } return merr } -// refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period -func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error { - doQuery := func(bucketId int, target string, f func(context.Context) error) error { - logger.Infof("starting refreshing bucket %d to %s (routing table size was %d)", - bucketId, target, dht.routingTable.Size()) +// refreshCpls scans the routing table, and does a random walk for cpl's that haven't been queried since the given period +func (dht *IpfsDHT) refreshCpls(ctx context.Context) error { + doQuery := func(cpl uint, target string, f func(context.Context) error) error { + logger.Infof("starting refreshing cpl %d to %s (routing table size was %d)", + cpl, target, dht.routingTable.Size()) defer func() { - logger.Infof("finished refreshing bucket %d to %s (routing table size is now %d)", - bucketId, target, dht.routingTable.Size()) + logger.Infof("finished refreshing cpl %d to %s (routing table size is now %d)", + cpl, target, dht.routingTable.Size()) }() queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout) defer cancel() @@ -128,35 +128,33 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error { return err } - buckets := dht.routingTable.GetAllBuckets() - if len(buckets) > 16 { - // Don't bother bootstrapping more than 16 buckets. - // GenRandPeerID can't generate target peer IDs with more than - // 16 bits specified anyways. - buckets = buckets[:16] - } + trackedCpls := dht.routingTable.GetTrackedCplsForRefresh() var merr error - for bucketID, bucket := range buckets { - if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod { + for _, tcpl := range trackedCpls { + if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod { + continue + } + // gen rand peer with the cpl + randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl) + if err != nil { + logger.Errorf("failed to generate peerID for cpl %d, err: %s", tcpl.Cpl, err) continue } - // gen rand peer in the bucket - randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID) // walk to the generated peer walkFnc := func(c context.Context) error { - _, err := dht.FindPeer(c, randPeerInBucket) + _, err := dht.FindPeer(c, randPeer) if err == routing.ErrNotFound { return nil } return err } - if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil { + if err := doQuery(tcpl.Cpl, randPeer.String(), walkFnc); err != nil { merr = multierror.Append( merr, - fmt.Errorf("failed to do a random walk on bucket %d: %s", bucketID, err), + fmt.Errorf("failed to do a random walk for cpl %d: %s", tcpl.Cpl, err), ) } } diff --git a/go.mod b/go.mod index d0612dade..41dbc695d 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/jbenet/goprocess v0.1.3 github.com/libp2p/go-libp2p v0.4.2 github.com/libp2p/go-libp2p-core v0.2.5 - github.com/libp2p/go-libp2p-kbucket v0.2.2 + github.com/libp2p/go-libp2p-kbucket v0.2.3 github.com/libp2p/go-libp2p-peerstore v0.1.4 github.com/libp2p/go-libp2p-record v0.1.2 github.com/libp2p/go-libp2p-routing v0.1.0 diff --git a/go.sum b/go.sum index 6d265663b..747d0ec57 100644 --- a/go.sum +++ b/go.sum @@ -185,6 +185,8 @@ github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6 github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g= github.com/libp2p/go-libp2p-kbucket v0.2.2 h1:Jg/JUbQix6mvTnj+86FasRqkh01JFQNrN+H26Gxxsg0= github.com/libp2p/go-libp2p-kbucket v0.2.2/go.mod h1:opWrBZSWnBYPc315q497huxY3sz1t488X6OiXUEYWKA= +github.com/libp2p/go-libp2p-kbucket v0.2.3 h1:XtNfN4WUy0cfeJoJgWCf1lor4Pp3kBkFJ9vQ+Zs+VUM= +github.com/libp2p/go-libp2p-kbucket v0.2.3/go.mod h1:opWrBZSWnBYPc315q497huxY3sz1t488X6OiXUEYWKA= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= diff --git a/lookup.go b/lookup.go index a00fbfbc5..ee1552360 100644 --- a/lookup.go +++ b/lookup.go @@ -103,8 +103,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee } if res != nil && res.queriedSet != nil { - // refresh the k-bucket containing this key as the query was successful - dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) + // refresh the cpl for this key as the query was successful + dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now()) sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key)) l := len(sorted) diff --git a/routing.go b/routing.go index a796f3e6c..970d52159 100644 --- a/routing.go +++ b/routing.go @@ -396,9 +396,8 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha // // We'll just call this a success. if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) { - // refresh the k-bucket containing this key as the query was successful - dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) - + // refresh the cpl for this key as the query was successful + dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now()) err = nil } done(err) @@ -623,8 +622,8 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, }) } - // refresh the k-bucket containing this key after the query is run - dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now()) + // refresh the cpl for this key after the query is run + dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key.KeyString()), time.Now()) } // FindPeer searches for a peer with given ID. @@ -696,8 +695,8 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return peer.AddrInfo{}, err } - // refresh the k-bucket containing this key since the lookup was successful - dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now()) + // refresh the cpl for this key since the lookup was successful + dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now()) logger.Debugf("FindPeer %v %v", id, result.success) if result.peer.ID == "" { @@ -767,8 +766,8 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< logger.Debug(err) } - // refresh the k-bucket containing this key - dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now()) + // refresh the cpl for this key + dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now()) // close the peerchan channel when done. close(peerchan)