Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

More specific wantlists #74

Merged
merged 2 commits into from
Feb 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return nil
})

peerQueueFactory := func(p peer.ID) bspm.PeerQueue {
return bsmq.New(p, network)
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network)
}

wm := bswm.New(ctx)
Expand Down
21 changes: 13 additions & 8 deletions decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
defer partner.activelk.Unlock()

var priority int
newEntries := make([]*wantlist.Entry, 0, len(entries))
newEntries := make([]*peerRequestTaskEntry, 0, len(entries))
for _, entry := range entries {
if partner.activeBlocks.Has(entry.Cid) {
continue
Expand All @@ -75,7 +75,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
if entry.Priority > priority {
priority = entry.Priority
}
newEntries = append(newEntries, entry)
newEntries = append(newEntries, &peerRequestTaskEntry{entry, false})
}

if len(newEntries) == 0 {
Expand All @@ -86,7 +86,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
Entries: newEntries,
Target: to,
created: time.Now(),
Done: func(e []*wantlist.Entry) {
Done: func(e []*peerRequestTaskEntry) {
tl.lock.Lock()
for _, entry := range e {
partner.TaskDone(entry.Cid)
Expand Down Expand Up @@ -117,10 +117,10 @@ func (tl *prq) Pop() *peerRequestTask {
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
out = partner.taskQueue.Pop().(*peerRequestTask)

newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
newEntries := make([]*peerRequestTaskEntry, 0, len(out.Entries))
for _, entry := range out.Entries {
delete(tl.taskMap, taskEntryKey{out.Target, entry.Cid})
if entry.Trash {
if entry.trash {
continue
}
partner.requests--
Expand Down Expand Up @@ -150,7 +150,7 @@ func (tl *prq) Remove(k cid.Cid, p peer.ID) {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
entry.Trash = true
entry.trash = true
break
}
}
Expand Down Expand Up @@ -197,13 +197,18 @@ func (tl *prq) thawRound() {
}
}

type peerRequestTaskEntry struct {
*wantlist.Entry
// trash in a book-keeping field
trash bool
}
type peerRequestTask struct {
Entries []*wantlist.Entry
Entries []*peerRequestTaskEntry
Priority int
Target peer.ID

// A callback to signal that this task has been completed
Done func([]*wantlist.Entry)
Done func([]*peerRequestTaskEntry)

// created marks the time that the task was added to the queue
created time.Time
Expand Down
181 changes: 99 additions & 82 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package messagequeue

import (
"context"
"sync"
"time"

bsmsg "github.com/ipfs/go-bitswap/message"
Expand All @@ -23,86 +22,99 @@ type MessageNetwork interface {
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
}

type request interface {
handle(mq *MessageQueue)
}

// MessageQueue implements queue of want messages to send to peers.
type MessageQueue struct {
p peer.ID

outlk sync.Mutex
out bsmsg.BitSwapMessage
ctx context.Context
p peer.ID
network MessageNetwork
wl *wantlist.ThreadSafe

sender bsnet.MessageSender
newRequests chan request
outgoingMessages chan bsmsg.BitSwapMessage
done chan struct{}

// do not touch out of run loop
wl *wantlist.SessionTrackedWantlist
nextMessage bsmsg.BitSwapMessage
sender bsnet.MessageSender
}

type messageRequest struct {
entries []*bsmsg.Entry
ses uint64
}

work chan struct{}
done chan struct{}
type wantlistRequest struct {
wl *wantlist.SessionTrackedWantlist
}

// New creats a new MessageQueue.
func New(p peer.ID, network MessageNetwork) *MessageQueue {
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
return &MessageQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
wl: wantlist.NewThreadSafe(),
network: network,
p: p,
ctx: ctx,
wl: wantlist.NewSessionTrackedWantlist(),
network: network,
p: p,
newRequests: make(chan request, 16),
outgoingMessages: make(chan bsmsg.BitSwapMessage),
done: make(chan struct{}),
}
}

// AddMessage adds new entries to an outgoing message for a given session.
func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
if !mq.addEntries(entries, ses) {
return
}
select {
case mq.work <- struct{}{}:
default:
case mq.newRequests <- &messageRequest{entries, ses}:
case <-mq.ctx.Done():
}
}

// AddWantlist adds a complete session tracked want list to a message queue
func (mq *MessageQueue) AddWantlist(initialEntries []*wantlist.Entry) {
if len(initialEntries) > 0 {
if mq.out == nil {
mq.out = bsmsg.New(false)
}
func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {
wl := wantlist.NewSessionTrackedWantlist()
initialWants.CopyWants(wl)

for _, e := range initialEntries {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
mq.out.AddEntry(e.Cid, e.Priority)
}

select {
case mq.work <- struct{}{}:
default:
}
select {
case mq.newRequests <- &wantlistRequest{wl}:
case <-mq.ctx.Done():
}
}

// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup(ctx context.Context) {
go mq.runQueue(ctx)
func (mq *MessageQueue) Startup() {
go mq.runQueue()
go mq.sendMessages()
}

// Shutdown stops the processing of messages for a message queue.
func (mq *MessageQueue) Shutdown() {
close(mq.done)
}

func (mq *MessageQueue) runQueue(ctx context.Context) {
func (mq *MessageQueue) runQueue() {
outgoingMessages := func() chan bsmsg.BitSwapMessage {
if mq.nextMessage == nil {
return nil
}
return mq.outgoingMessages
}

for {
select {
case <-mq.work: // there is work to be done
mq.doWork(ctx)
case newRequest := <-mq.newRequests:
newRequest.handle(mq)
case outgoingMessages() <- mq.nextMessage:
mq.nextMessage = nil
case <-mq.done:
if mq.sender != nil {
mq.sender.Close()
}
return
case <-ctx.Done():
case <-mq.ctx.Done():
if mq.sender != nil {
mq.sender.Reset()
}
Expand All @@ -111,72 +123,86 @@ func (mq *MessageQueue) runQueue(ctx context.Context) {
}
}

func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) bool {
var work bool
mq.outlk.Lock()
defer mq.outlk.Unlock()
// if we have no message held allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
func (mr *messageRequest) handle(mq *MessageQueue) {
mq.addEntries(mr.entries, mr.ses)
}

func (wr *wantlistRequest) handle(mq *MessageQueue) {
initialWants := wr.wl
initialWants.CopyWants(mq.wl)
if initialWants.Len() > 0 {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
for _, e := range initialWants.Entries() {
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
}
}

// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) {
for _, e := range entries {
if e.Cancel {
if mq.wl.Remove(e.Cid, ses) {
work = true
mq.out.Cancel(e.Cid)
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
mq.nextMessage.Cancel(e.Cid)
}
} else {
if mq.wl.Add(e.Cid, e.Priority, ses) {
work = true
mq.out.AddEntry(e.Cid, e.Priority)
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
}
}

return work
}

func (mq *MessageQueue) doWork(ctx context.Context) {

wlm := mq.extractOutgoingMessage()
if wlm == nil || wlm.Empty() {
return
func (mq *MessageQueue) sendMessages() {
for {
select {
case nextMessage := <-mq.outgoingMessages:
mq.sendMessage(nextMessage)
case <-mq.done:
return
case <-mq.ctx.Done():
return
}
}
}

func (mq *MessageQueue) sendMessage(message bsmsg.BitSwapMessage) {

// NB: only open a stream if we actually have data to send
err := mq.initializeSender(ctx)
err := mq.initializeSender()
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}

// send wantlist updates
for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
if mq.attemptSendAndRecovery(ctx, wlm) {
if mq.attemptSendAndRecovery(message) {
return
}
}
}

func (mq *MessageQueue) initializeSender(ctx context.Context) error {
func (mq *MessageQueue) initializeSender() error {
if mq.sender != nil {
return nil
}
nsender, err := openSender(ctx, mq.network, mq.p)
nsender, err := openSender(mq.ctx, mq.network, mq.p)
if err != nil {
return err
}
mq.sender = nsender
return nil
}

func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.BitSwapMessage) bool {
err := mq.sender.SendMsg(ctx, wlm)
func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool {
err := mq.sender.SendMsg(mq.ctx, message)
if err == nil {
return true
}
Expand All @@ -188,14 +214,14 @@ func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.Bi
select {
case <-mq.done:
return true
case <-ctx.Done():
case <-mq.ctx.Done():
return true
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propogating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}

err = mq.initializeSender(ctx)
err = mq.initializeSender()
if err != nil {
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
Expand All @@ -215,15 +241,6 @@ func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.Bi
return false
}

func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage {
// grab outgoing message
mq.outlk.Lock()
wlm := mq.out
mq.out = nil
mq.outlk.Unlock()
return wlm
}

func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
Expand Down
Loading