From 02975bde9dc2d975c9ffb74604dfe9df6f0e6aea Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 29 Nov 2016 15:22:05 -0800 Subject: [PATCH] bitswap: add a deadline to sendmsg calls License: MIT Signed-off-by: Jeromy --- exchange/bitswap/network/interface.go | 2 +- exchange/bitswap/network/ipfs_impl.go | 24 ++++++++++++++++++++---- exchange/bitswap/testnet/virtual.go | 4 ++-- exchange/bitswap/wantmanager.go | 2 +- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 21b4d9ead2b..dfc1b3f029b 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -38,7 +38,7 @@ type BitSwapNetwork interface { } type MessageSender interface { - SendMsg(bsmsg.BitSwapMessage) error + SendMsg(context.Context, bsmsg.BitSwapMessage) error Close() error } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 3d992769be5..c854f853ee1 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "time" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" @@ -20,6 +21,8 @@ import ( var log = logging.Logger("bitswap_network") +var sendMessageTimeout = time.Minute * 10 + // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork { bitswapNetwork := impl{ @@ -53,11 +56,20 @@ func (s *streamMessageSender) Close() error { return s.s.Close() } -func (s *streamMessageSender) SendMsg(msg bsmsg.BitSwapMessage) error { - return msgToStream(s.s, msg) +func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { + return msgToStream(ctx, s.s, msg) } -func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error { +func msgToStream(ctx context.Context, s inet.Stream, msg bsmsg.BitSwapMessage) error { + deadline := time.Now().Add(sendMessageTimeout) + if dl, ok := ctx.Deadline(); ok { + deadline = dl + } + + if err := s.SetWriteDeadline(deadline); err != nil { + log.Warningf("error setting deadline: %s", err) + } + switch s.Protocol() { case ProtocolBitswap: if err := msg.ToNetV1(s); err != nil { @@ -72,6 +84,10 @@ func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error { default: return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol()) } + + if err := s.SetWriteDeadline(time.Time{}); err != nil { + log.Warningf("error resetting deadline: %s", err) + } return nil } @@ -107,7 +123,7 @@ func (bsnet *impl) SendMessage( } defer s.Close() - return msgToStream(s, outgoing) + return msgToStream(ctx, s, outgoing) } func (bsnet *impl) SetDelegate(r Receiver) { diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index ab3535c1f48..4d8769e5bfd 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -119,8 +119,8 @@ type messagePasser struct { ctx context.Context } -func (mp *messagePasser) SendMsg(m bsmsg.BitSwapMessage) error { - return mp.net.SendMessage(mp.ctx, mp.local, mp.target, m) +func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error { + return mp.net.SendMessage(ctx, mp.local, mp.target, m) } func (mp *messagePasser) Close() error { diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 388db20b57a..f5869d82e9f 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -196,7 +196,7 @@ func (mq *msgQueue) doWork(ctx context.Context) { // send wantlist updates for { // try to send this message until we fail. - err := mq.sender.SendMsg(wlm) + err := mq.sender.SendMsg(ctx, wlm) if err == nil { return }