Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

feat: debounce wants manually #255

Merged
merged 2 commits into from
Feb 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module github.com/ipfs/go-bitswap

require (
github.com/bep/debounce v1.2.0
github.com/cskr/pubsub v1.0.2
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo=
github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c h1:aEbSeNALREWXk0G7UdNhR3ayBV7tZ4M2PNmnrCAph6Q=
Expand Down Expand Up @@ -94,8 +92,6 @@ github.com/ipfs/go-cid v0.0.2 h1:tuuKaZPU1M6HcejsO3AcYWW8sZ8MTvyxfc4uqB4eFE8=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3 h1:UIAh32wymBpStoe83YCzwVQQ5Oy/H0FdxvUS6DJDzms=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.4 h1:UlfXKrZx1DjZoBhQHmNHLC1fK1dUJDN20Y28A7s+gJ8=
github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj6+M=
github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU=
github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog=
github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms=
Expand Down Expand Up @@ -344,8 +340,6 @@ github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multihash v0.0.8 h1:wrYcW5yxSi3dU07n5jnuS5PrNwyHy0zRHGVoUugWvXg=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.0.10 h1:lMoNbh2Ssd9PUF74Nz008KGzGPlfeV6wH3rit5IIGCM=
github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc=
github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multistream v0.1.0 h1:UpO6jrsjqs46mqAK3n6wKRYFhugss9ArzbyUzU+4wkQ=
Expand Down
64 changes: 50 additions & 14 deletions internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"
"time"

debounce "github.com/bep/debounce"

bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
bsnet "github.com/ipfs/go-bitswap/network"
Expand All @@ -34,6 +32,11 @@ const (
maxPriority = math.MaxInt32
// sendMessageDebounce is the debounce duration when calling sendMessage()
sendMessageDebounce = time.Millisecond
// when we reach sendMessageCutoff wants/cancels, we'll send the message immediately.
sendMessageCutoff = 256
// when we debounce for more than sendMessageMaxDelay, we'll send the
// message immediately.
sendMessageMaxDelay = 20 * time.Millisecond
)

// MessageNetwork is any network that can connect peers and generate a message
Expand All @@ -54,9 +57,8 @@ type MessageQueue struct {
maxMessageSize int
sendErrorBackoff time.Duration

signalWorkReady func()
outgoingWork chan struct{}
done chan struct{}
outgoingWork chan time.Time
done chan struct{}

// Take lock whenever any of these variables are modified
wllock sync.Mutex
Expand Down Expand Up @@ -165,17 +167,13 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
bcstWants: newRecallWantList(),
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan struct{}, 1),
outgoingWork: make(chan time.Time, 1),
done: make(chan struct{}),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
}

// Apply debounce to the work ready signal (which triggers sending a message)
debounced := debounce.New(sendMessageDebounce)
mq.signalWorkReady = func() { debounced(mq.onWorkReady) }

return mq
}

Expand Down Expand Up @@ -285,11 +283,45 @@ func (mq *MessageQueue) onShutdown() {
func (mq *MessageQueue) runQueue() {
defer mq.onShutdown()

// Create a timer for debouncing scheduled work.
scheduleWork := time.NewTimer(0)
if !scheduleWork.Stop() {
// Need to drain the timer if Stop() returns false
// See: https://golang.org/pkg/time/#Timer.Stop
<-scheduleWork.C
}

var workScheduled time.Time
for {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
case <-mq.outgoingWork:
case when := <-mq.outgoingWork:
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
// If we have work scheduled, cancel the timer. If we
// don't, record when the work was scheduled.
// We send the time on the channel so we accurately
// track delay.
if workScheduled.IsZero() {
workScheduled = when
} else if !scheduleWork.Stop() {
// Need to drain the timer if Stop() returns false
<-scheduleWork.C
}

// If we have too many updates and/or we've waited too
// long, send immediately.
if mq.pendingWorkCount() > sendMessageCutoff ||
time.Since(workScheduled) >= sendMessageMaxDelay {
mq.sendIfReady()
workScheduled = time.Time{}
} else {
// Otherwise, extend the timer.
scheduleWork.Reset(sendMessageDebounce)
}
case <-scheduleWork.C:
// We have work scheduled and haven't seen any updates
// in sendMessageDebounce. Send immediately.
workScheduled = time.Time{}
mq.sendIfReady()
case <-mq.done:
if mq.sender != nil {
Expand Down Expand Up @@ -335,9 +367,9 @@ func (mq *MessageQueue) transferRebroadcastWants() bool {
return true
}

func (mq *MessageQueue) onWorkReady() {
func (mq *MessageQueue) signalWorkReady() {
select {
case mq.outgoingWork <- struct{}{}:
case mq.outgoingWork <- time.Now():
default:
}
}
Expand Down Expand Up @@ -443,10 +475,14 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) {
// }

func (mq *MessageQueue) hasPendingWork() bool {
return mq.pendingWorkCount() > 0
}

func (mq *MessageQueue) pendingWorkCount() int {
mq.wllock.Lock()
defer mq.wllock.Unlock()

return mq.bcstWants.pending.Len() > 0 || mq.peerWants.pending.Len() > 0 || mq.cancels.Len() > 0
return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len()
}

func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
Expand Down