Skip to content

Commit

Permalink
New RT management policy (#520)
Browse files Browse the repository at this point in the history
* new RT management policy
  • Loading branch information
aarshkshah1992 authored Apr 2, 2020
1 parent bce52ae commit f79ad79
Show file tree
Hide file tree
Showing 13 changed files with 334 additions and 260 deletions.
121 changes: 79 additions & 42 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"

Expand All @@ -15,23 +16,22 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"

"go.opencensus.io/tag"
"golang.org/x/xerrors"

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"

"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
)

var logger = logging.Logger("dht")
Expand Down Expand Up @@ -96,7 +96,7 @@ type IpfsDHT struct {

autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
rtRefreshInterval time.Duration
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error

Expand All @@ -119,6 +119,9 @@ var (
)

// New creates a new DHT with the specified host and options.
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it sends us a query.
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
var cfg config
if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
Expand All @@ -134,7 +137,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
}

dht.autoRefresh = cfg.routingTable.autoRefresh
dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod
dht.rtRefreshInterval = cfg.routingTable.refreshInterval
dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout

dht.maxRecordAge = cfg.maxRecordAge
Expand Down Expand Up @@ -174,6 +177,10 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)

dht.startSelfLookup()
dht.startRefreshing()

// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
go dht.persistRTPeersInPeerStore()
return dht, nil
}

Expand Down Expand Up @@ -253,32 +260,17 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
}

func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
self := kb.ConvertPeerID(dht.host.ID())
// construct the routing table with a peer validation function
pvF := func(c context.Context, p peer.ID) bool {
// connect should work
if err := dht.host.Connect(c, peer.AddrInfo{ID: p}); err != nil {
rtPvLogger.Infof("failed to connect to peer %s for validation, err=%s", p, err)
return false
}

// peer should support the DHT protocol
b, err := dht.validRTPeer(p)
if err != nil {
rtPvLogger.Errorf("failed to check if peer %s supports DHT protocol, err=%s", p, err)
return false
}

return b && cfg.routingTable.peerFilter(dht, dht.Host().Network().ConnsToPeer(p))
}
// The threshold is calculated based on the expected amount of time that should pass before we
// query a peer as part of our refresh cycle.
// To grok the Math Wizardry that produced these exact equations, please be patient as a document explaining it will
// be published soon.
l1 := math.Log(float64(1) / float64(defaultBucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold := l1 / l2 * float64(cfg.routingTable.refreshInterval)

rtOpts := []kb.Option{kb.PeerValidationFnc(pvF)}
if !(cfg.routingTable.checkInterval == 0) {
rtOpts = append(rtOpts, kb.TableCleanupInterval(cfg.routingTable.checkInterval))
}
self := kb.ConvertPeerID(dht.host.ID())

rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(),
rtOpts...)
rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
cmgr := dht.host.ConnManager()

rt.PeerAdded = func(p peer.ID) {
Expand All @@ -292,6 +284,25 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
return rt, err
}

// persistRTPeersInPeerStore runs until dht.ctx is done. On every tick (one
// third of peerstore.RecentlyConnectedAddrTTL) it walks the peers currently
// listed by the routing table and calls UpdateAddrs on the peerstore for each
// of them, passing RecentlyConnectedAddrTTL as both TTL arguments.
//
// TODO: This is a hack and SHOULD be removed once
// https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	refresh := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
	defer refresh.Stop()

	for {
		// Block until either the DHT is shutting down or the next tick fires.
		select {
		case <-dht.ctx.Done():
			return
		case <-refresh.C:
		}

		// The TTL is re-read on every pass because it is a package-level
		// variable, not a constant.
		for _, rtPeer := range dht.routingTable.ListPeers() {
			dht.peerstore.UpdateAddrs(rtPeer, peerstore.RecentlyConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
		}
	}
}

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
Expand Down Expand Up @@ -404,24 +415,50 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
}

// peerFound signals the routingTable that we've found a peer that
// supports the DHT protocol.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerFound", p)
dht.routingTable.HandlePeerAlive(p)
// might support the DHT protocol.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
	valid, err := dht.validRTPeer(p)
	switch {
	case err != nil:
		// Validation itself failed; log it and leave the routing table untouched.
		logger.Errorf("failed to validate if peer is a DHT peer, err=%s", err)
		return
	case !valid:
		// The peer did not pass validation, so it is not offered to the routing table.
		return
	}

	logger.Event(ctx, "peerFound", p)
	dht.routingTable.TryAddPeer(p, queryPeer)

	if !queryPeer {
		return
	}
	// The peer was discovered through a query, so override the "zero"
	// lastSuccessfulOutboundQuery value that must have been recorded in the
	// Routing Table when the peer was first added during a connection.
	dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now())
}

// peerStoppedDHT signals the routing table that a peer has stopped supporting the DHT protocol.
// peerStoppedDHT signals the routing table that a peer is unable to respond to DHT queries anymore.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerStoppedDHT", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore until it starts supporting the DHT protocol again.
dht.routingTable.HandlePeerDead(p)
dht.routingTable.RemovePeer(p)

// since we lost a peer from the RT, we should do this here
dht.fixLowPeers()
}

// peerDisconnected signals the routing table that a peer is not connected anymore.
func (dht *IpfsDHT) peerDisconnected(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerDisconnected", p)
dht.routingTable.HandlePeerDisconnect(p)
// fixLowPeers tries to repopulate the routing table when its size is at or
// below minRTRefreshThreshold. It offers every peer reported by the host's
// network to peerFound (as a non-query peer) and, when auto-refresh is
// enabled, requests a routing table refresh without blocking.
func (dht *IpfsDHT) fixLowPeers() {
	if size := dht.routingTable.Size(); size > minRTRefreshThreshold {
		// Enough peers in the table; nothing to fix.
		return
	}

	connected := dht.host.Network().Peers()
	for i := range connected {
		dht.peerFound(dht.Context(), connected[i], false)
	}

	if !dht.autoRefresh {
		return
	}

	// Non-blocking send: if the send cannot proceed right away, the request
	// is simply dropped. A nil value is sent, i.e. no error channel on which
	// a result could be reported back.
	select {
	case dht.triggerRtRefresh <- nil:
	default:
	}
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Expand Down
9 changes: 2 additions & 7 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (dht *IpfsDHT) startRefreshing() error {
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)

refreshTicker := time.NewTicker(dht.rtRefreshPeriod)
refreshTicker := time.NewTicker(dht.rtRefreshInterval)
defer refreshTicker.Stop()

// refresh if option is set
Expand Down Expand Up @@ -188,12 +188,7 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {

var merr error
for _, tcpl := range trackedCpls {
if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
continue
}

// do not refresh if bucket is full
if dht.routingTable.IsBucketFull(tcpl.Cpl) {
if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshInterval {
continue
}

Expand Down
16 changes: 3 additions & 13 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
msmux "github.com/multiformats/go-multistream"

ggio "github.com/gogo/protobuf/io"

Expand Down Expand Up @@ -141,6 +140,9 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

// a peer has queried us, let's add it to RT
dht.peerFound(dht.ctx, mPeer, true)

resp, err := handler(ctx, mPeer, &req)
if err != nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
Expand Down Expand Up @@ -173,9 +175,6 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentRequestErrors.M(1),
Expand All @@ -187,9 +186,6 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

rpmes, err := ms.SendRequest(ctx, pmes)
if err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentRequestErrors.M(1),
Expand All @@ -213,9 +209,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentMessages.M(1),
metrics.SentMessageErrors.M(1),
Expand All @@ -224,9 +217,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
}

if err := ms.SendMessage(ctx, pmes); err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentMessages.M(1),
metrics.SentMessageErrors.M(1),
Expand Down
16 changes: 4 additions & 12 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type config struct {

routingTable struct {
refreshQueryTimeout time.Duration
refreshPeriod time.Duration
refreshInterval time.Duration
autoRefresh bool
latencyTolerance time.Duration
checkInterval time.Duration
Expand Down Expand Up @@ -88,8 +88,8 @@ var defaults = func(o *config) error {
o.queryPeerFilter = emptyQueryFilter

o.routingTable.latencyTolerance = time.Minute
o.routingTable.refreshQueryTimeout = 30 * time.Second
o.routingTable.refreshPeriod = 10 * time.Minute
o.routingTable.refreshQueryTimeout = 1 * time.Minute
o.routingTable.refreshInterval = 10 * time.Minute
o.routingTable.autoRefresh = true
o.routingTable.peerFilter = emptyRTFilter
o.maxRecordAge = time.Hour * 36
Expand Down Expand Up @@ -122,14 +122,6 @@ func (c *config) validate() error {
return nil
}

// RoutingTableCheckInterval returns an Option that stores i as the routing
// table's checkInterval, i.e. the interval between two runs of the RT cleanup
// routine. The returned Option never fails.
func RoutingTableCheckInterval(i time.Duration) Option {
	applyInterval := func(cfg *config) error {
		cfg.routingTable.checkInterval = i
		return nil
	}
	return applyInterval
}

// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
Expand All @@ -156,7 +148,7 @@ func RoutingTableRefreshQueryTimeout(timeout time.Duration) Option {
// the last refresh period.
func RoutingTableRefreshPeriod(period time.Duration) Option {
return func(c *config) error {
c.routingTable.refreshPeriod = period
c.routingTable.refreshInterval = period
return nil
}
}
Expand Down
Loading

0 comments on commit f79ad79

Please sign in to comment.