From 9f1f433acac0ab3729ed19356e4cfe92a8414ab9 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Sun, 21 Sep 2014 21:39:45 -0700 Subject: [PATCH] test(bitswap) send entire wantlist to peers fix(bitswap) pass go vet fixes #97 https://github.com/jbenet/go-ipfs/issues/97 This commit was moved from ipfs/go-bitswap@f96246e119c1710285112342de6405f0cd331c3d --- bitswap/bitswap.go | 70 +++++++++++++++++++++++++++++++++++------ bitswap/bitswap_test.go | 50 ++++++++++++++++++----------- 2 files changed, 93 insertions(+), 27 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 2dc73ca8e..cf5303297 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -2,6 +2,7 @@ package bitswap import ( "errors" + "sync" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" @@ -28,6 +29,9 @@ func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageS strategy: strategy.New(), routing: directory, sender: networkAdapter, + wantlist: WantList{ + data: make(map[u.Key]struct{}), + }, } networkAdapter.SetDelegate(bs) @@ -53,6 +57,39 @@ type bitswap struct { // interact with partners. // TODO(brian): save the strategy's state to the datastore strategy strategy.Strategy + + wantlist WantList +} + +type WantList struct { + lock sync.RWMutex + data map[u.Key]struct{} +} + +func (wl *WantList) Add(k u.Key) { + u.DOut("Adding %v to Wantlist\n", k.Pretty()) + wl.lock.Lock() + defer wl.lock.Unlock() + + wl.data[k] = struct{}{} +} + +func (wl *WantList) Remove(k u.Key) { + u.DOut("Removing %v from Wantlist\n", k.Pretty()) + wl.lock.Lock() + defer wl.lock.Unlock() + + delete(wl.data, k) +} + +func (wl *WantList) Keys() []u.Key { + wl.lock.RLock() + defer wl.lock.RUnlock() + keys := make([]u.Key, 0) + for k, _ := range wl.data { + keys = append(keys, k) + } + return keys } // GetBlock attempts to retrieve a particular block from peers within the @@ -60,9 +97,10 @@ type bitswap struct { // // TODO ensure only one active request per key func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) { + u.DOut("Get Block %v\n", k.Pretty()) ctx, cancelFunc := context.WithCancel(parent) - // TODO add to wantlist + bs.wantlist.Add(k) promise := bs.notifications.Subscribe(ctx, k) const maxProviders = 20 @@ -70,6 +108,9 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) go func() { message := bsmsg.New() + for _, wanted := range bs.wantlist.Keys() { + message.AppendWanted(wanted) + } message.AppendWanted(k) for iiiii := range peersToQuery { // u.DOut("bitswap got peersToQuery: %s\n", iiiii) @@ -94,6 +135,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) select { case block := <-promise: cancelFunc() + bs.wantlist.Remove(k) // TODO remove from wantlist return &block, nil case <-parent.Done(): @@ -104,6 +146,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) // HasBlock announces the existance of a block to bitswap, potentially sending // it to peers (Partners) whose WantLists include it. func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { + u.DOut("Has Block %v\n", blk.Key().Pretty()) + bs.wantlist.Remove(blk.Key()) bs.sendToPeersThatWant(ctx, blk) return bs.routing.Provide(ctx, blk.Key()) } @@ -111,6 +155,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { // TODO(brian): handle errors func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) ( *peer.Peer, bsmsg.BitSwapMessage, error) { + u.DOut("ReceiveMessage from %v\n", p.Key().Pretty()) if p == nil { return nil, nil, errors.New("Received nil Peer") @@ -132,19 +177,21 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs }(block) } + message := bsmsg.New() + for _, wanted := range bs.wantlist.Keys() { + message.AppendWanted(wanted) + } for _, key := range incoming.Wantlist() { if bs.strategy.ShouldSendBlockToPeer(key, p) { - block, errBlockNotFound := bs.blockstore.Get(key) - if errBlockNotFound != nil { - return nil, nil, errBlockNotFound + if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil { + continue + } else { + message.AppendBlock(*block) } - message := bsmsg.New() - message.AppendBlock(*block) - defer bs.strategy.MessageSent(p, message) - return p, message, nil } } - return nil, nil, nil + defer bs.strategy.MessageSent(p, message) + return p, message, nil } // send strives to ensure that accounting is always performed when a message is @@ -155,11 +202,16 @@ func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessag } func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { + u.DOut("Sending %v to peers that want it\n", block.Key().Pretty()) for _, p := range bs.strategy.Peers() { if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { + u.DOut("%v wants %v\n", p.Key().Pretty(), block.Key().Pretty()) if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { message := bsmsg.New() message.AppendBlock(block) + for _, wanted := range bs.wantlist.Keys() { + message.AppendWanted(wanted) + } go bs.send(ctx, p, message) } } diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index 60ba7bf0b..6ec45f21c 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -16,6 +16,7 @@ import ( strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" peer "github.com/jbenet/go-ipfs/peer" + util "github.com/jbenet/go-ipfs/util" testutil "github.com/jbenet/go-ipfs/util/testutil" ) @@ -145,7 +146,10 @@ func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGro wg.Done() } +// TODO simplify this test. get to the _essence_! func TestSendToWantingPeer(t *testing.T) { + util.Debug = true + net := tn.VirtualNetwork() rs := tn.VirtualRoutingServer() sg := NewSessionGenerator(net, rs) @@ -155,48 +159,55 @@ func TestSendToWantingPeer(t *testing.T) { w := sg.Next() o := sg.Next() + t.Logf("Session %v\n", me.peer.Key().Pretty()) + t.Logf("Session %v\n", w.peer.Key().Pretty()) + t.Logf("Session %v\n", o.peer.Key().Pretty()) + alpha := bg.Next() - const timeout = 100 * time.Millisecond - const wait = 100 * time.Millisecond + const timeout = 1 * time.Millisecond // FIXME don't depend on time - t.Log("Peer |w| attempts to get a file |alpha|. NB: alpha not available") + t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer.Key().Pretty(), alpha.Key().Pretty()) ctx, _ := context.WithTimeout(context.Background(), timeout) _, err := w.exchange.Block(ctx, alpha.Key()) if err == nil { - t.Error("Expected alpha to NOT be available") + t.Fatalf("Expected %v to NOT be available", alpha.Key().Pretty()) } - time.Sleep(wait) - t.Log("Peer |w| announces availability of a file |beta|") beta := bg.Next() + t.Logf("Peer %v announes availability of %v\n", w.peer.Key().Pretty(), beta.Key().Pretty()) ctx, _ = context.WithTimeout(context.Background(), timeout) + if err := w.blockstore.Put(beta); err != nil { + t.Fatal(err) + } w.exchange.HasBlock(ctx, beta) - time.Sleep(wait) - t.Log("I request and get |beta| from |w|. In the message, I receive |w|'s wants [alpha]") - t.Log("I don't have alpha, but I keep it on my wantlist.") + t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer.Key().Pretty(), beta.Key().Pretty(), w.peer.Key().Pretty(), alpha.Key().Pretty()) ctx, _ = context.WithTimeout(context.Background(), timeout) - me.exchange.Block(ctx, beta.Key()) - time.Sleep(wait) + if _, err := me.exchange.Block(ctx, beta.Key()); err != nil { + t.Fatal(err) + } - t.Log("Peer |o| announces the availability of |alpha|") + t.Logf("%v announces availability of %v\n", o.peer.Key().Pretty(), alpha.Key().Pretty()) ctx, _ = context.WithTimeout(context.Background(), timeout) + if err := o.blockstore.Put(alpha); err != nil { + t.Fatal(err) + } o.exchange.HasBlock(ctx, alpha) - time.Sleep(wait) - t.Log("I request |alpha| for myself.") + t.Logf("%v requests %v\n", me.peer.Key().Pretty(), alpha.Key().Pretty()) ctx, _ = context.WithTimeout(context.Background(), timeout) - me.exchange.Block(ctx, alpha.Key()) - time.Sleep(wait) + if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil { + t.Fatal(err) + } - t.Log("After receiving |f| from |o|, I send it to the wanting peer |w|") + t.Logf("%v should now have %v\n", w.peer.Key().Pretty(), alpha.Key().Pretty()) block, err := w.blockstore.Get(alpha.Key()) if err != nil { t.Fatal("Should not have received an error") } if block.Key() != alpha.Key() { - t.Error("Expected to receive alpha from me") + t.Fatal("Expected to receive alpha from me") } } @@ -278,6 +289,9 @@ func session(net tn.Network, rs tn.RoutingServer, id peer.ID) instance { strategy: strategy.New(), routing: htc, sender: adapter, + wantlist: WantList{ + data: make(map[util.Key]struct{}), + }, } adapter.SetDelegate(bs) return instance{