a few small changes to make the dht more efficient #2841

Merged
merged 2 commits on Jun 12, 2016
28 changes: 0 additions & 28 deletions routing/dht/dht.go
@@ -117,34 +117,6 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
	return nil
}

// putProvider sends a message to peer 'p' saying that the local node
// can provide the value of 'key'
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, skey string) error {

	// add self as the provider
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		// log.Infof("%s putProvider: %s for %s error: no wan-friendly addresses", dht.self, p, key.Key(key), pi.Addrs)
		return fmt.Errorf("no known addresses for self. cannot put provider.")
	}

	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey, 0)
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	err := dht.sendMessage(ctx, p, pmes)
	if err != nil {
		return err
	}

	log.Debugf("%s putProvider: %s for %s (%s)", dht.self, p, key.Key(skey), pi.Addrs)
	return nil
}

var errInvalidRecord = errors.New("received invalid record")

// getValueOrPeers queries a particular peer p for the value for
24 changes: 23 additions & 1 deletion routing/dht/routing.go
@@ -2,6 +2,7 @@ package dht

import (
	"bytes"
	"fmt"
	"sync"
	"time"

@@ -243,13 +244,18 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
		return err
	}

	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

	wg := sync.WaitGroup{}
	for p := range peers {
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
			log.Debugf("putProvider(%s, %s)", key, p)
			err := dht.putProvider(ctx, p, string(key))
			err := dht.sendMessage(ctx, p, mes)
			if err != nil {
				log.Debug(err)
			}
@@ -258,6 +264,22 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
	wg.Wait()
	return nil
}
func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) {
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(skey), 0)
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}

// FindProviders searches until the context expires.
func (dht *IpfsDHT) FindProviders(ctx context.Context, key key.Key) ([]pstore.PeerInfo, error) {
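
Note on the routing.go change: the efficiency win is that the provider record is now built once, by makeProvRecord, and that same message is reused for every peer Provide sends to, instead of each per-peer putProvider call rebuilding it. A minimal self-contained sketch of that pattern follows; peerID, provMessage, buildProvRecord, and send are hypothetical stand-ins here, not the go-ipfs types.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Hypothetical stand-ins for peer.ID and the protobuf DHT message.
type peerID string
type provMessage struct{ key string }

// buildProvRecord plays the role of makeProvRecord: construct the
// provider message once, up front, rather than once per peer.
func buildProvRecord(key string) (*provMessage, error) {
	if key == "" {
		return nil, fmt.Errorf("cannot build provider record: empty key")
	}
	return &provMessage{key: key}, nil
}

// send is a placeholder for the real sendMessage RPC.
func send(ctx context.Context, p peerID, m *provMessage) error {
	fmt.Printf("ADD_PROVIDER %q -> %s\n", m.key, p)
	return nil
}

// provide mirrors the reworked Provide: one record, many concurrent sends.
func provide(ctx context.Context, key string, peers []peerID) error {
	mes, err := buildProvRecord(key) // built exactly once
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	for _, p := range peers {
		wg.Add(1)
		go func(p peerID) {
			defer wg.Done()
			if err := send(ctx, p, mes); err != nil { // same record reused for each peer
				fmt.Println(err)
			}
		}(p)
	}
	wg.Wait()
	return nil
}

func main() {
	_ = provide(context.Background(), "some-key", []peerID{"peerA", "peerB"})
}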
9 changes: 5 additions & 4 deletions routing/kbucket/table.go
@@ -48,11 +48,11 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m pstore
// Update adds or moves the given peer to the front of its respective bucket
// If a peer gets removed from a bucket, it is returned
func (rt *RoutingTable) Update(p peer.ID) {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
	peerID := ConvertPeerID(p)
	cpl := commonPrefixLen(peerID, rt.local)

	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
this no longer protects rt.local, which is fine. BUT it would be nice if the RoutingTable struct was more clear about what the mutex protects.

now

// RoutingTable defines the routing table.
type RoutingTable struct {

    // ID of the local peer
    local ID

    // Blanket lock, refine later for better performance
    tabLock sync.RWMutex

    // latency metrics
    metrics pstore.Metrics

    // Maximum acceptable latency for peers in this cluster
    maxLatency time.Duration

    // kBuckets define all the fingers to other nodes.
    Buckets    []*Bucket
    bucketsize int
}

better

// RoutingTable defines the routing table.
type RoutingTable struct {

    local      ID  // ID of the local peer
    bucketsize int

    // the following fields are all protected by tabLock
    tabLock    sync.RWMutex   // Blanket lock, refine later for better performance
    metrics    pstore.Metrics // latency metrics
    maxLatency time.Duration  // Maximum acceptable latency for peers in this cluster
    Buckets    []*Bucket      // kBuckets define all the fingers to other nodes.
}

Or at least say:

    // Blanket lock, refine later for better performance
    // protects metrics, maxLatency, and Buckets
    tabLock sync.RWMutex

	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
@@ -144,10 +144,10 @@ func (rt *RoutingTable) NearestPeer(id ID) peer.ID {

// NearestPeers returns a list of the 'count' closest peers to the given ID
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()
	cpl := commonPrefixLen(id, rt.local)

	rt.tabLock.RLock()

	// Get bucket at cpl index or last bucket
	var bucket *Bucket
	if cpl >= len(rt.Buckets) {
@@ -170,6 +170,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
			peerArr = copyPeersFromList(id, peerArr, plist)
		}
	}
	rt.tabLock.RUnlock()

	// Sort by distance to local peer
	sort.Sort(peerArr)
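
Note on the table.go changes: they narrow what runs under the lock. commonPrefixLen only reads rt.local (which, per the review comment above, no longer needs to be under the lock), so it is computed before the lock is taken, and in NearestPeers the read lock is released as soon as the candidate peers have been copied out of the buckets, before sorting. Below is a rough self-contained sketch of the same lock-scoping pattern, using toy types rather than the real RoutingTable.

package main

import (
	"fmt"
	"sync"
)

// table is a toy mirror of the RoutingTable locking pattern: local is
// immutable after construction; tabLock protects only buckets.
type table struct {
	local   string
	tabLock sync.RWMutex
	buckets map[int][]string
}

// commonPrefixLen is a simplified stand-in for the XOR-distance helper.
func commonPrefixLen(a, b string) int {
	n := 0
	for n < len(a) && n < len(b) && a[n] == b[n] {
		n++
	}
	return n
}

// nearest mirrors the reordered NearestPeers: the prefix length is computed
// lock-free, the read lock is held only while the bucket is copied, and the
// copy is used (in the real code, sorted) after the unlock.
func (t *table) nearest(id string) []string {
	cpl := commonPrefixLen(id, t.local) // reads only immutable state

	t.tabLock.RLock()
	peers := append([]string(nil), t.buckets[cpl]...) // copy under the lock
	t.tabLock.RUnlock()

	return peers
}

func main() {
	t := &table{local: "abc", buckets: map[int][]string{2: {"abx", "aby"}}}
	fmt.Println(t.nearest("abz"))
}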