From 0864ae2fdc70b476462a607d1aae62f1f2043c3b Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 5 Aug 2018 12:56:41 +0700 Subject: [PATCH] buffer writes when sending protobufs The protobuf writer performs multiple small writes when writing a message. We need to buffer these, otherwise we'll send out a packet for each Write call. --- dht_net.go | 41 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/dht_net.go b/dht_net.go index e4dcf004d..bb7eb6540 100644 --- a/dht_net.go +++ b/dht_net.go @@ -1,6 +1,7 @@ package dht import ( + "bufio" "context" "fmt" "io" @@ -157,10 +158,35 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) { return ms, nil } +type bufferedWriteCloser interface { + ggio.WriteCloser + Flush() error +} + +// The Protobuf writer performs multiple small writes when writing a message. +// We need to buffer those writes, to make sure that we're not sending a new +// packet for every single write. +type bufferedDelimitedWriter struct { + *bufio.Writer + ggio.WriteCloser +} + +func newBufferedDelimitedWriter(str inet.Stream) bufferedWriteCloser { + w := bufio.NewWriterSize(str, 1000) + return &bufferedDelimitedWriter{ + Writer: w, + WriteCloser: ggio.NewDelimitedWriter(w), + } +} + +func (w *bufferedDelimitedWriter) Flush() error { + return w.Writer.Flush() +} + type messageSender struct { s inet.Stream r ggio.ReadCloser - w ggio.WriteCloser + w bufferedWriteCloser lk sync.Mutex p peer.ID dht *IpfsDHT @@ -204,7 +230,7 @@ func (ms *messageSender) prep() error { } ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax) - ms.w = ggio.NewDelimitedWriter(nstr) + ms.w = newBufferedDelimitedWriter(nstr) ms.s = nstr return nil @@ -224,7 +250,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro return err } - if err := ms.w.WriteMsg(pmes); err != nil { + if err := ms.writeMsg(pmes); err != nil { ms.s.Reset() ms.s = nil @@ -260,7 +286,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb return nil, err } - if err := ms.w.WriteMsg(pmes); err != nil { + if err := ms.writeMsg(pmes); err != nil { ms.s.Reset() ms.s = nil @@ -302,6 +328,13 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } } +func (ms *messageSender) writeMsg(pmes *pb.Message) error { + if err := ms.w.WriteMsg(pmes); err != nil { + return err + } + return ms.w.Flush() +} + func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error { errc := make(chan error, 1) go func(r ggio.ReadCloser) {