Skip to content

Commit

Permalink
disassociate RT membership from connectivity
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Feb 14, 2020
1 parent 36938b1 commit 32e8f8f
Show file tree
Hide file tree
Showing 9 changed files with 755 additions and 113 deletions.
95 changes: 69 additions & 26 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,86 +9,129 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

// Bucket holds a list of peers.
type Bucket struct {
// PeerState is the state of the peer as seen by the Routing Table.
type PeerState int

const (
// PeerStateActive indicates that we know the peer is active/alive.
PeerStateActive PeerState = iota
// PeerStateMissing indicates that we do not know the state of the peer.
PeerStateMissing
)

type peerInfo struct {
Id peer.ID
State PeerState
}

// bucket holds a list of peers.
type bucket struct {
lk sync.RWMutex
list *list.List
}

func newBucket() *Bucket {
b := new(Bucket)
func newBucket() *bucket {
b := new(bucket)
b.list = list.New()
return b
}

func (b *Bucket) Peers() []peer.ID {
// filters the peers using the given predicate
// caller SHOULD NOT modify the returned peer objects
func (b *bucket) filter(f func(peerInfo) bool) []*peerInfo {
b.lk.RLock()
defer b.lk.RUnlock()
var ps []*peerInfo
for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo)
if f(*p) {
ps = append(ps, p)
}
}
return ps
}

// return the Ids of all the peers in the bucket.
func (b *bucket) peerIds() []peer.ID {
b.lk.RLock()
defer b.lk.RUnlock()
ps := make([]peer.ID, 0, b.list.Len())
for e := b.list.Front(); e != nil; e = e.Next() {
id := e.Value.(peer.ID)
ps = append(ps, id)
p := e.Value.(*peerInfo)
ps = append(ps, p.Id)
}
return ps
}

func (b *Bucket) Has(id peer.ID) bool {
// returns the peer with the given Id or nil if not found.
// caller SHOULD NOT modify the returned peer object.
func (b *bucket) getPeer(p peer.ID) *peerInfo {
b.lk.RLock()
defer b.lk.RUnlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(peer.ID) == id {
if e.Value.(*peerInfo).Id == p {
return e.Value.(*peerInfo)
}
}
return nil
}

// replaces the peer based on the Id.
// returns true if the replace was successful, false otherwise.
func (b *bucket) replace(p *peerInfo) bool {
b.lk.Lock()
defer b.lk.Unlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == p.Id {
b.list.Remove(e)
b.list.PushBack(p)
return true
}
}
return false
}

func (b *Bucket) Remove(id peer.ID) bool {
// removes the peer with the given Id from the bucket.
// returns true if successful, false otherwise.
func (b *bucket) remove(id peer.ID) bool {
b.lk.Lock()
defer b.lk.Unlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(peer.ID) == id {
if e.Value.(*peerInfo).Id == id {
b.list.Remove(e)
return true
}
}
return false
}

func (b *Bucket) MoveToFront(id peer.ID) {
func (b *bucket) moveToFront(id peer.ID) {
b.lk.Lock()
defer b.lk.Unlock()

for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(peer.ID) == id {
if e.Value.(*peerInfo).Id == id {
b.list.MoveToFront(e)
}
}
}

func (b *Bucket) PushFront(p peer.ID) {
func (b *bucket) pushFront(p *peerInfo) {
b.lk.Lock()
b.list.PushFront(p)
b.lk.Unlock()
}

func (b *Bucket) PopBack() peer.ID {
b.lk.Lock()
defer b.lk.Unlock()
last := b.list.Back()
b.list.Remove(last)
return last.Value.(peer.ID)
}

func (b *Bucket) Len() int {
func (b *bucket) len() int {
b.lk.RLock()
defer b.lk.RUnlock()
return b.list.Len()
}

// Split splits a buckets peers into two buckets, the methods receiver will have
// splits a buckets peers into two buckets, the methods receiver will have
// peers with CPL equal to cpl, the returned bucket will have peers with CPL
// greater than cpl (returned bucket has closer peers)
func (b *Bucket) Split(cpl int, target ID) *Bucket {
func (b *bucket) split(cpl int, target ID) *bucket {
b.lk.Lock()
defer b.lk.Unlock()

Expand All @@ -97,7 +140,7 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket {
newbuck.list = out
e := b.list.Front()
for e != nil {
peerID := ConvertPeerID(e.Value.(peer.ID))
peerID := ConvertPeerID(e.Value.(*peerInfo).Id)
peerCPL := CommonPrefixLen(peerID, target)
if peerCPL > cpl {
cur := e
Expand Down
98 changes: 98 additions & 0 deletions cpl_replacement_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package kbucket

import (
"sync"

"github.com/libp2p/go-libp2p-core/peer"

"github.com/wangjia184/sortedset"
)

// TODO Should ideally use a Circular queue for this
// maintains a bounded, de-duplicated and FIFO peer candidate queue for each Cpl
type cplReplacementCache struct {
localPeer ID
maxQueueSize int

sync.Mutex
candidates map[uint]*sortedset.SortedSet // candidates for a Cpl
}

func newCplReplacementCache(localPeer ID, maxQueueSize int) *cplReplacementCache {
return &cplReplacementCache{
localPeer: localPeer,
maxQueueSize: maxQueueSize,
candidates: make(map[uint]*sortedset.SortedSet),
}
}

// pushes a candidate to the end of the queue for the corresponding Cpl
// returns false if the queue is full or it already has the peer
// returns true if was successfully added
func (c *cplReplacementCache) push(p peer.ID) bool {
c.Lock()
defer c.Unlock()

// create queue if not created
cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p)))
if c.candidates[cpl] == nil {
c.candidates[cpl] = sortedset.New()
}

q := c.candidates[cpl]

// queue is full
if (q.GetCount()) >= c.maxQueueSize {
return false
}
// queue already has the peer
if q.GetByKey(string(p)) != nil {
return false
}

// push
q.AddOrUpdate(string(p), sortedset.SCORE(q.GetCount()+1), nil)
return true
}

// pops a candidate from the top of the candidate queue for the given Cpl
// returns false if the queue is empty
// returns the peerId and true if successful
func (c *cplReplacementCache) pop(cpl uint) (peer.ID, bool) {
c.Lock()
c.Unlock()

q := c.candidates[cpl]
if q != nil && q.GetCount() > 0 {
n := q.PopMin()

// delete the queue if it's empty
if q.GetCount() == 0 {
delete(c.candidates, cpl)
}

return peer.ID(n.Key()), true
}
return "", false
}

// removes a given peer if it's present
// returns false if the peer is absent
func (c *cplReplacementCache) remove(p peer.ID) bool {
c.Lock()
defer c.Unlock()

cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p)))
q := c.candidates[cpl]
if q != nil {
q.Remove(string(p))

// remove the queue if it's empty
if q.GetCount() == 0 {
delete(c.candidates, cpl)
}

return true
}
return false
}
82 changes: 82 additions & 0 deletions cpl_replacement_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package kbucket

import (
"testing"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"

"github.com/stretchr/testify/require"
)

func TestCandidateQueue(t *testing.T) {
t.Parallel()

maxQSize := 2
local := ConvertPeerID(test.RandPeerIDFatal(t))
c := newCplReplacementCache(local, maxQSize)

// pop an empty queue fails
p, b := c.pop(1)
require.Empty(t, p)
require.False(t, b)

// push two elements to an empty queue works
testPeer1 := genPeer(t, local, 1)
testPeer2 := genPeer(t, local, 1)

// pushing first peer works
require.True(t, c.push(testPeer1))
// pushing a duplicate fails
require.False(t, c.push(testPeer1))
// pushing another peers works
require.True(t, c.push(testPeer2))

// popping the above pushes works
p, b = c.pop(1)
require.True(t, b)
require.Equal(t, testPeer1, p)
p, b = c.pop(1)
require.True(t, b)
require.Equal(t, testPeer2, p)

// pushing & popping again works
require.True(t, c.push(testPeer1))
require.True(t, c.push(testPeer2))
p, b = c.pop(1)
require.True(t, b)
require.Equal(t, testPeer1, p)
p, b = c.pop(1)
require.True(t, b)
require.Equal(t, testPeer2, p)

// fill up a queue
p1 := genPeer(t, local, 2)
p2 := genPeer(t, local, 2)
require.True(t, c.push(p1))
require.True(t, c.push(p2))

// push should not work on a full queue
p3 := genPeer(t, local, 2)
require.False(t, c.push(p3))

// remove a peer & verify it's been removed
require.NotNil(t, c.candidates[2].GetByKey(string(p2)))
require.True(t, c.remove(p2))
c.Lock()
require.Nil(t, c.candidates[2].GetByKey(string(p2)))
c.Unlock()

// now push should work
require.True(t, c.push(p3))
}

func genPeer(t *testing.T, local ID, cpl int) peer.ID {
var p peer.ID
for {
p = test.RandPeerIDFatal(t)
if CommonPrefixLen(local, ConvertPeerID(p)) == cpl {
return p
}
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/minio/sha256-simd v0.1.1
github.com/multiformats/go-multihash v0.0.13
github.com/stretchr/testify v1.4.0
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30
)

go 1.13
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -204,6 +206,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 h1:kZiWylALnUy4kzoKJemjH8eqwCl3RjW1r1ITCjjW7G8=
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
Expand Down
Loading

0 comments on commit 32e8f8f

Please sign in to comment.