Skip to content

Commit

Permalink
write blocks retrieved from the exchange to the blockstore
Browse files Browse the repository at this point in the history
This follows the change in bitswap where that responsibility was removed.


This commit was moved from ipfs/go-blockservice@4a60416
  • Loading branch information
MichaelMure committed Jul 27, 2022
1 parent 2ad1dc9 commit 2fac27e
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 18 deletions.
60 changes: 47 additions & 13 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
"go.opentelemetry.io/otel/trace"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice/internal"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-verifcid"

"github.com/ipfs/go-blockservice/internal"
)

var logger = logging.Logger("blockservice")
Expand Down Expand Up @@ -84,7 +85,7 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// NewWriteThrough creates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
if rem == nil {
Expand Down Expand Up @@ -131,7 +132,6 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "blockService.AddBlock")
defer span.End()
Expand All @@ -155,8 +155,8 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
logger.Debugf("BlockService.BlockAdded %s", c)

if s.exchange != nil {
if err := s.exchange.HasBlock(ctx, o); err != nil {
logger.Errorf("HasBlock: %s", err.Error())
if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
}
}

Expand Down Expand Up @@ -200,11 +200,9 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
}

if s.exchange != nil {
for _, o := range toput {
logger.Debugf("BlockService.BlockAdded %s", o.Cid())
if err := s.exchange.HasBlock(ctx, o); err != nil {
logger.Errorf("HasBlock: %s", err.Error())
}
logger.Debugf("BlockService.BlockAdded %d blocks", len(toput))
if err := s.exchange.NotifyNewBlocks(ctx, toput...); err != nil {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
}
}
return nil
Expand Down Expand Up @@ -249,6 +247,11 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget fun
if err != nil {
return nil, err
}
// also write in the blockstore for caching
err = bs.Put(ctx, blk)
if err != nil {
return nil, err
}
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}
Expand Down Expand Up @@ -325,12 +328,43 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
}

for b := range rblocks {
// batch available blocks together
batch := make([]blocks.Block, 0, 8)
batch = append(batch, b)
logger.Debugf("BlockService.BlockFetched %s", b.Cid())
select {
case out <- b:
case <-ctx.Done():

batchLoop:
for {
select {
case moreBlock, ok := <-rblocks:
if !ok {
// rblock has been closed, we set it to nil to avoid pulling zero values
rblocks = nil
} else {
logger.Debugf("BlockService.BlockFetched %s", moreBlock.Cid())
batch = append(batch, moreBlock)
}
case <-ctx.Done():
return
default:
break batchLoop
}
}

// also write in the blockstore for caching
err = bs.PutMany(ctx, batch)
if err != nil {
logger.Errorf("could not write blocks from the network to the blockstore: %s", err)
return
}

for _, b = range batch {
select {
case out <- b:
case <-ctx.Done():
return
}
}
}
}()
return out
Expand Down
71 changes: 66 additions & 5 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand All @@ -19,8 +20,8 @@ func TestWriteThroughWorks(t *testing.T) {
blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())),
0,
}
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(bstore2)
exchbstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(exchbstore)
bserv := NewWriteThrough(bstore, exch)
bgen := butil.NewBlockGenerator()

Expand All @@ -44,6 +45,57 @@ func TestWriteThroughWorks(t *testing.T) {
}
}

func TestExchangeWrite(t *testing.T) {
bstore := &PutCountingBlockstore{
blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())),
0,
}
exchbstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(exchbstore)
bserv := NewWriteThrough(bstore, exch)
bgen := butil.NewBlockGenerator()

// GetBlock
block := bgen.Next()
err := exchbstore.Put(context.Background(), block)
if err != nil {
t.Fatal(err)
}
got, err := bserv.GetBlock(context.Background(), block.Cid())
if err != nil {
t.Fatal(err)
}
if got.Cid() != block.Cid() {
t.Fatalf("GetBlock returned unexpected block")
}
if bstore.PutCounter != 1 {
t.Fatalf("expected one Put call, have: %d", bstore.PutCounter)
}

// GetBlocks
b1 := bgen.Next()
err = exchbstore.Put(context.Background(), b1)
if err != nil {
t.Fatal(err)
}
b2 := bgen.Next()
err = exchbstore.Put(context.Background(), b2)
if err != nil {
t.Fatal(err)
}
bchan := bserv.GetBlocks(context.Background(), []cid.Cid{b1.Cid(), b2.Cid()})
var gotBlocks []blocks.Block
for b := range bchan {
gotBlocks = append(gotBlocks, b)
}
if len(gotBlocks) != 2 {
t.Fatalf("expected to retrieve 2 blocks, got %d", len(gotBlocks))
}
if bstore.PutCounter != 3 {
t.Fatalf("expected 3 Put call, have: %d", bstore.PutCounter)
}
}

func TestLazySessionInitialization(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -53,8 +105,8 @@ func TestLazySessionInitialization(t *testing.T) {
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bstore3 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
session := offline.Exchange(bstore2)
exchange := offline.Exchange(bstore3)
sessionExch := &fakeSessionExchange{Interface: exchange, session: session}
exch := offline.Exchange(bstore3)
sessionExch := &fakeSessionExchange{Interface: exch, session: session}
bservSessEx := NewWriteThrough(bstore, sessionExch)
bgen := butil.NewBlockGenerator()

Expand All @@ -64,7 +116,11 @@ func TestLazySessionInitialization(t *testing.T) {
t.Fatal(err)
}
block2 := bgen.Next()
err = session.HasBlock(ctx, block2)
err = bstore2.Put(ctx, block2)
if err != nil {
t.Fatal(err)
}
err = session.NotifyNewBlocks(ctx, block2)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -107,6 +163,11 @@ func (bs *PutCountingBlockstore) Put(ctx context.Context, block blocks.Block) er
return bs.Blockstore.Put(ctx, block)
}

func (bs *PutCountingBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
bs.PutCounter += len(blocks)
return bs.Blockstore.PutMany(ctx, blocks)
}

var _ exchange.SessionExchange = (*fakeSessionExchange)(nil)

type fakeSessionExchange struct {
Expand Down

0 comments on commit 2fac27e

Please sign in to comment.