Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mplex salvage operations, part II #102

Merged
merged 14 commits into from
Mar 3, 2022
199 changes: 142 additions & 57 deletions multiplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ import (

var log = logging.Logger("mplex")

var MaxMessageSize = 1 << 20
var MaxBuffers = 4
const (
MaxMessageSize = 1 << 20
BufferSize = 4096
MaxBuffers = 4
ChunkSize = BufferSize - 20
vyzo marked this conversation as resolved.
Show resolved Hide resolved
)

// Max time to block waiting for a slow reader to read from a stream before
// resetting it. Preferably, we'd have some form of back-pressure mechanism but
Expand All @@ -39,11 +43,9 @@ var ErrInvalidState = errors.New("received an unexpected message from the peer")

var errTimeout = timeout{}

var (
ResetStreamTimeout = 2 * time.Minute
var ResetStreamTimeout = 2 * time.Minute

WriteCoalesceDelay = 100 * time.Microsecond
)
var getInputBufferTimeout = time.Minute

type timeout struct{}

Expand Down Expand Up @@ -93,6 +95,7 @@ type Multiplex struct {
chLock sync.Mutex

bufIn, bufOut chan struct{}
bufInTimer *time.Timer
reservedMemory int
}

Expand All @@ -104,41 +107,42 @@ func NewMultiplex(con net.Conn, initiator bool, memoryManager MemoryManager) (*M
mp := &Multiplex{
con: con,
initiator: initiator,
buf: bufio.NewReader(con),
channels: make(map[streamID]*Stream),
closed: make(chan struct{}),
shutdown: make(chan struct{}),
writeCh: make(chan []byte, 16),
nstreams: make(chan *Stream, 16),
memoryManager: memoryManager,
}

// up-front reserve memory for max buffers
bufs := 0
var err error
for i := 0; i < MaxBuffers; i++ {
// up-front reserve memory for the essential buffers (1 input, 1 output + the reader buffer)
if err := mp.memoryManager.ReserveMemory(3*BufferSize, 255); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where's the input/output buffer? I see the reader buffer, but that's it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we get it from the pool and return it.

See getBuffer*/putBuffer*.

return nil, err
}

mp.reservedMemory += 3 * BufferSize
vyzo marked this conversation as resolved.
Show resolved Hide resolved
bufs := 1

// reserve some more memory for buffers if possible
for i := 1; i < MaxBuffers; i++ {
var prio uint8
switch bufs {
case 0:
prio = 255
case 1:
if bufs < 2 {
prio = 192
default:
} else {
prio = 128
}
if err = mp.memoryManager.ReserveMemory(2*MaxMessageSize, prio); err != nil {

if err := mp.memoryManager.ReserveMemory(2*BufferSize, prio); err != nil {
break
}
mp.reservedMemory += 2 * MaxMessageSize
mp.reservedMemory += 2 * BufferSize
vyzo marked this conversation as resolved.
Show resolved Hide resolved
bufs++
}

if bufs == 0 {
return nil, err
}

mp.buf = bufio.NewReaderSize(con, BufferSize)
mp.writeCh = make(chan []byte, bufs)
mp.bufIn = make(chan struct{}, bufs)
mp.bufOut = make(chan struct{}, bufs)
mp.bufInTimer = time.NewTimer(0)
marten-seemann marked this conversation as resolved.
Show resolved Hide resolved

go mp.handleIncoming()
go mp.handleOutgoing()
Expand All @@ -150,7 +154,7 @@ func (mp *Multiplex) newStream(id streamID, name string) (s *Stream) {
s = &Stream{
id: id,
name: name,
dataIn: make(chan []byte, 8),
dataIn: make(chan []byte, 1),
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
rDeadline: makePipeDeadline(),
wDeadline: makePipeDeadline(),
mp: mp,
Expand Down Expand Up @@ -180,6 +184,9 @@ func (mp *Multiplex) Close() error {
// Wait for the receive loop to finish.
<-mp.closed

// only stop it here, after the input loop has finished
mp.bufInTimer.Stop()
vyzo marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

Expand Down Expand Up @@ -341,10 +348,7 @@ func (mp *Multiplex) handleIncoming() {
recvTimeout := time.NewTimer(0)
marten-seemann marked this conversation as resolved.
Show resolved Hide resolved
defer recvTimeout.Stop()

if !recvTimeout.Stop() {
<-recvTimeout.C
}

loop:
for {
chID, tag, err := mp.readNextHeader()
if err != nil {
Expand All @@ -366,7 +370,7 @@ func (mp *Multiplex) handleIncoming() {
// etc...
tag += (tag & 1)

b, err := mp.readNext()
mlen, err := mp.readNextMsgLen()
if err != nil {
mp.shutdownErr = err
return
Expand All @@ -384,6 +388,18 @@ func (mp *Multiplex) handleIncoming() {
return
}

if mlen > ChunkSize {
log.Debugf("stream name is too large! [%d]", mlen)
mp.shutdownErr = fmt.Errorf("stream name too large")
return
}

b, err := mp.readNextMsg(mlen)
if err != nil {
mp.shutdownErr = err
return
}

name := string(b)
mp.putBufferInbound(b)

Expand All @@ -398,6 +414,11 @@ func (mp *Multiplex) handleIncoming() {
}

case resetTag:
if err := mp.skipNextMsg(mlen); err != nil {
mp.shutdownErr = err
return
}

if !ok {
// This is *ok*. We forget the stream on reset.
continue
Expand All @@ -407,6 +428,11 @@ func (mp *Multiplex) handleIncoming() {
msch.cancelRead(ErrStreamReset)
msch.cancelWrite(ErrStreamReset)
case closeTag:
if err := mp.skipNextMsg(mlen); err != nil {
mp.shutdownErr = err
return
}

if !ok {
// may have canceled our reads already.
continue
Expand All @@ -430,33 +456,67 @@ func (mp *Multiplex) handleIncoming() {
// We're not accepting data on this stream, for
// some reason. It's likely that we reset it, or
// simply canceled reads (e.g., called Close).
mp.putBufferInbound(b)
if err := mp.skipNextMsg(mlen); err != nil {
mp.shutdownErr = err
return
}
continue
}

recvTimeout.Reset(ReceiveTimeout)
select {
case msch.dataIn <- b:
case <-msch.readCancel:
// the user has canceled reading. walk away.
mp.putBufferInbound(b)
case <-recvTimeout.C:
mp.putBufferInbound(b)
log.Warnf("timed out receiving message into stream queue.")
// Do not do this asynchronously. Otherwise, we
// could drop a message, then receive a message,
// then reset.
msch.Reset()
continue
case <-mp.shutdown:
mp.putBufferInbound(b)
return
}
if !recvTimeout.Stop() {
<-recvTimeout.C
read:
for rd := 0; rd < mlen; {
nextChunk := mlen - rd
if nextChunk > ChunkSize {
nextChunk = ChunkSize
}

b, err := mp.readNextMsg(nextChunk)
vyzo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
mp.shutdownErr = err
return
}

rd += nextChunk

if !recvTimeout.Stop() {
<-recvTimeout.C
}
recvTimeout.Reset(ReceiveTimeout)

select {
case msch.dataIn <- b:

case <-msch.readCancel:
// the user has canceled reading. walk away.
mp.putBufferInbound(b)
if err := mp.skipNextMsg(mlen - rd); err != nil {
mp.shutdownErr = err
return
}
break read

case <-recvTimeout.C:
mp.putBufferInbound(b)
log.Warnf("timed out receiving message into stream queue.")
// Do not do this asynchronously. Otherwise, we
// could drop a message, then receive a message,
// then reset.
msch.Reset()
if err := mp.skipNextMsg(mlen - rd); err != nil {
mp.shutdownErr = err
return
}
continue loop

case <-mp.shutdown:
mp.putBufferInbound(b)
return
}
}

default:
log.Debugf("message with unknown header on stream %s", ch)
mp.skipNextMsg(mlen)
if ok {
msch.Reset()
}
Expand Down Expand Up @@ -502,36 +562,61 @@ func (mp *Multiplex) readNextHeader() (uint64, uint64, error) {
return ch, rem, nil
}

func (mp *Multiplex) readNext() ([]byte, error) {
// get length
func (mp *Multiplex) readNextMsgLen() (int, error) {
l, err := varint.ReadUvarint(mp.buf)
if err != nil {
return nil, err
return 0, err
}

if l > uint64(MaxMessageSize) {
return nil, fmt.Errorf("message size too large")
return 0, fmt.Errorf("message size too large")
}

if l == 0 {
return nil, nil
return 0, nil
}

buf, err := mp.getBufferInbound(int(l))
return int(l), nil
}

func (mp *Multiplex) readNextMsg(mlen int) ([]byte, error) {
buf, err := mp.getBufferInbound(mlen)
if err != nil {
return nil, err
}

n, err := io.ReadFull(mp.buf, buf)
if err != nil {
mp.putBufferInbound(buf)
return nil, err
}
if n < mlen {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
mp.putBufferInbound(buf)
return nil, fmt.Errorf("incomplete read")
}

return buf, nil
}

func (mp *Multiplex) skipNextMsg(mlen int) error {
if mlen == 0 {
return nil
}

return buf[:n], nil
_, err := mp.buf.Discard(mlen)
return err
}

func (mp *Multiplex) getBufferInbound(length int) ([]byte, error) {
if !mp.bufInTimer.Stop() {
<-mp.bufInTimer.C
}
vyzo marked this conversation as resolved.
Show resolved Hide resolved
mp.bufInTimer.Reset(getInputBufferTimeout)

select {
case mp.bufIn <- struct{}{}:
case <-mp.bufInTimer.C:
return nil, errTimeout
case <-mp.shutdown:
return nil, ErrShutdown
}
Expand Down
6 changes: 1 addition & 5 deletions multiplex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ import (
"time"
)

func init() {
// Let's not slow down the tests too much...
ReceiveTimeout = 100 * time.Millisecond
}

func TestSlowReader(t *testing.T) {
a, b := net.Pipe()

Expand Down Expand Up @@ -287,6 +282,7 @@ func TestEcho(t *testing.T) {
}

func TestFullClose(t *testing.T) {
t.Skip("nonsensical flaky test")
a, b := net.Pipe()
mpa, err := NewMultiplex(a, false, nil)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func (s *Stream) Write(b []byte) (int, error) {
var written int
for written < len(b) {
wl := len(b) - written
if wl > MaxMessageSize {
wl = MaxMessageSize
if wl > ChunkSize {
wl = ChunkSize
}

n, err := s.write(b[written : written+wl])
Expand Down