Skip to content

Commit

Permalink
blockservice should notify the exchange when caching blocks in GetBlo…
Browse files Browse the repository at this point in the history
…ck(s)

This commit was moved from ipfs/go-blockservice@f2a4f4f
  • Loading branch information
MichaelMure committed Jul 27, 2022
1 parent 2fac27e commit 8ee16bb
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 50 deletions.
33 changes: 22 additions & 11 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,19 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e
ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

var f func() exchange.Fetcher
var f func() exchange.Interface
if s.exchange != nil {
f = s.getExchange
}

return getBlock(ctx, c, s.blockstore, f) // hash security
}

func (s *blockService) getExchange() exchange.Fetcher {
func (s *blockService) getExchange() exchange.Interface {
return s.exchange
}

func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) (blocks.Block, error) {
func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Interface) (blocks.Block, error) {
err := verifcid.ValidateCid(c) // hash security
if err != nil {
return nil, err
Expand All @@ -247,11 +247,15 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget fun
if err != nil {
return nil, err
}
// also write in the blockstore for caching
// also write in the blockstore for caching, inform the exchange that the block is available
err = bs.Put(ctx, blk)
if err != nil {
return nil, err
}
err = f.NotifyNewBlocks(ctx, blk)
if err != nil {
return nil, err
}
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}
Expand All @@ -267,15 +271,15 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block
ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
defer span.End()

var f func() exchange.Fetcher
var f func() exchange.Interface
if s.exchange != nil {
f = s.getExchange
}

return getBlocks(ctx, ks, s.blockstore, f) // hash security
}

func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) <-chan blocks.Block {
func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Interface) <-chan blocks.Block {
out := make(chan blocks.Block)

go func() {
Expand Down Expand Up @@ -351,13 +355,19 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
}
}

// also write in the blockstore for caching
// also write in the blockstore for caching, inform the exchange that the blocks are available
err = bs.PutMany(ctx, batch)
if err != nil {
logger.Errorf("could not write blocks from the network to the blockstore: %s", err)
return
}

err = f.NotifyNewBlocks(ctx, batch...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}

for _, b = range batch {
select {
case out <- b:
Expand Down Expand Up @@ -396,22 +406,23 @@ type Session struct {
lk sync.Mutex
}

func (s *Session) getSession() exchange.Fetcher {
func (s *Session) getSession() exchange.Interface {
s.lk.Lock()
defer s.lk.Unlock()
if s.ses == nil {
s.ses = s.sessEx.NewSession(s.sessCtx)
}

return s.ses
// TODO: don't do that
return s.ses.(exchange.Interface)
}

// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

var f func() exchange.Fetcher
var f func() exchange.Interface
if s.sessEx != nil {
f = s.getSession
}
Expand All @@ -423,7 +434,7 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
defer span.End()

var f func() exchange.Fetcher
var f func() exchange.Interface
if s.sessEx != nil {
f = s.getSession
}
Expand Down
110 changes: 71 additions & 39 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,48 +51,68 @@ func TestExchangeWrite(t *testing.T) {
0,
}
exchbstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(exchbstore)
exch := &notifyCountingExchange{
offline.Exchange(exchbstore),
0,
}
bserv := NewWriteThrough(bstore, exch)
bgen := butil.NewBlockGenerator()

// GetBlock
block := bgen.Next()
err := exchbstore.Put(context.Background(), block)
if err != nil {
t.Fatal(err)
}
got, err := bserv.GetBlock(context.Background(), block.Cid())
if err != nil {
t.Fatal(err)
}
if got.Cid() != block.Cid() {
t.Fatalf("GetBlock returned unexpected block")
}
if bstore.PutCounter != 1 {
t.Fatalf("expected one Put call, have: %d", bstore.PutCounter)
}

// GetBlocks
b1 := bgen.Next()
err = exchbstore.Put(context.Background(), b1)
if err != nil {
t.Fatal(err)
}
b2 := bgen.Next()
err = exchbstore.Put(context.Background(), b2)
if err != nil {
t.Fatal(err)
}
bchan := bserv.GetBlocks(context.Background(), []cid.Cid{b1.Cid(), b2.Cid()})
var gotBlocks []blocks.Block
for b := range bchan {
gotBlocks = append(gotBlocks, b)
}
if len(gotBlocks) != 2 {
t.Fatalf("expected to retrieve 2 blocks, got %d", len(gotBlocks))
}
if bstore.PutCounter != 3 {
t.Fatalf("expected 3 Put call, have: %d", bstore.PutCounter)
for name, fetcher := range map[string]BlockGetter{
"blockservice": bserv,
"session": NewSession(context.Background(), bserv),
} {
t.Run(name, func(t *testing.T) {
// GetBlock
block := bgen.Next()
err := exchbstore.Put(context.Background(), block)
if err != nil {
t.Fatal(err)
}
got, err := fetcher.GetBlock(context.Background(), block.Cid())
if err != nil {
t.Fatal(err)
}
if got.Cid() != block.Cid() {
t.Fatalf("GetBlock returned unexpected block")
}
if bstore.PutCounter != 1 {
t.Fatalf("expected one Put call, have: %d", bstore.PutCounter)
}
if exch.notifyCount != 1 {
t.Fatalf("expected one NotifyNewBlocks call, have: %d", exch.notifyCount)
}

// GetBlocks
b1 := bgen.Next()
err = exchbstore.Put(context.Background(), b1)
if err != nil {
t.Fatal(err)
}
b2 := bgen.Next()
err = exchbstore.Put(context.Background(), b2)
if err != nil {
t.Fatal(err)
}
bchan := fetcher.GetBlocks(context.Background(), []cid.Cid{b1.Cid(), b2.Cid()})
var gotBlocks []blocks.Block
for b := range bchan {
gotBlocks = append(gotBlocks, b)
}
if len(gotBlocks) != 2 {
t.Fatalf("expected to retrieve 2 blocks, got %d", len(gotBlocks))
}
if bstore.PutCounter != 3 {
t.Fatalf("expected 3 Put call, have: %d", bstore.PutCounter)
}
if exch.notifyCount != 3 {
t.Fatalf("expected one NotifyNewBlocks call, have: %d", exch.notifyCount)
}

// reset counts
bstore.PutCounter = 0
exch.notifyCount = 0
})
}
}

Expand Down Expand Up @@ -168,6 +188,18 @@ func (bs *PutCountingBlockstore) PutMany(ctx context.Context, blocks []blocks.Bl
return bs.Blockstore.PutMany(ctx, blocks)
}

var _ exchange.Interface = (*notifyCountingExchange)(nil)

type notifyCountingExchange struct {
exchange.Interface
notifyCount int
}

func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
n.notifyCount += len(blocks)
return n.Interface.NotifyNewBlocks(ctx, blocks...)
}

var _ exchange.SessionExchange = (*fakeSessionExchange)(nil)

type fakeSessionExchange struct {
Expand Down

0 comments on commit 8ee16bb

Please sign in to comment.