Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refresh and wait #418

Merged
merged 2 commits into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type IpfsDHT struct {
autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
triggerRtRefresh chan struct{}
triggerRtRefresh chan chan<- error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be just chan error?


maxRecordAge time.Duration
}
Expand Down Expand Up @@ -167,7 +167,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
triggerRtRefresh: make(chan struct{}),
triggerRtRefresh: make(chan chan<- error),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is slightly stricter.

}

dht.ctx = dht.newContextWithLocalTags(ctx)
Expand Down
69 changes: 56 additions & 13 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package dht

import (
"context"
"fmt"
"time"

multierror "github.com/hashicorp/go-multierror"
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
Expand Down Expand Up @@ -59,27 +61,57 @@ func (dht *IpfsDHT) startRefreshing() error {
}

for {
var waiting []chan<- error
select {
case <-refreshTicker.C:
case <-dht.triggerRtRefresh:
logger.Infof("triggering a refresh: RT has %d peers", dht.routingTable.Size())
case res := <-dht.triggerRtRefresh:
if res != nil {
waiting = append(waiting, res)
}
case <-ctx.Done():
return
}
dht.doRefresh(ctx)

// Batch multiple refresh requests if they're all waiting at the same time.
collectWaiting:
for {
select {
case res := <-dht.triggerRtRefresh:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien

Is this second select here to handle the case where multiple callers might ask for a refresh "simultaneously" ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I'll document that.

if res != nil {
waiting = append(waiting, res)
}
default:
break collectWaiting
}
}

err := dht.doRefresh(ctx)
for _, w := range waiting {
w <- err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close the channel?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(well, if we're returning a channel, we should probably close it)

close(w)
}
if err != nil {
logger.Warning(err)
}
}
})

return nil
}

func (dht *IpfsDHT) doRefresh(ctx context.Context) {
dht.selfWalk(ctx)
dht.refreshBuckets(ctx)
func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
var merr error
if err := dht.selfWalk(ctx); err != nil {
merr = multierror.Append(merr, err)
}
if err := dht.refreshBuckets(ctx); err != nil {
merr = multierror.Append(merr, err)
}
return merr
}

// refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
logger.Infof("starting refreshing bucket %d to %s (routing table size was %d)",
bucketId, target, dht.routingTable.Size())
Expand All @@ -103,6 +135,8 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
// 16 bits specified anyways.
buckets = buckets[:16]
}

var merr error
for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod {
continue
Expand All @@ -120,20 +154,24 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
}

if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
logger.Warningf("failed to do a random walk on bucket %d: %s", bucketID, err)
merr = multierror.Append(
merr,
fmt.Errorf("failed to do a random walk on bucket %d: %s", bucketID, err),
)
}
}
return merr
}

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) {
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
defer cancel()
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound {
return
return nil
}
logger.Warningf("failed to query self during routing table refresh: %s", err)
return fmt.Errorf("failed to query self during routing table refresh: %s", err)
}

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
Expand All @@ -146,9 +184,14 @@ func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
}

// RefreshRoutingTable tells the DHT to refresh its routing tables.
func (dht *IpfsDHT) RefreshRoutingTable() {
//
// Receiving from the returned channel blocks until the refresh finishes; the
// channel then yields the error and is closed. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
res := make(chan error, 1)
select {
case dht.triggerRtRefresh <- struct{}{}:
case dht.triggerRtRefresh <- res:
default:
}
return res
}
24 changes: 16 additions & 8 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,14 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
dht.RefreshRoutingTable()
select {
case err := <-dht.RefreshRoutingTable():
if err != nil {
t.Error(err)
}
case <-ctx.Done():
return
}
}
}

Expand Down Expand Up @@ -663,25 +670,26 @@ func TestRefresh(t *testing.T) {

<-time.After(100 * time.Millisecond)
// bootstrap a few times until we get good tables.
stop := make(chan struct{})
t.Logf("bootstrapping them so they find each other %d", nDHTs)
ctxT, cancelT := context.WithTimeout(ctx, 5*time.Second)
defer cancelT()

go func() {
for {
t.Logf("bootstrapping them so they find each other %d", nDHTs)
ctxT, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
for ctxT.Err() == nil {
bootstrap(t, ctxT, dhts)

// wait a bit.
select {
case <-time.After(50 * time.Millisecond):
continue // being explicit
case <-stop:
case <-ctxT.Done():
return
}
}
}()

waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
close(stop)
cancelT()

if u.Debug {
// the routing tables should be full now. let's inspect them.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.12

require (
github.com/gogo/protobuf v1.3.1
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/golang-lru v0.5.3
github.com/ipfs/go-cid v0.0.3
github.com/ipfs/go-datastore v0.3.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyF
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down
4 changes: 2 additions & 2 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
dht.Update(dht.Context(), p)
if refresh && dht.autoRefresh {
select {
case dht.triggerRtRefresh <- struct{}{}:
case dht.triggerRtRefresh <- nil:
default:
}
}
Expand Down Expand Up @@ -82,7 +82,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
dht.Update(dht.Context(), p)
if refresh && dht.autoRefresh {
select {
case dht.triggerRtRefresh <- struct{}{}:
case dht.triggerRtRefresh <- nil:
default:
}
}
Expand Down