Skip to content

Commit

Permalink
added check to avoid adding unresponsive dht peers to the dht routing…
Browse files Browse the repository at this point in the history
… table
  • Loading branch information
guillaumemichel committed Feb 21, 2023
1 parent 1e1e776 commit 5362e55
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 55 deletions.
105 changes: 59 additions & 46 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ var (
baseLogger = logger.Desugar()

rtFreezeTimeout = 1 * time.Minute

// max time given to connecting to, and querying a peer before
// adding it to the Routing Table
protocolCheckTimeout = 10 * time.Second
)

const (
Expand All @@ -68,11 +72,6 @@ const (
protectedBuckets = 2
)

// addPeerRTReq is a request, sent on addPeerToRTChan, asking the routing
// table loop to add peer p. queryPeer reports whether the peer was
// discovered via a successful query (it is forwarded to
// RoutingTable.TryAddPeer as the queryPeer argument).
type addPeerRTReq struct {
p peer.ID
queryPeer bool
}

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -125,6 +124,10 @@ type IpfsDHT struct {

autoRefresh bool

// A function performing a lookup request to a remote peer.ID, verifying that it is able to
// answer it correctly
protocolCheck func(context.Context, peer.ID) error

// A function returning a set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
// connecting to the network).
Expand All @@ -140,7 +143,7 @@ type IpfsDHT struct {
disableFixLowPeers bool
fixLowPeersChan chan struct{}

addPeerToRTChan chan addPeerRTReq
addPeerToRTChan chan peer.ID
refreshFinishedCh chan struct{}

rtFreezeTimeout time.Duration
Expand Down Expand Up @@ -233,7 +236,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.ctx, p, false)
dht.peerFound(dht.ctx, p)
}
dht.plk.Unlock()

Expand Down Expand Up @@ -294,7 +297,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err

fixLowPeersChan: make(chan struct{}, 1),

addPeerToRTChan: make(chan addPeerRTReq),
addPeerToRTChan: make(chan peer.ID),
refreshFinishedCh: make(chan struct{}),
}

Expand All @@ -321,6 +324,17 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
dht.routingTable = rt
dht.bootstrapPeers = cfg.BootstrapPeers

	// protocolCheck probes a remote peer with a DHT query to verify that it
	// answers DHT requests correctly before we trust it in the routing table.
	dht.protocolCheck = func(ctx context.Context, p peer.ID) error {
		// lookup request to p requesting for its own peer.ID
		peerids, err := dht.protoMessenger.GetClosestPeers(ctx, p, p)
		// p should return at least 2 peer.IDs: itself plus at least one
		// neighbor; otherwise the response is considered invalid
		if err == nil && len(peerids) < 2 {
			return fmt.Errorf("peer %s failed to return its closest peers", p)
		}
		return err
	}

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
Expand Down Expand Up @@ -363,16 +377,11 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
return err
}

pingFnc := func(ctx context.Context, p peer.ID) error {
_, err := dht.protoMessenger.GetClosestPeers(ctx, p, p) // don't use the PING message type as it's deprecated
return err
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
keyGenFnc,
queryFnc,
pingFnc,
dht.protocolCheck,
cfg.RoutingTable.RefreshQueryTimeout,
cfg.RoutingTable.RefreshInterval,
maxLastSuccessfulOutboundThreshold,
Expand Down Expand Up @@ -480,7 +489,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(ctx, p, false)
dht.peerFound(ctx, p)
}

// TODO Active Bootstrapping
Expand Down Expand Up @@ -591,22 +600,23 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
select {
case <-timerCh:
dht.routingTable.MarkAllPeersIrreplaceable()
case addReq := <-dht.addPeerToRTChan:
case p := <-dht.addPeerToRTChan:
prevSize := dht.routingTable.Size()
if prevSize == 0 {
isBootsrapping = true
bootstrapCount = 0
timerCh = nil
}
newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootsrapping)
// queryPeer set to true as we only try to add queried peers to the RT
newlyAdded, err := dht.routingTable.TryAddPeer(p, true, isBootsrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded && addReq.queryPeer {
if !newlyAdded {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now())
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
case <-dht.refreshFinishedCh:
bootstrapCount = bootstrapCount + 1
Expand All @@ -626,39 +636,42 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
}
}

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
//
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
// LastUsefulAt=0
//
// If we connect to a peer and then exchange a query RPC ->
//
// LastQueriedAt=time.Now (same reason as above)
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
//
// If we query a peer we already have in our Routing Table ->
//
// LastQueriedAt=time.Now()
// LastUsefulAt remains unchanged
//
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
}
// peerFound verifies whether the found peer advertises DHT protocols
// and probe it to make sure it answers DHT queries as expected. If
// it fails to answer, it isn't added to the routingTable.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
b, err := dht.validRTPeer(p)
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():
livelinessCtx, cancel := context.WithTimeout(ctx, protocolCheckTimeout)
defer cancel()

if err := dht.host.Connect(livelinessCtx, peer.AddrInfo{ID: p}); err != nil {
logger.Debugw("failed connection to DHT peer", "peer", p, "error", err)
return
}

if err := dht.protocolCheck(livelinessCtx, p); err != nil {
logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err)
return
}

dht.validPeerFound(ctx, p)
}
}

// validPeerFound signals the routing table that we've found a peer that
// supports the DHT protocol and has just answered a DHT FindPeers query
// correctly.
func (dht *IpfsDHT) validPeerFound(ctx context.Context, p peer.ID) {
	if entry := baseLogger.Check(zap.DebugLevel, "peer found"); entry != nil {
		entry.Write(zap.String("peer", p.String()))
	}

	// Hand the peer to rtPeerLoop unless the DHT is shutting down.
	select {
	case <-dht.ctx.Done():
		return
	case dht.addPeerToRTChan <- p:
	}
}

Expand Down
4 changes: 2 additions & 2 deletions dht_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self, true)
d.peerFound(ctx, d1.self, true)
d.peerFound(ctx, d5.self)
d.peerFound(ctx, d1.self)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
Expand Down
2 changes: 1 addition & 1 deletion dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
}

// a peer has queried us, let's add it to RT
dht.peerFound(dht.ctx, mPeer, true)
dht.peerFound(dht.ctx, mPeer)

if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
Expand Down
6 changes: 3 additions & 3 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func TestNotFound(t *testing.T) {
}

for _, p := range hosts {
d.peerFound(ctx, p.ID(), true)
d.peerFound(ctx, p.ID())
}

// long timeout to ensure timing is not at play.
Expand Down Expand Up @@ -343,7 +343,7 @@ func TestLessThanKResponses(t *testing.T) {
}

for i := 1; i < 5; i++ {
d.peerFound(ctx, hosts[i].ID(), true)
d.peerFound(ctx, hosts[i].ID())
}

// Reply with random peers to every message
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}

d.peerFound(ctx, hosts[1].ID(), true)
d.peerFound(ctx, hosts[1].ID())

for _, proto := range d.serverProtocols {
// It would be nice to be able to just get a value and succeed but then
Expand Down
2 changes: 1 addition & 1 deletion handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func BenchmarkHandleFindPeer(b *testing.B) {
panic(err)
}

d.peerFound(ctx, id, true)
d.peerFound(ctx, id)

peers = append(peers, id)
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2000+i))
Expand Down
2 changes: 1 addition & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
queryDuration := time.Since(startQuery)

// query successful, try to add to RT
q.dht.peerFound(q.dht.ctx, p, true)
q.dht.validPeerFound(q.dht.ctx, p)

// process new peers
saw := []peer.ID{}
Expand Down
2 changes: 1 addition & 1 deletion subscriber_notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func handlePeerChangeEvent(dht *IpfsDHT, p peer.ID) {
logger.Errorf("could not check peerstore for protocol support: err: %s", err)
return
} else if valid {
dht.peerFound(dht.ctx, p, false)
dht.peerFound(dht.ctx, p)
dht.fixRTIfNeeded()
} else {
dht.peerStoppedDHT(dht.ctx, p)
Expand Down

0 comments on commit 5362e55

Please sign in to comment.