From 6d6ed0785029527801b651eaeefd6bce48c5ca15 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 5 Aug 2018 12:56:41 +0700 Subject: [PATCH 1/2] buffer writes when sending protobufs The protobuf writer performs multiple small writes when writing a message. We need to buffer these, otherwise we'll send out a packet for each Write call. --- dht_net.go | 41 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/dht_net.go b/dht_net.go index e4dcf004d..a5e544a4e 100644 --- a/dht_net.go +++ b/dht_net.go @@ -1,6 +1,7 @@ package dht import ( + "bufio" "context" "fmt" "io" @@ -157,10 +158,35 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) { return ms, nil } +type bufferedWriteCloser interface { + ggio.WriteCloser + Flush() error +} + +// The Protobuf writer performs multiple small writes when writing a message. +// We need to buffer those writes, to make sure that we're not sending a new +// packet for every single write. +type bufferedDelimitedWriter struct { + *bufio.Writer + ggio.WriteCloser +} + +func newBufferedDelimitedWriter(str inet.Stream) bufferedWriteCloser { + w := bufio.NewWriter(str) + return &bufferedDelimitedWriter{ + Writer: w, + WriteCloser: ggio.NewDelimitedWriter(w), + } +} + +func (w *bufferedDelimitedWriter) Flush() error { + return w.Writer.Flush() +} + type messageSender struct { s inet.Stream r ggio.ReadCloser - w ggio.WriteCloser + w bufferedWriteCloser lk sync.Mutex p peer.ID dht *IpfsDHT @@ -204,7 +230,7 @@ func (ms *messageSender) prep() error { } ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax) - ms.w = ggio.NewDelimitedWriter(nstr) + ms.w = newBufferedDelimitedWriter(nstr) ms.s = nstr return nil @@ -224,7 +250,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro return err } - if err := ms.w.WriteMsg(pmes); err != nil { + if err := ms.writeMsg(pmes); err != nil { ms.s.Reset() ms.s = nil @@ -260,7 +286,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb return nil, err } - if err := ms.w.WriteMsg(pmes); err != nil { + if err := ms.writeMsg(pmes); err != nil { ms.s.Reset() ms.s = nil @@ -302,6 +328,13 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } } +func (ms *messageSender) writeMsg(pmes *pb.Message) error { + if err := ms.w.WriteMsg(pmes); err != nil { + return err + } + return ms.w.Flush() +} + func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error { errc := make(chan error, 1) go func(r ggio.ReadCloser) { From da02d4a7d5503bea7d5755729d6d42ae21a4889c Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 5 Aug 2018 16:43:15 +0700 Subject: [PATCH 2/2] also use the buffered writer for handling new messages --- dht_net.go | 58 +++++++++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/dht_net.go b/dht_net.go index a5e544a4e..03a7cfc43 100644 --- a/dht_net.go +++ b/dht_net.go @@ -18,6 +18,31 @@ import ( var dhtReadMessageTimeout = time.Minute var ErrReadTimeout = fmt.Errorf("timed out reading response") +type bufferedWriteCloser interface { + ggio.WriteCloser + Flush() error +} + +// The Protobuf writer performs multiple small writes when writing a message. +// We need to buffer those writes, to make sure that we're not sending a new +// packet for every single write. +type bufferedDelimitedWriter struct { + *bufio.Writer + ggio.WriteCloser +} + +func newBufferedDelimitedWriter(str io.Writer) bufferedWriteCloser { + w := bufio.NewWriter(str) + return &bufferedDelimitedWriter{ + Writer: w, + WriteCloser: ggio.NewDelimitedWriter(w), + } +} + +func (w *bufferedDelimitedWriter) Flush() error { + return w.Writer.Flush() +} + // handleNewStream implements the inet.StreamHandler func (dht *IpfsDHT) handleNewStream(s inet.Stream) { go dht.handleNewMessage(s) @@ -28,7 +53,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) { cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) - w := ggio.NewDelimitedWriter(cw) + w := newBufferedDelimitedWriter(cw) mPeer := s.Conn().RemotePeer() for { @@ -71,7 +96,11 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) { } // send out response msg - if err := w.WriteMsg(rpmes); err != nil { + err = w.WriteMsg(rpmes) + if err == nil { + err = w.Flush() + } + if err != nil { s.Reset() log.Debugf("send response error: %s", err) return @@ -158,31 +187,6 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) { return ms, nil } -type bufferedWriteCloser interface { - ggio.WriteCloser - Flush() error -} - -// The Protobuf writer performs multiple small writes when writing a message. -// We need to buffer those writes, to make sure that we're not sending a new -// packet for every single write. -type bufferedDelimitedWriter struct { - *bufio.Writer - ggio.WriteCloser -} - -func newBufferedDelimitedWriter(str inet.Stream) bufferedWriteCloser { - w := bufio.NewWriter(str) - return &bufferedDelimitedWriter{ - Writer: w, - WriteCloser: ggio.NewDelimitedWriter(w), - } -} - -func (w *bufferedDelimitedWriter) Flush() error { - return w.Writer.Flush() -} - type messageSender struct { s inet.Stream r ggio.ReadCloser