Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
blockservice should notify the exchange when caching blocks in GetBlo…
Browse files Browse the repository at this point in the history
…ck(s)
  • Loading branch information
MichaelMure committed Jul 27, 2022
1 parent 4a60416 commit f2a4f4f
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 50 deletions.
33 changes: 22 additions & 11 deletions blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,19 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e
ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

var f func() exchange.Fetcher
var f func() exchange.Interface
if s.exchange != nil {
f = s.getExchange
}

return getBlock(ctx, c, s.blockstore, f) // hash security
}

func (s *blockService) getExchange() exchange.Fetcher {
func (s *blockService) getExchange() exchange.Interface {
return s.exchange
}

func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) (blocks.Block, error) {
func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Interface) (blocks.Block, error) {
err := verifcid.ValidateCid(c) // hash security
if err != nil {
return nil, err
Expand All @@ -247,11 +247,15 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget fun
if err != nil {
return nil, err
}
// also write in the blockstore for caching
// also write in the blockstore for caching, inform the exchange that the block is available
err = bs.Put(ctx, blk)
if err != nil {
return nil, err
}
err = f.NotifyNewBlocks(ctx, blk)
if err != nil {
return nil, err
}
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}
Expand All @@ -267,15 +271,15 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block
ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
defer span.End()

var f func() exchange.Fetcher
var f func() exchange.Interface
if s.exchange != nil {
f = s.getExchange
}

return getBlocks(ctx, ks, s.blockstore, f) // hash security
}

func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) <-chan blocks.Block {
func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Interface) <-chan blocks.Block {
out := make(chan blocks.Block)

go func() {
Expand Down Expand Up @@ -351,13 +355,19 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
}
}

// also write in the blockstore for caching
// also write in the blockstore for caching, inform the exchange that the blocks are available
err = bs.PutMany(ctx, batch)
if err != nil {
logger.Errorf("could not write blocks from the network to the blockstore: %s", err)
return
}

err = f.NotifyNewBlocks(ctx, batch...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}

for _, b = range batch {
select {
case out <- b:
Expand Down Expand Up @@ -396,22 +406,23 @@ type Session struct {
lk sync.Mutex
}

func (s *Session) getSession() exchange.Fetcher {
func (s *Session) getSession() exchange.Interface {
s.lk.Lock()
defer s.lk.Unlock()
if s.ses == nil {
s.ses = s.sessEx.NewSession(s.sessCtx)
}

return s.ses
// TODO: don't do that
return s.ses.(exchange.Interface)
}

// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

var f func() exchange.Fetcher
var f func() exchange.Interface
if s.sessEx != nil {
f = s.getSession
}
Expand All @@ -423,7 +434,7 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
defer span.End()

var f func() exchange.Fetcher
var f func() exchange.Interface
if s.sessEx != nil {
f = s.getSession
}
Expand Down
110 changes: 71 additions & 39 deletions blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,48 +51,68 @@ func TestExchangeWrite(t *testing.T) {
0,
}
exchbstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(exchbstore)
exch := &notifyCountingExchange{
offline.Exchange(exchbstore),
0,
}
bserv := NewWriteThrough(bstore, exch)
bgen := butil.NewBlockGenerator()

// GetBlock
block := bgen.Next()
err := exchbstore.Put(context.Background(), block)
if err != nil {
t.Fatal(err)
}
got, err := bserv.GetBlock(context.Background(), block.Cid())
if err != nil {
t.Fatal(err)
}
if got.Cid() != block.Cid() {
t.Fatalf("GetBlock returned unexpected block")
}
if bstore.PutCounter != 1 {
t.Fatalf("expected one Put call, have: %d", bstore.PutCounter)
}

// GetBlocks
b1 := bgen.Next()
err = exchbstore.Put(context.Background(), b1)
if err != nil {
t.Fatal(err)
}
b2 := bgen.Next()
err = exchbstore.Put(context.Background(), b2)
if err != nil {
t.Fatal(err)
}
bchan := bserv.GetBlocks(context.Background(), []cid.Cid{b1.Cid(), b2.Cid()})
var gotBlocks []blocks.Block
for b := range bchan {
gotBlocks = append(gotBlocks, b)
}
if len(gotBlocks) != 2 {
t.Fatalf("expected to retrieve 2 blocks, got %d", len(gotBlocks))
}
if bstore.PutCounter != 3 {
t.Fatalf("expected 3 Put call, have: %d", bstore.PutCounter)
for name, fetcher := range map[string]BlockGetter{
"blockservice": bserv,
"session": NewSession(context.Background(), bserv),
} {
t.Run(name, func(t *testing.T) {
// GetBlock
block := bgen.Next()
err := exchbstore.Put(context.Background(), block)
if err != nil {
t.Fatal(err)
}
got, err := fetcher.GetBlock(context.Background(), block.Cid())
if err != nil {
t.Fatal(err)
}
if got.Cid() != block.Cid() {
t.Fatalf("GetBlock returned unexpected block")
}
if bstore.PutCounter != 1 {
t.Fatalf("expected one Put call, have: %d", bstore.PutCounter)
}
if exch.notifyCount != 1 {
t.Fatalf("expected one NotifyNewBlocks call, have: %d", exch.notifyCount)
}

// GetBlocks
b1 := bgen.Next()
err = exchbstore.Put(context.Background(), b1)
if err != nil {
t.Fatal(err)
}
b2 := bgen.Next()
err = exchbstore.Put(context.Background(), b2)
if err != nil {
t.Fatal(err)
}
bchan := fetcher.GetBlocks(context.Background(), []cid.Cid{b1.Cid(), b2.Cid()})
var gotBlocks []blocks.Block
for b := range bchan {
gotBlocks = append(gotBlocks, b)
}
if len(gotBlocks) != 2 {
t.Fatalf("expected to retrieve 2 blocks, got %d", len(gotBlocks))
}
if bstore.PutCounter != 3 {
t.Fatalf("expected 3 Put call, have: %d", bstore.PutCounter)
}
if exch.notifyCount != 3 {
t.Fatalf("expected one NotifyNewBlocks call, have: %d", exch.notifyCount)
}

// reset counts
bstore.PutCounter = 0
exch.notifyCount = 0
})
}
}

Expand Down Expand Up @@ -168,6 +188,18 @@ func (bs *PutCountingBlockstore) PutMany(ctx context.Context, blocks []blocks.Bl
return bs.Blockstore.PutMany(ctx, blocks)
}

var _ exchange.Interface = (*notifyCountingExchange)(nil)

type notifyCountingExchange struct {
exchange.Interface
notifyCount int
}

func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
n.notifyCount += len(blocks)
return n.Interface.NotifyNewBlocks(ctx, blocks...)
}

var _ exchange.SessionExchange = (*fakeSessionExchange)(nil)

type fakeSessionExchange struct {
Expand Down

0 comments on commit f2a4f4f

Please sign in to comment.