Skip to content

Commit

Permalink
buffer writes when sending protobufs
Browse files Browse the repository at this point in the history
The protobuf writer performs multiple small writes when writing a
message. We need to buffer these, otherwise we'll send out a packet for
each Write call.
  • Loading branch information
marten-seemann committed Aug 5, 2018
1 parent 15054c0 commit 0864ae2
Showing 1 changed file with 37 additions and 4 deletions.
41 changes: 37 additions & 4 deletions dht_net.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dht

import (
"bufio"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -157,10 +158,35 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
return ms, nil
}

type bufferedWriteCloser interface {
ggio.WriteCloser
Flush() error
}

// The Protobuf writer performs multiple small writes when writing a message.
// We need to buffer those writes, to make sure that we're not sending a new
// packet for every single write.
type bufferedDelimitedWriter struct {
*bufio.Writer
ggio.WriteCloser
}

func newBufferedDelimitedWriter(str inet.Stream) bufferedWriteCloser {
w := bufio.NewWriterSize(str, 1000)
return &bufferedDelimitedWriter{
Writer: w,
WriteCloser: ggio.NewDelimitedWriter(w),
}
}

func (w *bufferedDelimitedWriter) Flush() error {
return w.Writer.Flush()
}

type messageSender struct {
s inet.Stream
r ggio.ReadCloser
w ggio.WriteCloser
w bufferedWriteCloser
lk sync.Mutex
p peer.ID
dht *IpfsDHT
Expand Down Expand Up @@ -204,7 +230,7 @@ func (ms *messageSender) prep() error {
}

ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
ms.w = ggio.NewDelimitedWriter(nstr)
ms.w = newBufferedDelimitedWriter(nstr)
ms.s = nstr

return nil
Expand All @@ -224,7 +250,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
return err
}

if err := ms.w.WriteMsg(pmes); err != nil {
if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil

Expand Down Expand Up @@ -260,7 +286,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
return nil, err
}

if err := ms.w.WriteMsg(pmes); err != nil {
if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil

Expand Down Expand Up @@ -302,6 +328,13 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
}
}

func (ms *messageSender) writeMsg(pmes *pb.Message) error {
if err := ms.w.WriteMsg(pmes); err != nil {
return err
}
return ms.w.Flush()
}

func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
errc := make(chan error, 1)
go func(r ggio.ReadCloser) {
Expand Down

0 comments on commit 0864ae2

Please sign in to comment.