Skip to content

Commit

Permalink
bitswap/server/internal/decision: rewrite ledger inversion
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Feb 17, 2023
1 parent d195f6d commit 9cb5cb5
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 163 deletions.
3 changes: 3 additions & 0 deletions bitswap/internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ const (
// provideCollector even before they are actually provided.
// TODO: Does this need to be this large givent that?
HasBlockBufferSize = 256

// Maximum size of the wantlist we are willing to keep in memory.
MaxQueuedWantlistEntiresPerPeer = 1024
)
4 changes: 4 additions & 0 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func MaxOutstandingBytesPerPeer(count int) Option {
return Option{server.MaxOutstandingBytesPerPeer(count)}
}

func MaxQueuedWantlistEntriesPerPeer(count uint) Option {
return Option{server.MaxQueuedWantlistEntriesPerPeer(count)}
}

func TaskWorkerCount(count int) Option {
return Option{server.TaskWorkerCount(count)}
}
Expand Down
183 changes: 48 additions & 135 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package decision
import (
"context"
"fmt"
"math/bits"
"sync"
"time"

Expand Down Expand Up @@ -147,9 +148,6 @@ type Engine struct {

lock sync.RWMutex // protects the fields immediately below

// ledgerMap lists block-related Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger

// peerLedger saves which peers are waiting for a Cid
peerLedger *peerLedger

Expand Down Expand Up @@ -187,6 +185,8 @@ type Engine struct {

bstoreWorkerCount int
maxOutstandingBytesPerPeer int

maxQueuedWantlistEntriesPerPeer uint
}

// TaskInfo represents the details of a request from a peer.
Expand Down Expand Up @@ -270,6 +270,15 @@ func WithMaxOutstandingBytesPerPeer(count int) Option {
}
}

// WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send.
// If a peer send us more than this we will truncate newest entries.
// It defaults to DefaultMaxQueuedWantlistEntiresPerPeer.
func WithMaxQueuedWantlistEntriesPerPeer(count uint) Option {
return func(e *Engine) {
e.maxQueuedWantlistEntriesPerPeer = count
}
}

func WithSetSendDontHave(send bool) Option {
return func(e *Engine) {
e.sendDontHaves = send
Expand Down Expand Up @@ -330,7 +339,6 @@ func newEngine(
opts ...Option,
) *Engine {
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
scoreLedger: NewDefaultScoreLedger(),
bstoreWorkerCount: defaults.BitswapEngineBlockstoreWorkerCount,
maxOutstandingBytesPerPeer: defaults.BitswapMaxOutstandingBytesPerPeer,
Expand All @@ -348,6 +356,7 @@ func newEngine(
targetMessageSize: defaultTargetMessageSize,
tagQueued: fmt.Sprintf(tagFormat, "queued", uuid.New().String()),
tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()),
maxQueuedWantlistEntriesPerPeer: defaults.MaxQueuedWantlistEntiresPerPeer,
}

for _, opt := range opts {
Expand Down Expand Up @@ -450,13 +459,10 @@ func (e *Engine) onPeerRemoved(p peer.ID) {

// WantlistForPeer returns the list of keys that the given peer has asked for
func (e *Engine) WantlistForPeer(p peer.ID) []wl.Entry {
partner := e.findOrCreate(p)

partner.lk.Lock()
entries := partner.wantList.Entries()
partner.lk.Unlock()
e.lock.RLock()
defer e.lock.RUnlock()

return entries
return e.peerLedger.WantlistForPeer(p)
}

// LedgerForPeer returns aggregated data communication with a given peer.
Expand Down Expand Up @@ -605,12 +611,7 @@ func (e *Engine) Peers() []peer.ID {
e.lock.RLock()
defer e.lock.RUnlock()

response := make([]peer.ID, 0, len(e.ledgerMap))

for _, ledger := range e.ledgerMap {
response = append(response, ledger.Partner)
}
return response
return e.peerLedger.CollectPeerIDs()
}

// MessageReceived is called when a message is received from a remote peer.
Expand Down Expand Up @@ -659,33 +660,34 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
}

e.lock.Lock()
for _, entry := range wants {
e.peerLedger.Wants(p, entry.Cid)
}
for _, entry := range cancels {
e.peerLedger.CancelWant(p, entry.Cid)
}
e.lock.Unlock()

// Get the ledger for the peer
l := e.findOrCreate(p)
l.lk.Lock()
defer l.lk.Unlock()

// If the peer sent a full wantlist, replace the ledger's wantlist
if m.Full() {
l.wantList = wl.New()
e.peerLedger.ClearPeerWantlist(p)
}

var activeEntries []peertask.Task
s := uint(e.peerLedger.WantlistSizeForPeer(p))
if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer {
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe)
// truncate wantlist to avoid overflow
available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0)
if o != 0 {
available = 0
}
wants = wants[:available]
}

// Remove cancelled blocks from the queue
for _, entry := range wants {
e.peerLedger.Wants(p, entry.Entry)
}
for _, entry := range cancels {
log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid)
if l.CancelWant(entry.Cid) {
if e.peerLedger.CancelWant(p, entry.Cid) {
e.peerRequestQueue.Remove(entry.Cid, p)
}
}
e.lock.Unlock()

var activeEntries []peertask.Task

// Cancel a block operation
sendDontHave := func(entry bsmsg.Entry) {
Expand Down Expand Up @@ -724,9 +726,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
c := entry.Cid
blockSize, found := blockSizes[entry.Cid]

// Add each want-have / want-block to the ledger
l.Wants(c, entry.Priority, entry.WantType)

// If the block was not found
if !found {
log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)
Expand Down Expand Up @@ -763,7 +762,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap

// Push entries onto the request queue
if len(activeEntries) > 0 {
e.peerRequestQueue.PushTasks(p, activeEntries...)
e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, p, activeEntries...)
e.updateMetrics()
}
}
Expand Down Expand Up @@ -809,14 +808,10 @@ func (e *Engine) ReceivedBlocks(from peer.ID, blks []blocks.Block) {
return
}

l := e.findOrCreate(from)

// Record how many bytes were received in the ledger
l.lk.Lock()
defer l.lk.Unlock()
for _, blk := range blks {
log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData()))
e.scoreLedger.AddToReceivedBytes(l.Partner, len(blk.RawData()))
e.scoreLedger.AddToReceivedBytes(from, len(blk.RawData()))
}
}

Expand All @@ -835,34 +830,14 @@ func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {

// Check each peer to see if it wants one of the blocks we received
var work bool
missingWants := make(map[peer.ID][]cid.Cid)
for _, b := range blks {
k := b.Cid()

e.lock.RLock()
peers := e.peerLedger.Peers(k)
e.lock.RUnlock()

for _, p := range peers {
e.lock.RLock()
ledger, ok := e.ledgerMap[p]
e.lock.RUnlock()

if !ok {
// This can happen if the peer has disconnected while we're processing this list.
log.Debugw("failed to find peer in ledger", "peer", p)
missingWants[p] = append(missingWants[p], k)
continue
}
ledger.lk.RLock()
entry, ok := ledger.WantListContains(k)
ledger.lk.RUnlock()
if !ok {
// This can happen if the peer has canceled their want while we're processing this message.
log.Debugw("wantlist index doesn't match peer's wantlist", "peer", p)
missingWants[p] = append(missingWants[p], k)
continue
}
for _, entry := range peers {
work = true

blockSize := blockSizes[k]
Expand All @@ -873,8 +848,8 @@ func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {
entrySize = bsmsg.BlockPresenceSize(k)
}

e.peerRequestQueue.PushTasks(p, peertask.Task{
Topic: entry.Cid,
e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, entry.Peer, peertask.Task{
Topic: k,
Priority: int(entry.Priority),
Work: entrySize,
Data: &taskData{
Expand All @@ -888,30 +863,6 @@ func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {
}
}

// If we found missing wants (e.g., because the peer disconnected, we have some races here)
// remove them from the list. Unfortunately, we still have to re-check because the user
// could have re-connected in the meantime.
if len(missingWants) > 0 {
e.lock.Lock()
for p, wl := range missingWants {
if ledger, ok := e.ledgerMap[p]; ok {
ledger.lk.RLock()
for _, k := range wl {
if _, has := ledger.WantListContains(k); has {
continue
}
e.peerLedger.CancelWant(p, k)
}
ledger.lk.RUnlock()
} else {
for _, k := range wl {
e.peerLedger.CancelWant(p, k)
}
}
}
e.lock.Unlock()
}

if work {
e.signalNewWork()
}
Expand All @@ -926,21 +877,20 @@ func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {
// MessageSent is called when a message has successfully been sent out, to record
// changes.
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
l := e.findOrCreate(p)
l.lk.Lock()
defer l.lk.Unlock()
e.lock.Lock()
defer e.lock.Unlock()

// Remove sent blocks from the want list for the peer
for _, block := range m.Blocks() {
e.scoreLedger.AddToSentBytes(l.Partner, len(block.RawData()))
l.wantList.RemoveType(block.Cid(), pb.Message_Wantlist_Block)
e.scoreLedger.AddToSentBytes(p, len(block.RawData()))
e.peerLedger.CancelWantWithType(p, block.Cid(), pb.Message_Wantlist_Block)
}

// Remove sent block presences from the want list for the peer
for _, bp := range m.BlockPresences() {
// Don't record sent data. We reserve that for data blocks.
if bp.Type == pb.Message_Have {
l.wantList.RemoveType(bp.Cid, pb.Message_Wantlist_Have)
e.peerLedger.CancelWantWithType(p, bp.Cid, pb.Message_Wantlist_Have)
}
}
}
Expand All @@ -951,31 +901,17 @@ func (e *Engine) PeerConnected(p peer.ID) {
e.lock.Lock()
defer e.lock.Unlock()

_, ok := e.ledgerMap[p]
if !ok {
e.ledgerMap[p] = newLedger(p)
}

e.scoreLedger.PeerConnected(p)
}

// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
e.peerRequestQueue.Clear(p)

e.lock.Lock()
defer e.lock.Unlock()

ledger, ok := e.ledgerMap[p]
if ok {
ledger.lk.RLock()
entries := ledger.Entries()
ledger.lk.RUnlock()

for _, entry := range entries {
e.peerLedger.CancelWant(p, entry.Cid)
}
}
delete(e.ledgerMap, p)

e.peerLedger.PeerDisconnected(p)
e.scoreLedger.PeerDisconnected(p)
}

Expand All @@ -994,29 +930,6 @@ func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
return e.LedgerForPeer(p).Recv
}

// ledger lazily instantiates a ledger
func (e *Engine) findOrCreate(p peer.ID) *ledger {
// Take a read lock (as it's less expensive) to check if we have a ledger
// for the peer
e.lock.RLock()
l, ok := e.ledgerMap[p]
e.lock.RUnlock()
if ok {
return l
}

// There's no ledger, so take a write lock, then check again and create the
// ledger if necessary
e.lock.Lock()
defer e.lock.Unlock()
l, ok = e.ledgerMap[p]
if !ok {
l = newLedger(p)
e.ledgerMap[p] = l
}
return l
}

func (e *Engine) signalNewWork() {
// Signal task generation to restart (if stopped!)
select {
Expand Down
Loading

0 comments on commit 9cb5cb5

Please sign in to comment.