Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
refactor(bitswap): add comments and extract testutils.go
Browse files Browse the repository at this point in the history
Add comments to all exported functions, extract the utils for creating instances in testnet.go,
moves integration tests to bitswap_test

BREAKING CHANGE: removed one constant -- rebroadcastDelay -- which I believe was unused
  • Loading branch information
hannahhoward committed May 10, 2019
1 parent 61f1223 commit d2705cd
Show file tree
Hide file tree
Showing 21 changed files with 233 additions and 107 deletions.
38 changes: 20 additions & 18 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package bitswap
package bitswap_test

import (
"context"
Expand All @@ -10,19 +10,21 @@ import (
"time"

"github.com/ipfs/go-bitswap/testutil"
blocks "github.com/ipfs/go-block-format"

bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/session"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

type fetchFunc func(b *testing.B, bs *Bitswap, ks []cid.Cid)
type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)

type distFunc func(b *testing.B, provs []Instance, blocks []blocks.Block)
type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block)

type runStats struct {
Dups uint64
Expand Down Expand Up @@ -146,7 +148,7 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, d
start := time.Now()
net := tn.VirtualNetwork(mockrouting.NewServer(), d)

sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()

bg := blocksutil.NewBlockGenerator()
Expand All @@ -160,7 +162,7 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d
start := time.Now()
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()

instances := sg.Instances(numnodes)
Expand All @@ -169,7 +171,7 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d
runDistribution(b, instances, blocks, df, ff, start)
}

func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {
func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {

numnodes := len(instances)

Expand All @@ -189,7 +191,7 @@ func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block,
b.Fatal(err)
}

nst := fetcher.Exchange.network.Stats()
nst := fetcher.Adapter.Stats()
stats := runStats{
Time: time.Now().Sub(start),
MsgRecd: nst.MessagesRecvd,
Expand All @@ -204,7 +206,7 @@ func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block,
}
}

func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) {
func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(blocks); err != nil {
b.Fatal(err)
Expand All @@ -214,7 +216,7 @@ func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) {

// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) {
func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap1 only works with 2 provs")
}
Expand All @@ -231,7 +233,7 @@ func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) {

// overlap2 gives every even numbered block to the first peer, odd numbered
// blocks to the second. it also gives every third block to both peers
func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) {
func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap2 only works with 2 provs")
}
Expand All @@ -252,7 +254,7 @@ func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) {
}
}

func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) {
func overlap3(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap3 only works with 2 provs")
}
Expand All @@ -277,13 +279,13 @@ func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) {
// onePeerPerBlock picks a random peer to hold each block
// with this layout, we shouldnt actually ever see any duplicate blocks
// but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) {
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks {
provs[rand.Intn(len(provs))].Blockstore().Put(blk)
}
}

func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
func oneAtATime(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background()).(*bssession.Session)
for _, c := range ks {
_, err := ses.GetBlock(context.Background(), c)
Expand All @@ -295,7 +297,7 @@ func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
}

// fetch data in batches, 10 at a time
func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) {
func batchFetchBy10(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
for i := 0; i < len(ks); i += 10 {
out, err := ses.GetBlocks(context.Background(), ks[i:i+10])
Expand All @@ -308,7 +310,7 @@ func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) {
}

// fetch each block at the same time concurrently
func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) {
func fetchAllConcurrent(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())

var wg sync.WaitGroup
Expand All @@ -325,7 +327,7 @@ func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) {
wg.Wait()
}

func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) {
func batchFetchAll(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
out, err := ses.GetBlocks(context.Background(), ks)
if err != nil {
Expand All @@ -336,7 +338,7 @@ func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) {
}

// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible
func unixfsFileFetch(b *testing.B, bs *Bitswap, ks []cid.Cid) {
func unixfsFileFetch(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
_, err := ses.GetBlock(context.Background(), ks[0])
if err != nil {
Expand Down
41 changes: 30 additions & 11 deletions bitswap.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// package bitswap implements the IPFS exchange interface with the BitSwap
// Package bitswap implements the IPFS exchange interface with the BitSwap
// bilateral exchange protocol.
package bitswap

Expand All @@ -24,7 +24,6 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
delay "github.com/ipfs/go-ipfs-delay"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
metrics "github.com/ipfs/go-metrics-interface"
Expand All @@ -43,8 +42,14 @@ const (
)

var (
// ProvideEnabled is a variable that tells Bitswap whether or not
// to handle providing blocks (see experimental provider system)
ProvideEnabled = true

// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
// TODO: Does this need to be this large givent that?
HasBlockBufferSize = 256
provideKeysBufferSize = 2048
provideWorkerMax = 6
Expand All @@ -53,12 +58,9 @@ var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

var rebroadcastDelay = delay.Fixed(time.Minute)

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
// delegate. Runs until context is cancelled or bitswap.Close is called.
func New(parent context.Context, network bsnet.BitSwapNetwork,
bstore blockstore.Blockstore) exchange.Interface {

Expand Down Expand Up @@ -121,7 +123,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
network.SetDelegate(bs)

// Start up bitswaps async worker routines
bs.startWorkers(px, ctx)
bs.startWorkers(ctx, px)

// bind the context and process.
// do it over here to avoid closing before all setup is done.
Expand Down Expand Up @@ -190,6 +192,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, er
return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
}

// WantlistForPeer returns the currently understood list of blocks requested by a
// given peer.
func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
var out []cid.Cid
for _, e := range bs.engine.WantlistForPeer(p) {
Expand All @@ -198,6 +202,8 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
return out
}

// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
return bs.engine.LedgerForPeer(p)
}
Expand Down Expand Up @@ -258,6 +264,8 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
return nil
}

// ReceiveMessage is called by the network interface when a new message is
// received.
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
bs.counterLk.Lock()
bs.counters.messagesRecvd++
Expand Down Expand Up @@ -300,8 +308,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
wg.Wait()
}

var ErrAlreadyHaveBlock = errors.New("already have block")

func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
blkLen := len(b.RawData())
has, err := bs.blockstore.Has(b.Cid())
Expand All @@ -327,28 +333,34 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
}
}

// Connected/Disconnected warns bitswap about peer connections.
// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p)
bs.engine.PeerConnected(p)
}

// Connected/Disconnected warns bitswap about peer connections.
// PeerDisconnected is called by the network interface when a peer
// closes a connection
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}

// ReceiveError is called by the network interface when an error happens
// at the network layer. Currently just logs error.
func (bs *Bitswap) ReceiveError(err error) {
log.Infof("Bitswap ReceiveError: %s", err)
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}

// Close is called to shutdown Bitswap
func (bs *Bitswap) Close() error {
return bs.process.Close()
}

// GetWantlist returns the current local wantlist.
func (bs *Bitswap) GetWantlist() []cid.Cid {
entries := bs.wm.CurrentWants()
out := make([]cid.Cid, 0, len(entries))
Expand All @@ -358,10 +370,17 @@ func (bs *Bitswap) GetWantlist() []cid.Cid {
return out
}

// IsOnline is needed to match go-ipfs-exchange-interface
func (bs *Bitswap) IsOnline() bool {
return true
}

// NewSession generates a new Bitswap session. You should use this, rather
// that calling Bitswap.GetBlocks, any time you intend to do several related
// block requests in a row. The session returned will have it's own GetBlocks
// method, but the session will use the fact that the requests are related to
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx)
}
Loading

0 comments on commit d2705cd

Please sign in to comment.