Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New RT management policy #520

Merged
merged 12 commits into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 76 additions & 42 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"

Expand All @@ -15,23 +16,22 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"

"go.opencensus.io/tag"
"golang.org/x/xerrors"

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"

"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
)

var logger = logging.Logger("dht")
Expand Down Expand Up @@ -96,7 +96,7 @@ type IpfsDHT struct {

autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
rtRefreshInterval time.Duration
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error

Expand All @@ -119,6 +119,9 @@ var (
)

// New creates a new DHT with the specified host and options.
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
var cfg config
if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
Expand All @@ -134,7 +137,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
}

dht.autoRefresh = cfg.routingTable.autoRefresh
dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod
dht.rtRefreshInterval = cfg.routingTable.refreshInterval
dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout

dht.maxRecordAge = cfg.maxRecordAge
Expand Down Expand Up @@ -174,6 +177,10 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)

dht.startSelfLookup()
dht.startRefreshing()

// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
go dht.persistRTPeersInPeerStore()
return dht, nil
}

Expand Down Expand Up @@ -253,32 +260,17 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
}

func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
self := kb.ConvertPeerID(dht.host.ID())
// construct the routing table with a peer validation function
pvF := func(c context.Context, p peer.ID) bool {
// connect should work
if err := dht.host.Connect(c, peer.AddrInfo{ID: p}); err != nil {
rtPvLogger.Infof("failed to connect to peer %s for validation, err=%s", p, err)
return false
}

// peer should support the DHT protocol
b, err := dht.validRTPeer(p)
if err != nil {
rtPvLogger.Errorf("failed to check if peer %s supports DHT protocol, err=%s", p, err)
return false
}

return b && cfg.routingTable.peerFilter(dht, dht.Host().Network().ConnsToPeer(p))
}
// The threshold is calculated based on the expected amount of time that should pass before we
// query a peer as part of our refresh cycle.
// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
// be published soon.
l1 := math.Log(float64(1) / float64(defaultBucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold := l1 / l2 * float64(cfg.routingTable.refreshInterval)

rtOpts := []kb.Option{kb.PeerValidationFnc(pvF)}
if !(cfg.routingTable.checkInterval == 0) {
rtOpts = append(rtOpts, kb.TableCleanupInterval(cfg.routingTable.checkInterval))
}
self := kb.ConvertPeerID(dht.host.ID())

rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(),
rtOpts...)
rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
cmgr := dht.host.ConnManager()

rt.PeerAdded = func(p peer.ID) {
Expand All @@ -292,6 +284,25 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
return rt, err
}

// TODO This is hacky, horrible and the programmer needs to have his mother called a hamster.
// SHOULD be removed once https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
tickr := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
defer tickr.Stop()

for {
select {
case <-tickr.C:
ps := dht.routingTable.ListPeers()
for _, p := range ps {
dht.peerstore.UpdateAddrs(p, peerstore.RecentlyConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
}
case <-dht.ctx.Done():
return
}
}
}

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
Expand Down Expand Up @@ -404,24 +415,47 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
}

// peerFound signals the routingTable that we've found a peer that
// supports the DHT protocol.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerFound", p)
dht.routingTable.HandlePeerAlive(p)
// might support the DHT protocol.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
b, err := dht.validRTPeer(p)
if err != nil {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
logger.Errorf("failed to validate if peer is a DHT peer, err=%s", err)
} else if b {
logger.Event(ctx, "peerFound", p)
dht.routingTable.TryAddPeer(p, queryPeer)
if queryPeer {
dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now())
}
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
}
}

// peerStoppedDHT signals the routing table that a peer has stopped supporting the DHT protocol.
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerStoppedDHT", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to anymore till it starts supporting the DHT protocol again.
dht.routingTable.HandlePeerDead(p)
dht.routingTable.RemovePeer(p)

// since we lost a peer from the RT, we should do this here
dht.fixLowPeers()
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
}

// peerDisconnected signals the routing table that a peer is not connected anymore.
func (dht *IpfsDHT) peerDisconnected(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerDisconnected", p)
dht.routingTable.HandlePeerDisconnect(p)
// fixLowPeers tries to get more peers into the routing table if we're below the threshold
func (dht *IpfsDHT) fixLowPeers() {
if dht.routingTable.Size() > minRTRefreshThreshold {
return
}

for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.Context(), p, false)
}

if dht.autoRefresh {
select {
case dht.triggerRtRefresh <- nil:
default:
}
}
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Expand Down
9 changes: 2 additions & 7 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (dht *IpfsDHT) startRefreshing() error {
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)

refreshTicker := time.NewTicker(dht.rtRefreshPeriod)
refreshTicker := time.NewTicker(dht.rtRefreshInterval)
defer refreshTicker.Stop()

// refresh if option is set
Expand Down Expand Up @@ -188,12 +188,7 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {

var merr error
for _, tcpl := range trackedCpls {
if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
continue
}

// do not refresh if bucket is full
if dht.routingTable.IsBucketFull(tcpl.Cpl) {
if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshInterval {
continue
}

Expand Down
16 changes: 3 additions & 13 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
msmux "github.com/multiformats/go-multistream"

ggio "github.com/gogo/protobuf/io"

Expand Down Expand Up @@ -141,6 +140,9 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

// a peer has queried us, let's add it to RT
dht.peerFound(dht.ctx, mPeer, true)

resp, err := handler(ctx, mPeer, &req)
if err != nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
Expand Down Expand Up @@ -173,9 +175,6 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentRequestErrors.M(1),
Expand All @@ -187,9 +186,6 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

rpmes, err := ms.SendRequest(ctx, pmes)
if err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentRequestErrors.M(1),
Expand All @@ -213,9 +209,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentMessages.M(1),
metrics.SentMessageErrors.M(1),
Expand All @@ -224,9 +217,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
}

if err := ms.SendMessage(ctx, pmes); err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentMessages.M(1),
metrics.SentMessageErrors.M(1),
Expand Down
16 changes: 4 additions & 12 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type config struct {

routingTable struct {
refreshQueryTimeout time.Duration
refreshPeriod time.Duration
refreshInterval time.Duration
autoRefresh bool
latencyTolerance time.Duration
checkInterval time.Duration
Expand Down Expand Up @@ -88,8 +88,8 @@ var defaults = func(o *config) error {
o.queryPeerFilter = emptyQueryFilter

o.routingTable.latencyTolerance = time.Minute
o.routingTable.refreshQueryTimeout = 30 * time.Second
o.routingTable.refreshPeriod = 10 * time.Minute
o.routingTable.refreshQueryTimeout = 1 * time.Minute
o.routingTable.refreshInterval = 10 * time.Minute
o.routingTable.autoRefresh = true
o.routingTable.peerFilter = emptyRTFilter
o.maxRecordAge = time.Hour * 36
Expand Down Expand Up @@ -122,14 +122,6 @@ func (c *config) validate() error {
return nil
}

// RoutingTableCheckInterval is the interval between two runs of the RT cleanup routine.
func RoutingTableCheckInterval(i time.Duration) Option {
return func(c *config) error {
c.routingTable.checkInterval = i
return nil
}
}

// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
Expand All @@ -156,7 +148,7 @@ func RoutingTableRefreshQueryTimeout(timeout time.Duration) Option {
// the last refresh period.
func RoutingTableRefreshPeriod(period time.Duration) Option {
return func(c *config) error {
c.routingTable.refreshPeriod = period
c.routingTable.refreshInterval = period
return nil
}
}
Expand Down
Loading