Skip to content

Commit

Permalink
Merge pull request #428 from aarshkshah1992/feat/refresh-cpls
Browse files Browse the repository at this point in the history
Refresh cpl's in dht
  • Loading branch information
Stebalien authored Dec 17, 2019
2 parents 17ab593 + ef6ffec commit 904b4cd
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 35 deletions.
44 changes: 21 additions & 23 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {

// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
// scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)

Expand Down Expand Up @@ -104,20 +104,20 @@ func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
if err := dht.selfWalk(ctx); err != nil {
merr = multierror.Append(merr, err)
}
if err := dht.refreshBuckets(ctx); err != nil {
if err := dht.refreshCpls(ctx); err != nil {
merr = multierror.Append(merr, err)
}
return merr
}

// refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
logger.Infof("starting refreshing bucket %d to %s (routing table size was %d)",
bucketId, target, dht.routingTable.Size())
// refreshCpls scans the routing table, and does a random walk for cpl's that haven't been queried since the given period
func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
doQuery := func(cpl uint, target string, f func(context.Context) error) error {
logger.Infof("starting refreshing cpl %d to %s (routing table size was %d)",
cpl, target, dht.routingTable.Size())
defer func() {
logger.Infof("finished refreshing bucket %d to %s (routing table size is now %d)",
bucketId, target, dht.routingTable.Size())
logger.Infof("finished refreshing cpl %d to %s (routing table size is now %d)",
cpl, target, dht.routingTable.Size())
}()
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
defer cancel()
Expand All @@ -128,35 +128,33 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error {
return err
}

buckets := dht.routingTable.GetAllBuckets()
if len(buckets) > 16 {
// Don't bother bootstrapping more than 16 buckets.
// GenRandPeerID can't generate target peer IDs with more than
// 16 bits specified anyways.
buckets = buckets[:16]
}
trackedCpls := dht.routingTable.GetTrackedCplsForRefresh()

var merr error
for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod {
for _, tcpl := range trackedCpls {
if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
continue
}
// gen rand peer with the cpl
randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl)
if err != nil {
logger.Errorf("failed to generate peerID for cpl %d, err: %s", tcpl.Cpl, err)
continue
}
// gen rand peer in the bucket
randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)

// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.FindPeer(c, randPeerInBucket)
_, err := dht.FindPeer(c, randPeer)
if err == routing.ErrNotFound {
return nil
}
return err
}

if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
if err := doQuery(tcpl.Cpl, randPeer.String(), walkFnc); err != nil {
merr = multierror.Append(
merr,
fmt.Errorf("failed to do a random walk on bucket %d: %s", bucketID, err),
fmt.Errorf("failed to do a random walk for cpl %d: %s", tcpl.Cpl, err),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-libp2p v0.4.2
github.com/libp2p/go-libp2p-core v0.2.5
github.com/libp2p/go-libp2p-kbucket v0.2.2
github.com/libp2p/go-libp2p-kbucket v0.2.3
github.com/libp2p/go-libp2p-peerstore v0.1.4
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-routing v0.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
github.com/libp2p/go-libp2p-kbucket v0.2.2 h1:Jg/JUbQix6mvTnj+86FasRqkh01JFQNrN+H26Gxxsg0=
github.com/libp2p/go-libp2p-kbucket v0.2.2/go.mod h1:opWrBZSWnBYPc315q497huxY3sz1t488X6OiXUEYWKA=
github.com/libp2p/go-libp2p-kbucket v0.2.3 h1:XtNfN4WUy0cfeJoJgWCf1lor4Pp3kBkFJ9vQ+Zs+VUM=
github.com/libp2p/go-libp2p-kbucket v0.2.3/go.mod h1:opWrBZSWnBYPc315q497huxY3sz1t488X6OiXUEYWKA=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
Expand Down
4 changes: 2 additions & 2 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
}

if res != nil && res.queriedSet != nil {
// refresh the k-bucket containing this key as the query was successful
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())

sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
l := len(sorted)
Expand Down
17 changes: 8 additions & 9 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,8 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha
//
// We'll just call this a success.
if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
// refresh the k-bucket containing this key as the query was successful
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())

// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
err = nil
}
done(err)
Expand Down Expand Up @@ -623,8 +622,8 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid,
})
}

// refresh the k-bucket containing this key after the query is run
dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now())
// refresh the cpl for this key after the query is run
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key.KeyString()), time.Now())
}

// FindPeer searches for a peer with given ID.
Expand Down Expand Up @@ -696,8 +695,8 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
return peer.AddrInfo{}, err
}

// refresh the k-bucket containing this key since the lookup was successful
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
// refresh the cpl for this key since the lookup was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now())

logger.Debugf("FindPeer %v %v", id, result.success)
if result.peer.ID == "" {
Expand Down Expand Up @@ -767,8 +766,8 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
logger.Debug(err)
}

// refresh the k-bucket containing this key
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
// refresh the cpl for this key
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now())

// close the peerchan channel when done.
close(peerchan)
Expand Down

0 comments on commit 904b4cd

Please sign in to comment.