From 152fa80385b083a073f23ecbf79e02da1ffe0cb2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 2 Mar 2022 21:31:44 +0200 Subject: [PATCH] moar timer stopage fixes --- multiplex.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/multiplex.go b/multiplex.go index 6cd9c08..75d10be 100644 --- a/multiplex.go +++ b/multiplex.go @@ -353,6 +353,7 @@ func (mp *Multiplex) handleIncoming() { recvTimeout := time.NewTimer(0) defer recvTimeout.Stop() + recvTimeoutFired := false loop: for { @@ -475,11 +476,9 @@ loop: rd += nextChunk - if !recvTimeout.Stop() { - select { - case <-recvTimeout.C: - default: - } + if !recvTimeout.Stop() && !recvTimeoutFired { + <-recvTimeout.C + recvTimeoutFired = false } recvTimeout.Reset(ReceiveTimeout) @@ -496,6 +495,7 @@ loop: break read case <-recvTimeout.C: + recvTimeoutFired = true mp.putBufferInbound(b) log.Warnf("timed out receiving message into stream queue.") // Do not do this asynchronously. Otherwise, we @@ -604,12 +604,10 @@ func (mp *Multiplex) skipNextMsg(mlen int) error { } func (mp *Multiplex) getBufferInbound(length int) ([]byte, error) { + timerFired := false defer func() { - if !mp.bufInTimer.Stop() { - select { - case <-mp.bufInTimer.C: - default: - } + if !mp.bufInTimer.Stop() && !timerFired { + <-mp.bufInTimer.C } }() mp.bufInTimer.Reset(getInputBufferTimeout) @@ -617,6 +615,7 @@ func (mp *Multiplex) getBufferInbound(length int) ([]byte, error) { select { case mp.bufIn <- struct{}{}: case <-mp.bufInTimer.C: + timerFired = true return nil, errTimeout case <-mp.shutdown: return nil, ErrShutdown