cleanup bitswap and handle message send failure slightly better #3408

Merged · 1 commit · Nov 27, 2016
36 changes: 18 additions & 18 deletions exchange/bitswap/bitswap.go
@@ -82,7 +82,6 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
})

bs := &Bitswap{
self: p,
blockstore: bstore,
notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
@@ -112,34 +111,36 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,

// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// wont block bitswap operation
[Review thread]
Member: s/wont/want/
Reply: nah, it's just a missing apostrophe :)
Member: aah, ok.

wm *WantManager

// the ID of the peer to act on behalf of
self peer.ID
// the engine is the bit of logic that decides who to send which blocks to
engine *decision.Engine

// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork

// the peermanager manages sending messages to peers in a way that
// won't block bitswap operation
wm *WantManager

// blockstore is the local database
// NB: ensure threadsafety
blockstore blockstore.Blockstore

// notifications engine for receiving new blocks and routing them to the
// appropriate user requests
notifications notifications.PubSub

// send keys to a worker to find and connect to providers for them
// findKeys sends keys to a worker to find and connect to providers for them
findKeys chan *blockRequest

engine *decision.Engine

process process.Process

// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan *cid.Cid

// provideKeys directly feeds provide workers
provideKeys chan *cid.Cid

process process.Process

// Counters for various statistics
counterLk sync.Mutex
blocksRecvd int
dupBlocksRecvd int
@@ -167,13 +168,12 @@ func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, e
// enforce. May this comment keep you safe.
ctx, cancelFunc := context.WithCancel(parent)

// TODO: this request ID should come in from a higher layer so we can track
// across multiple 'GetBlock' invocations
ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k)

defer func() {
cancelFunc()
}()
defer cancelFunc()

promise, err := bs.GetBlocks(ctx, []*cid.Cid{k})
if err != nil {
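
The struct comments above describe a small buffering pipeline: blocks pushed down the newBlocks channel are queued in memory and fed to the provideKeys channel later on, so a burst of newly added blocks never stalls the caller or floods the network. Below is a minimal, self-contained sketch of that pattern; only the newBlocks/provideKeys names come from the struct, while bufferKeys, the string keys, and the done channel are invented for illustration (the real code carries *cid.Cid values).

package main

import "fmt"

// bufferKeys drains newBlocks into an in-memory queue and offers the
// queued keys to provideKeys as consumers become ready.
func bufferKeys(newBlocks <-chan string, provideKeys chan<- string, done <-chan struct{}) {
    var queue []string
    for {
        // Only enable the send case when the queue is non-empty: a send
        // on a nil channel blocks forever, which disables that case.
        var out chan<- string
        var next string
        if len(queue) > 0 {
            out = provideKeys
            next = queue[0]
        }
        select {
        case k, ok := <-newBlocks:
            if !ok {
                return
            }
            queue = append(queue, k)
        case out <- next:
            queue = queue[1:]
        case <-done:
            return
        }
    }
}

func main() {
    newBlocks := make(chan string)
    provideKeys := make(chan string)
    done := make(chan struct{})
    go bufferKeys(newBlocks, provideKeys, done)

    newBlocks <- "QmExampleKey"
    fmt.Println("providing:", <-provideKeys)
    close(done)
}

The nil-channel trick is what lets a single select both accept new work and drain the queue without busy-waiting.
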
95 changes: 64 additions & 31 deletions exchange/bitswap/wantmanager.go
@@ -175,28 +175,13 @@ func (mq *msgQueue) runQueue(ctx context.Context) {
}

func (mq *msgQueue) doWork(ctx context.Context) {
// allow ten minutes for connections
// this includes looking them up in the dht
// dialing them, and handshaking
if mq.sender == nil {
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

err := mq.network.ConnectTo(conctx, mq.p)
err := mq.openSender(ctx)
if err != nil {
log.Infof("cant connect to peer %s: %s", mq.p, err)
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}

nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
log.Infof("cant open new stream to peer %s: %s", mq.p, err)
// TODO: cant open stream, what now?
return
}

mq.sender = nsender
}

// grab outgoing message
@@ -210,14 +195,64 @@ func (mq *msgQueue) doWork(ctx context.Context) {
mq.outlk.Unlock()

// send wantlist updates
err := mq.sender.SendMsg(wlm)
if err != nil {
for { // keep trying to send this message until it succeeds or we give up.
err := mq.sender.SendMsg(wlm)
if err == nil {
return
}

log.Infof("bitswap send error: %s", err)
mq.sender.Close()
mq.sender = nil
// TODO: what do we do if this fails?
return

select {
case <-mq.done:
return
case <-ctx.Done():
return
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propagating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}

err = mq.openSender(ctx)
if err != nil {
log.Error("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
// I think the *right* answer is to probably put the message we're
// trying to send back, and then return to waiting for new work or
// a disconnect.
return
}

// TODO: Is this the same instance for the remote peer?
// If it's not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
}
*/
}
}

func (mq *msgQueue) openSender(ctx context.Context) error {
// allow ten minutes for connections; this includes looking them up in the
// dht, dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

err := mq.network.ConnectTo(conctx, mq.p)
if err != nil {
return err
}

nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
return err
}

mq.sender = nsender
return nil
}

func (pm *WantManager) Connected(p peer.ID) {
@@ -292,14 +327,13 @@ func (pm *WantManager) Run() {
}

func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
mq := new(msgQueue)
mq.done = make(chan struct{})
mq.work = make(chan struct{}, 1)
mq.network = wm.network
mq.p = p
mq.refcnt = 1

return mq
return &msgQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
network: wm.network,
p: p,
refcnt: 1,
}
}

func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
@@ -312,8 +346,7 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
}
}()

// if we have no message held, or the one we are given is full
// overwrite the one we are holding
// if we have no message held, allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
}
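
The doWork/openSender split above is the substance of this PR: instead of giving up after a single failed SendMsg, the queue closes the broken sender, waits briefly so disconnect notifications can propagate, reconnects, and retries. Below is a distilled sketch of that shape; Sender, msgQueue, fakeSender, and sendWithRetry here are simplified stand-ins invented for this example, not the real bsnet API.

package main

import (
    "context"
    "fmt"
    "log"
    "time"
)

// Sender stands in for bitswap's network message sender.
type Sender interface {
    SendMsg(msg string) error
    Close() error
}

type msgQueue struct {
    sender Sender
    // open reconnects and returns a fresh sender; stands in for openSender.
    open func(context.Context) (Sender, error)
}

// sendWithRetry keeps trying to deliver msg until it succeeds, the
// context is cancelled, or reopening the sender fails.
func (q *msgQueue) sendWithRetry(ctx context.Context, msg string) error {
    for {
        if q.sender != nil {
            err := q.sender.SendMsg(msg)
            if err == nil {
                return nil
            }
            log.Printf("send error: %s", err)
            q.sender.Close()
            q.sender = nil
        }

        // Back off briefly so disconnect notifications can propagate.
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(100 * time.Millisecond):
        }

        s, err := q.open(ctx)
        if err != nil {
            return fmt.Errorf("could not reopen sender: %w", err)
        }
        q.sender = s
    }
}

type fakeSender struct{ fails int }

func (f *fakeSender) SendMsg(msg string) error {
    if f.fails > 0 {
        f.fails--
        return fmt.Errorf("stream reset")
    }
    fmt.Println("sent:", msg)
    return nil
}

func (f *fakeSender) Close() error { return nil }

func main() {
    s := &fakeSender{fails: 1}
    q := &msgQueue{
        sender: s,
        open:   func(context.Context) (Sender, error) { return s, nil },
    }
    if err := q.sendWithRetry(context.Background(), "wantlist update"); err != nil {
        log.Fatal(err)
    }
}

In the real code the retry loop also watches mq.done, and the commented-out InstanceID check hints at a further refinement: if reconnecting produced a fresh remote session, the full wantlist would need to be resent rather than just the pending update.
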
6 changes: 6 additions & 0 deletions exchange/bitswap/workers.go
@@ -197,6 +197,12 @@ func (bs *Bitswap) providerQueryManager(ctx context.Context) {
for {
select {
case e := <-bs.findKeys:
select { // make sure it's not already cancelled
case <-e.Ctx.Done():
continue
default:
}

activeLk.Lock()
if kset.Has(e.Cid) {
activeLk.Unlock()
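
The workers.go hunk above adds a cheap guard to the provider-query worker: a non-blocking select on the request's own context, so a request whose caller has already given up is dropped before any lookup work starts. A minimal sketch of the same guard in isolation follows; request, providerQueryWorker, and the key strings are invented stand-ins for bitswap's blockRequest and provider machinery.

package main

import (
    "context"
    "fmt"
)

// request pairs a key with the context of the caller that asked for it.
type request struct {
    Ctx context.Context
    Key string
}

func providerQueryWorker(ctx context.Context, in <-chan request) {
    for {
        select {
        case req, ok := <-in:
            if !ok {
                return
            }
            select { // make sure it's not already cancelled
            case <-req.Ctx.Done():
                continue // caller gave up before we got here; skip the lookup
            default:
            }
            fmt.Println("finding providers for", req.Key)
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    in := make(chan request, 2)

    cancelled, cancel := context.WithCancel(context.Background())
    cancel() // simulate a caller that already gave up
    in <- request{Ctx: cancelled, Key: "QmStaleKey"}
    in <- request{Ctx: context.Background(), Key: "QmLiveKey"}
    close(in)

    providerQueryWorker(context.Background(), in)
    // Output: finding providers for QmLiveKey
}

Because the inner select has a default case it never blocks; it only skips work when cancellation has already happened.
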