Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refresh cpl's in dht #428

Merged
merged 1 commit into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 21 additions & 23 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {

// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
// scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)

Expand Down Expand Up @@ -104,20 +104,20 @@ func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
if err := dht.selfWalk(ctx); err != nil {
merr = multierror.Append(merr, err)
}
if err := dht.refreshBuckets(ctx); err != nil {
if err := dht.refreshCpls(ctx); err != nil {
merr = multierror.Append(merr, err)
}
return merr
}

// refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
logger.Infof("starting refreshing bucket %d to %s (routing table size was %d)",
bucketId, target, dht.routingTable.Size())
// refreshCpls scans the routing table, and does a random walk for cpl's that haven't been queried since the given period
func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
doQuery := func(cpl uint, target string, f func(context.Context) error) error {
logger.Infof("starting refreshing cpl %d to %s (routing table size was %d)",
cpl, target, dht.routingTable.Size())
defer func() {
logger.Infof("finished refreshing bucket %d to %s (routing table size is now %d)",
bucketId, target, dht.routingTable.Size())
logger.Infof("finished refreshing cpl %d to %s (routing table size is now %d)",
cpl, target, dht.routingTable.Size())
}()
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
defer cancel()
Expand All @@ -128,35 +128,33 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error {
return err
}

buckets := dht.routingTable.GetAllBuckets()
if len(buckets) > 16 {
// Don't bother bootstrapping more than 16 buckets.
// GenRandPeerID can't generate target peer IDs with more than
// 16 bits specified anyways.
buckets = buckets[:16]
}
trackedCpls := dht.routingTable.GetTrackedCplsForRefresh()

var merr error
for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod {
for _, tcpl := range trackedCpls {
if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
continue
}
// gen rand peer with the cpl
randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl)
if err != nil {
logger.Errorf("failed to generate peerID for cpl %d, err: %s", tcpl.Cpl, err)
continue
}
// gen rand peer in the bucket
randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)

// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.FindPeer(c, randPeerInBucket)
_, err := dht.FindPeer(c, randPeer)
if err == routing.ErrNotFound {
return nil
}
return err
}

if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
if err := doQuery(tcpl.Cpl, randPeer.String(), walkFnc); err != nil {
merr = multierror.Append(
merr,
fmt.Errorf("failed to do a random walk on bucket %d: %s", bucketID, err),
fmt.Errorf("failed to do a random walk for cpl %d: %s", tcpl.Cpl, err),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-libp2p v0.4.2
github.com/libp2p/go-libp2p-core v0.2.5
github.com/libp2p/go-libp2p-kbucket v0.2.2
github.com/libp2p/go-libp2p-kbucket v0.2.3
github.com/libp2p/go-libp2p-peerstore v0.1.4
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-routing v0.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
github.com/libp2p/go-libp2p-kbucket v0.2.2 h1:Jg/JUbQix6mvTnj+86FasRqkh01JFQNrN+H26Gxxsg0=
github.com/libp2p/go-libp2p-kbucket v0.2.2/go.mod h1:opWrBZSWnBYPc315q497huxY3sz1t488X6OiXUEYWKA=
github.com/libp2p/go-libp2p-kbucket v0.2.3 h1:XtNfN4WUy0cfeJoJgWCf1lor4Pp3kBkFJ9vQ+Zs+VUM=
github.com/libp2p/go-libp2p-kbucket v0.2.3/go.mod h1:opWrBZSWnBYPc315q497huxY3sz1t488X6OiXUEYWKA=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
Expand Down
4 changes: 2 additions & 2 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
}

if res != nil && res.queriedSet != nil {
// refresh the k-bucket containing this key as the query was successful
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())

sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
l := len(sorted)
Expand Down
17 changes: 8 additions & 9 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,8 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha
//
// We'll just call this a success.
if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
// refresh the k-bucket containing this key as the query was successful
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())

// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
err = nil
}
done(err)
Expand Down Expand Up @@ -623,8 +622,8 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid,
})
}

// refresh the k-bucket containing this key after the query is run
dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now())
// refresh the cpl for this key after the query is run
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key.KeyString()), time.Now())
}

// FindPeer searches for a peer with given ID.
Expand Down Expand Up @@ -696,8 +695,8 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
return peer.AddrInfo{}, err
}

// refresh the k-bucket containing this key since the lookup was successful
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
// refresh the cpl for this key since the lookup was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now())

logger.Debugf("FindPeer %v %v", id, result.success)
if result.peer.ID == "" {
Expand Down Expand Up @@ -767,8 +766,8 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
logger.Debug(err)
}

// refresh the k-bucket containing this key
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
// refresh the cpl for this key
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now())

// close the peerchan channel when done.
close(peerchan)
Expand Down