Skip to content

Commit

Permalink
Merge pull request #50 from libp2p/feat/283
Browse files Browse the repository at this point in the history
Disassociate RT membership from connectivity
  • Loading branch information
aarshkshah1992 authored Feb 26, 2020
2 parents 36938b1 + 5e06659 commit 99fef9c
Show file tree
Hide file tree
Showing 9 changed files with 802 additions and 156 deletions.
103 changes: 57 additions & 46 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,100 +4,111 @@ package kbucket

import (
"container/list"
"sync"

"github.com/libp2p/go-libp2p-core/peer"
)

// Bucket holds a list of peers.
type Bucket struct {
lk sync.RWMutex
// PeerState is the state of the peer as seen by the Routing Table.
type PeerState int

const (
// PeerStateActive indicates that we know the peer is active/alive.
PeerStateActive PeerState = iota
// PeerStateMissing indicates that we do not know the state of the peer.
PeerStateMissing
)

// PeerInfo holds all related information for a peer in the K-Bucket.
type PeerInfo struct {
Id peer.ID
State PeerState
}

// bucket holds a list of peers.
// we synchronize on the Routing Table lock for all access to the bucket
// and so do not need any locks in the bucket.
// if we want/need to avoid locking the table for accessing a bucket in the future,
// it WILL be the caller's responsibility to synchronize all access to a bucket.
type bucket struct {
list *list.List
}

func newBucket() *Bucket {
b := new(Bucket)
func newBucket() *bucket {
b := new(bucket)
b.list = list.New()
return b
}

func (b *Bucket) Peers() []peer.ID {
b.lk.RLock()
defer b.lk.RUnlock()
// returns all peers in the bucket
// it is safe for the caller to modify the returned objects as it is a defensive copy
func (b *bucket) peers() []PeerInfo {
var ps []PeerInfo
for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*PeerInfo)
ps = append(ps, *p)
}
return ps
}

// return the Ids of all the peers in the bucket.
func (b *bucket) peerIds() []peer.ID {
ps := make([]peer.ID, 0, b.list.Len())
for e := b.list.Front(); e != nil; e = e.Next() {
id := e.Value.(peer.ID)
ps = append(ps, id)
p := e.Value.(*PeerInfo)
ps = append(ps, p.Id)
}
return ps
}

func (b *Bucket) Has(id peer.ID) bool {
b.lk.RLock()
defer b.lk.RUnlock()
// returns the peer with the given Id if it exists
// returns nil if the peerId does not exist
func (b *bucket) getPeer(p peer.ID) *PeerInfo {
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(peer.ID) == id {
return true
if e.Value.(*PeerInfo).Id == p {
return e.Value.(*PeerInfo)
}
}
return false
return nil
}

func (b *Bucket) Remove(id peer.ID) bool {
b.lk.Lock()
defer b.lk.Unlock()
// removes the peer with the given Id from the bucket.
// returns true if successful, false otherwise.
func (b *bucket) remove(id peer.ID) bool {
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(peer.ID) == id {
if e.Value.(*PeerInfo).Id == id {
b.list.Remove(e)
return true
}
}
return false
}

func (b *Bucket) MoveToFront(id peer.ID) {
b.lk.Lock()
defer b.lk.Unlock()
func (b *bucket) moveToFront(id peer.ID) {

for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(peer.ID) == id {
if e.Value.(*PeerInfo).Id == id {
b.list.MoveToFront(e)
}
}
}

func (b *Bucket) PushFront(p peer.ID) {
b.lk.Lock()
func (b *bucket) pushFront(p *PeerInfo) {
b.list.PushFront(p)
b.lk.Unlock()
}

func (b *Bucket) PopBack() peer.ID {
b.lk.Lock()
defer b.lk.Unlock()
last := b.list.Back()
b.list.Remove(last)
return last.Value.(peer.ID)
}

func (b *Bucket) Len() int {
b.lk.RLock()
defer b.lk.RUnlock()
func (b *bucket) len() int {
return b.list.Len()
}

// Split splits a buckets peers into two buckets, the methods receiver will have
// splits a buckets peers into two buckets, the methods receiver will have
// peers with CPL equal to cpl, the returned bucket will have peers with CPL
// greater than cpl (returned bucket has closer peers)
func (b *Bucket) Split(cpl int, target ID) *Bucket {
b.lk.Lock()
defer b.lk.Unlock()

func (b *bucket) split(cpl int, target ID) *bucket {
out := list.New()
newbuck := newBucket()
newbuck.list = out
e := b.list.Front()
for e != nil {
peerID := ConvertPeerID(e.Value.(peer.ID))
peerID := ConvertPeerID(e.Value.(*PeerInfo).Id)
peerCPL := CommonPrefixLen(peerID, target)
if peerCPL > cpl {
cur := e
Expand Down
97 changes: 97 additions & 0 deletions cpl_replacement_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package kbucket

import (
"sync"

"github.com/libp2p/go-libp2p-core/peer"
)

// TODO Should ideally use a Circular queue for this
// maintains a bounded, de-duplicated and FIFO peer candidate queue for each Cpl
type cplReplacementCache struct {
localPeer ID
maxQueueSize int

sync.Mutex
candidates map[uint][]peer.ID // candidates for a Cpl
}

func newCplReplacementCache(localPeer ID, maxQueueSize int) *cplReplacementCache {
return &cplReplacementCache{
localPeer: localPeer,
maxQueueSize: maxQueueSize,
candidates: make(map[uint][]peer.ID),
}
}

// pushes a candidate to the end of the queue for the corresponding Cpl
// returns false if the queue is full or it already has the peer
// returns true if was successfully added
func (c *cplReplacementCache) push(p peer.ID) bool {
c.Lock()
defer c.Unlock()

cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p)))

// queue is full
if len(c.candidates[cpl]) >= c.maxQueueSize {
return false
}
// queue already has the peer
for _, pr := range c.candidates[cpl] {
if pr == p {
return false
}
}

// push
c.candidates[cpl] = append(c.candidates[cpl], p)
return true
}

// pops a candidate from the top of the candidate queue for the given Cpl
// returns false if the queue is empty
// returns the peerId and true if successful
func (c *cplReplacementCache) pop(cpl uint) (peer.ID, bool) {
c.Lock()
defer c.Unlock()

if len(c.candidates[cpl]) != 0 {
p := c.candidates[cpl][0]
c.candidates[cpl] = c.candidates[cpl][1:]

// delete the queue if it's empty
if len(c.candidates[cpl]) == 0 {
delete(c.candidates, cpl)
}

return p, true
}
return "", false
}

// removes a given peer if it's present
// returns false if the peer is absent
func (c *cplReplacementCache) remove(p peer.ID) bool {
c.Lock()
defer c.Unlock()

cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p)))

if len(c.candidates[cpl]) != 0 {
// remove the peer if it's present
for i, pr := range c.candidates[cpl] {
if pr == p {
c.candidates[cpl] = append(c.candidates[cpl][:i], c.candidates[cpl][i+1:]...)
}
}

// remove the queue if it's empty
if len(c.candidates[cpl]) == 0 {
delete(c.candidates, cpl)
}

return true
}
return false
}
87 changes: 87 additions & 0 deletions cpl_replacement_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package kbucket

import (
"testing"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"

"github.com/stretchr/testify/require"
)

func TestCplReplacementCache(t *testing.T) {
t.Parallel()

maxQSize := 2
local := ConvertPeerID(test.RandPeerIDFatal(t))
c := newCplReplacementCache(local, maxQSize)

// pop an empty queue fails
p, b := c.pop(1)
require.Empty(t, p)
require.False(t, b)

// push two elements to an empty queue works
testPeer1 := genPeer(t, local, 1)
testPeer2 := genPeer(t, local, 1)

// pushing first peer works
require.True(t, c.push(testPeer1))
// pushing a duplicate fails
require.False(t, c.push(testPeer1))
// pushing another peers works
require.True(t, c.push(testPeer2))

// popping the above pushes works
p, b = c.pop(1)
require.True(t, b)
require.Equal(t, testPeer1, p)
p, b = c.pop(1)
require.True(t, b)
require.Equal(t, testPeer2, p)

// pushing & popping again works
require.True(t, c.push(testPeer1))
require.True(t, c.push(testPeer2))
p, b = c.pop(1)
require.True(t, b)
require.Equal(t, testPeer1, p)
p, b = c.pop(1)
require.True(t, b)
require.Equal(t, testPeer2, p)

// fill up a queue
p1 := genPeer(t, local, 2)
p2 := genPeer(t, local, 2)
require.True(t, c.push(p1))
require.True(t, c.push(p2))

// push should not work on a full queue
p3 := genPeer(t, local, 2)
require.False(t, c.push(p3))

// remove a peer & verify it's been removed
p4 := genPeer(t, local, 0)
require.True(t, c.push(p4))

c.Lock()
require.Len(t, c.candidates[0], 1)
require.True(t, c.candidates[0][0] == p4)
c.Unlock()

require.True(t, c.remove(p4))

c.Lock()
require.Len(t, c.candidates[0], 0)
c.Unlock()
}

func genPeer(t *testing.T, local ID, cpl int) peer.ID {
var p peer.ID
for {
p = test.RandPeerIDFatal(t)
if CommonPrefixLen(local, ConvertPeerID(p)) == cpl {
return p
}
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ module github.com/libp2p/go-libp2p-kbucket
require (
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-libp2p-core v0.3.0
github.com/libp2p/go-libp2p-peerstore v0.1.4
github.com/minio/sha256-simd v0.1.1
github.com/multiformats/go-multihash v0.0.13
github.com/stretchr/testify v1.4.0
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30
)

go 1.13
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -204,6 +206,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 h1:kZiWylALnUy4kzoKJemjH8eqwCl3RjW1r1ITCjjW7G8=
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
Expand Down
Loading

0 comments on commit 99fef9c

Please sign in to comment.