-
Notifications
You must be signed in to change notification settings - Fork 225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: refresh and wait #418
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,7 +70,7 @@ type IpfsDHT struct { | |
autoRefresh bool | ||
rtRefreshQueryTimeout time.Duration | ||
rtRefreshPeriod time.Duration | ||
triggerRtRefresh chan struct{} | ||
triggerRtRefresh chan chan<- error | ||
|
||
maxRecordAge time.Duration | ||
} | ||
|
@@ -167,7 +167,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p | |
routingTable: rt, | ||
protocols: protocols, | ||
bucketSize: bucketSize, | ||
triggerRtRefresh: make(chan struct{}), | ||
triggerRtRefresh: make(chan chan<- error), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one is slightly stricter. |
||
} | ||
|
||
dht.ctx = dht.newContextWithLocalTags(ctx) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,10 @@ package dht | |
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
multierror "github.com/hashicorp/go-multierror" | ||
process "github.com/jbenet/goprocess" | ||
processctx "github.com/jbenet/goprocess/context" | ||
"github.com/libp2p/go-libp2p-core/routing" | ||
|
@@ -59,27 +61,57 @@ func (dht *IpfsDHT) startRefreshing() error { | |
} | ||
|
||
for { | ||
var waiting []chan<- error | ||
select { | ||
case <-refreshTicker.C: | ||
case <-dht.triggerRtRefresh: | ||
logger.Infof("triggering a refresh: RT has %d peers", dht.routingTable.Size()) | ||
case res := <-dht.triggerRtRefresh: | ||
if res != nil { | ||
waiting = append(waiting, res) | ||
} | ||
case <-ctx.Done(): | ||
return | ||
} | ||
dht.doRefresh(ctx) | ||
|
||
// Batch multiple refresh requests if they're all waiting at the same time. | ||
collectWaiting: | ||
for { | ||
select { | ||
case res := <-dht.triggerRtRefresh: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this second select here to handle the case where multiple callers might ask for a refresh "simultaneously" ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. I'll document that. |
||
if res != nil { | ||
waiting = append(waiting, res) | ||
} | ||
default: | ||
break collectWaiting | ||
} | ||
} | ||
|
||
err := dht.doRefresh(ctx) | ||
for _, w := range waiting { | ||
w <- err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Close the channel? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (well, if we're returning a channel, we should probably close it) |
||
close(w) | ||
} | ||
if err != nil { | ||
logger.Warning(err) | ||
} | ||
} | ||
}) | ||
|
||
return nil | ||
} | ||
|
||
func (dht *IpfsDHT) doRefresh(ctx context.Context) { | ||
dht.selfWalk(ctx) | ||
dht.refreshBuckets(ctx) | ||
func (dht *IpfsDHT) doRefresh(ctx context.Context) error { | ||
var merr error | ||
if err := dht.selfWalk(ctx); err != nil { | ||
merr = multierror.Append(merr, err) | ||
} | ||
if err := dht.refreshBuckets(ctx); err != nil { | ||
merr = multierror.Append(merr, err) | ||
} | ||
return merr | ||
} | ||
|
||
// refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period | ||
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) { | ||
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error { | ||
doQuery := func(bucketId int, target string, f func(context.Context) error) error { | ||
logger.Infof("starting refreshing bucket %d to %s (routing table size was %d)", | ||
bucketId, target, dht.routingTable.Size()) | ||
|
@@ -103,6 +135,8 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) { | |
// 16 bits specified anyways. | ||
buckets = buckets[:16] | ||
} | ||
|
||
var merr error | ||
for bucketID, bucket := range buckets { | ||
if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod { | ||
continue | ||
|
@@ -120,20 +154,24 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) { | |
} | ||
|
||
if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil { | ||
logger.Warningf("failed to do a random walk on bucket %d: %s", bucketID, err) | ||
merr = multierror.Append( | ||
merr, | ||
fmt.Errorf("failed to do a random walk on bucket %d: %s", bucketID, err), | ||
) | ||
} | ||
} | ||
return merr | ||
} | ||
|
||
// Traverse the DHT toward the self ID | ||
func (dht *IpfsDHT) selfWalk(ctx context.Context) { | ||
func (dht *IpfsDHT) selfWalk(ctx context.Context) error { | ||
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout) | ||
defer cancel() | ||
_, err := dht.FindPeer(queryCtx, dht.self) | ||
if err == routing.ErrNotFound { | ||
return | ||
return nil | ||
} | ||
logger.Warningf("failed to query self during routing table refresh: %s", err) | ||
return fmt.Errorf("failed to query self during routing table refresh: %s", err) | ||
} | ||
|
||
// Bootstrap tells the DHT to get into a bootstrapped state satisfying the | ||
|
@@ -146,9 +184,14 @@ func (dht *IpfsDHT) Bootstrap(_ context.Context) error { | |
} | ||
|
||
// RefreshRoutingTable tells the DHT to refresh its routing table. | ||
func (dht *IpfsDHT) RefreshRoutingTable() { | ||
// | ||
// The returned channel blocks on receive until the refresh finishes, then | ||
// yields the error and is closed. The channel is buffered and safe to ignore. | ||
func (dht *IpfsDHT) RefreshRoutingTable() <-chan error { | ||
res := make(chan error, 1) | ||
select { | ||
case dht.triggerRtRefresh <- struct{}{}: | ||
case dht.triggerRtRefresh <- res: | ||
default: | ||
} | ||
return res | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this be just
chan error
?