Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#369 from ipfs/fix/over-allocated-ctx
Browse files Browse the repository at this point in the history
fix: avoid calling ctx.SetDeadline() every time we send a message

This commit was moved from ipfs/go-bitswap@f4b63ee
  • Loading branch information
Stebalien authored Apr 22, 2020
2 parents 5cfe98e + 573478d commit 5e4dbaf
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,14 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro
return s.stream, nil
}

if err := s.bsnet.ConnectTo(ctx, s.to); err != nil {
tctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout)
defer cancel()

if err := s.bsnet.ConnectTo(tctx, s.to); err != nil {
return nil, err
}

stream, err := s.bsnet.newStreamToPeer(ctx, s.to)
stream, err := s.bsnet.newStreamToPeer(tctx, s.to)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -139,25 +142,20 @@ func (s *streamMessageSender) SupportsHave() bool {

// Send a message to the peer, attempting multiple times
func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
return s.multiAttempt(ctx, func(fnctx context.Context) error {
return s.send(fnctx, msg)
return s.multiAttempt(ctx, func() error {
return s.send(ctx, msg)
})
}

// Perform a function with multiple attempts, and a timeout
func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context.Context) error) error {
func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func() error) error {
// Try to call the function repeatedly
var err error
for i := 0; i < s.opts.MaxRetries; i++ {
deadline := time.Now().Add(s.opts.SendTimeout)
sndctx, cancel := context.WithDeadline(ctx, deadline)

if err = fn(sndctx); err == nil {
cancel()
if err = fn(); err == nil {
// Attempt was successful
return nil
}
cancel()

// Attempt failed

Expand Down Expand Up @@ -196,13 +194,18 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context.

// Send a message to the peer
func (s *streamMessageSender) send(ctx context.Context, msg bsmsg.BitSwapMessage) error {
start := time.Now()
stream, err := s.Connect(ctx)
if err != nil {
log.Infof("failed to open stream to %s: %s", s.to, err)
return err
}

if err = s.bsnet.msgToStream(ctx, stream, msg); err != nil {
// The send timeout includes the time required to connect
// (although usually we will already have connected - we only need to
// connect after a failed attempt to send)
timeout := s.opts.SendTimeout - time.Since(start)
if err = s.bsnet.msgToStream(ctx, stream, msg, timeout); err != nil {
log.Infof("failed to send message to %s: %s", s.to, err)
return err
}
Expand Down Expand Up @@ -234,9 +237,9 @@ func (bsnet *impl) SupportsHave(proto protocol.ID) bool {
return true
}

func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error {
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
if dl, ok := ctx.Deadline(); ok && dl.Before(deadline) {
deadline = dl
}

Expand Down Expand Up @@ -277,8 +280,8 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag
opts: opts,
}

err := sender.multiAttempt(ctx, func(fnctx context.Context) error {
_, err := sender.Connect(fnctx)
err := sender.multiAttempt(ctx, func() error {
_, err := sender.Connect(ctx)
return err
})

Expand Down Expand Up @@ -313,7 +316,7 @@ func (bsnet *impl) SendMessage(
return err
}

if err = bsnet.msgToStream(ctx, s, outgoing); err != nil {
if err = bsnet.msgToStream(ctx, s, outgoing, sendMessageTimeout); err != nil {
_ = s.Reset()
return err
}
Expand Down

0 comments on commit 5e4dbaf

Please sign in to comment.