From b649bcbec69e6aa5cd2dfe76e77aaf345edcaec1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 13 Mar 2019 21:25:36 +0000 Subject: [PATCH 1/2] defer dialqueue action until initial peers have been added. --- dial_queue.go | 25 ++++++++++++++++++------- dial_queue_test.go | 8 ++++++++ query.go | 5 +++++ 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/dial_queue.go b/dial_queue.go index 9d58a4215..25d741a09 100644 --- a/dial_queue.go +++ b/dial_queue.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "sync/atomic" "time" peer "github.com/libp2p/go-libp2p-peer" @@ -33,6 +34,7 @@ type dialQueue struct { nWorkers uint out *queue.ChanQueue + started int32 waitingCh chan waitingCh dieCh chan struct{} @@ -90,9 +92,10 @@ type waitingCh struct { ts time.Time } -// newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively -// stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers -// and dial producers), and takes compensating action by adjusting the worker pool. +// newDialQueue returns an _unstarted_ adaptive dial queue that spawns a dynamically sized set of goroutines to +// preemptively stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both +// ends (dial consumers and dial producers), and takes compensating action by adjusting the worker pool. To +// activate the dial queue, call Start(). // // Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established // connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake, @@ -112,7 +115,6 @@ type waitingCh struct { func newDialQueue(params *dqParams) (*dialQueue, error) { dq := &dialQueue{ dqParams: params, - nWorkers: params.config.minParallelism, out: queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)), growCh: make(chan struct{}, 1), shrinkCh: make(chan struct{}, 1), @@ -120,13 +122,22 @@ func newDialQueue(params *dqParams) (*dialQueue, error) { dieCh: make(chan struct{}, params.config.maxParallelism), } - for i := 0; i < int(params.config.minParallelism); i++ { - go dq.worker() - } go dq.control() return dq, nil } +// Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored. +func (dq *dialQueue) Start() { + if !atomic.CompareAndSwapInt32(&dq.started, 0, 1) { + return + } + tgt := int(dq.dqParams.config.minParallelism) + for i := 0; i < tgt; i++ { + go dq.worker() + } + dq.nWorkers = uint(tgt) +} + func (dq *dialQueue) control() { var ( dialled <-chan peer.ID diff --git a/dial_queue_test.go b/dial_queue_test.go index 3b643cfa7..ccc7b730a 100644 --- a/dial_queue_test.go +++ b/dial_queue_test.go @@ -42,6 +42,8 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) { t.Error("unexpected error when constructing the dial queue", err) } + dq.Start() + for i := 0; i < 4; i++ { _ = dq.Consume() time.Sleep(100 * time.Millisecond) @@ -86,6 +88,8 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) { t.Error("unexpected error when constructing the dial queue", err) } + dq.Start() + // acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed // and immediately returnable. for i := 0; i < 3; i++ { @@ -158,6 +162,8 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) { t.Error("unexpected error when constructing the dial queue", err) } + dq.Start() + // keep up to speed with backlog by releasing the dial function every time we acquire a channel. for i := 0; i < 13; i++ { ch := dq.Consume() @@ -210,6 +216,8 @@ func TestDialQueueMutePeriodHonored(t *testing.T) { t.Error("unexpected error when constructing the dial queue", err) } + dq.Start() + // pick up three consumers. for i := 0; i < 3; i++ { _ = dq.Consume() diff --git a/query.go b/query.go index a34b590d1..9570d2ad2 100644 --- a/query.go +++ b/query.go @@ -136,6 +136,11 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes r.addPeerToQuery(p) } + // start the dial queue only after we've added the initial set of peers. + // this is to avoid race conditions that could cause the peersRemaining todoctr + // to be done too early if the initial dial fails before others make it into the queue. + r.peersDialed.Start() + // go do this thing. // do it as a child proc to make sure Run exits // ONLY AFTER spawn workers has exited. From bd60c95d070821284d8297475280f6b9f5bdfa03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 13 Mar 2019 21:39:16 +0000 Subject: [PATCH 2/2] replace atomics with sync.Once. --- dial_queue.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/dial_queue.go b/dial_queue.go index 25d741a09..fdd2b9081 100644 --- a/dial_queue.go +++ b/dial_queue.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "math" - "sync/atomic" + "sync" "time" peer "github.com/libp2p/go-libp2p-peer" @@ -32,9 +32,9 @@ const ( type dialQueue struct { *dqParams - nWorkers uint - out *queue.ChanQueue - started int32 + nWorkers uint + out *queue.ChanQueue + startOnce sync.Once waitingCh chan waitingCh dieCh chan struct{} @@ -128,14 +128,13 @@ func newDialQueue(params *dqParams) (*dialQueue, error) { // Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored. func (dq *dialQueue) Start() { - if !atomic.CompareAndSwapInt32(&dq.started, 0, 1) { - return - } - tgt := int(dq.dqParams.config.minParallelism) - for i := 0; i < tgt; i++ { - go dq.worker() - } - dq.nWorkers = uint(tgt) + dq.startOnce.Do(func() { + tgt := int(dq.dqParams.config.minParallelism) + for i := 0; i < tgt; i++ { + go dq.worker() + } + dq.nWorkers = uint(tgt) + }) } func (dq *dialQueue) control() {