From 8ee16bb46719f767ee357b1d7bd66207d5225bb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Wed, 27 Jul 2022 19:21:20 +0200 Subject: [PATCH] blockservice should notify the exchange when caching blocks in GetBlock(s) This commit was moved from ipfs/go-blockservice@f2a4f4f21dfc5f9ff7d4abb3c1f96368fa707600 --- blockservice/blockservice.go | 33 ++++++--- blockservice/blockservice_test.go | 110 +++++++++++++++++++----------- 2 files changed, 93 insertions(+), 50 deletions(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 134de222b..015f81a06 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -214,7 +214,7 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - var f func() exchange.Fetcher + var f func() exchange.Interface if s.exchange != nil { f = s.getExchange } @@ -222,11 +222,11 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e return getBlock(ctx, c, s.blockstore, f) // hash security } -func (s *blockService) getExchange() exchange.Fetcher { +func (s *blockService) getExchange() exchange.Interface { return s.exchange } -func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) (blocks.Block, error) { +func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Interface) (blocks.Block, error) { err := verifcid.ValidateCid(c) // hash security if err != nil { return nil, err @@ -247,11 +247,15 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget fun if err != nil { return nil, err } - // also write in the blockstore for caching + // also write in the blockstore for caching, inform the exchange that the block is available err = bs.Put(ctx, blk) if err != nil { return nil, err } + err = f.NotifyNewBlocks(ctx, blk) + if err != nil { + return nil, err + } logger.Debugf("BlockService.BlockFetched %s", c) return blk, nil } @@ -267,7 +271,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks") defer span.End() - var f func() exchange.Fetcher + var f func() exchange.Interface if s.exchange != nil { f = s.getExchange } @@ -275,7 +279,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block return getBlocks(ctx, ks, s.blockstore, f) // hash security } -func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) <-chan blocks.Block { +func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Interface) <-chan blocks.Block { out := make(chan blocks.Block) go func() { @@ -351,13 +355,19 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget } } - // also write in the blockstore for caching + // also write in the blockstore for caching, inform the exchange that the blocks are available err = bs.PutMany(ctx, batch) if err != nil { logger.Errorf("could not write blocks from the network to the blockstore: %s", err) return } + err = f.NotifyNewBlocks(ctx, batch...) + if err != nil { + logger.Errorf("could not tell the exchange about new blocks: %s", err) + return + } + for _, b = range batch { select { case out <- b: @@ -396,14 +406,15 @@ type Session struct { lk sync.Mutex } -func (s *Session) getSession() exchange.Fetcher { +func (s *Session) getSession() exchange.Interface { s.lk.Lock() defer s.lk.Unlock() if s.ses == nil { s.ses = s.sessEx.NewSession(s.sessCtx) } - return s.ses + // TODO: don't do that + return s.ses.(exchange.Interface) } // GetBlock gets a block in the context of a request session @@ -411,7 +422,7 @@ func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - var f func() exchange.Fetcher + var f func() exchange.Interface if s.sessEx != nil { f = s.getSession } @@ -423,7 +434,7 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo ctx, span := internal.StartSpan(ctx, "Session.GetBlocks") defer span.End() - var f func() exchange.Fetcher + var f func() exchange.Interface if s.sessEx != nil { f = s.getSession } diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 5753d3a9d..846ae7169 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -51,48 +51,68 @@ func TestExchangeWrite(t *testing.T) { 0, } exchbstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - exch := offline.Exchange(exchbstore) + exch := ¬ifyCountingExchange{ + offline.Exchange(exchbstore), + 0, + } bserv := NewWriteThrough(bstore, exch) bgen := butil.NewBlockGenerator() - // GetBlock - block := bgen.Next() - err := exchbstore.Put(context.Background(), block) - if err != nil { - t.Fatal(err) - } - got, err := bserv.GetBlock(context.Background(), block.Cid()) - if err != nil { - t.Fatal(err) - } - if got.Cid() != block.Cid() { - t.Fatalf("GetBlock returned unexpected block") - } - if bstore.PutCounter != 1 { - t.Fatalf("expected one Put call, have: %d", bstore.PutCounter) - } - - // GetBlocks - b1 := bgen.Next() - err = exchbstore.Put(context.Background(), b1) - if err != nil { - t.Fatal(err) - } - b2 := bgen.Next() - err = exchbstore.Put(context.Background(), b2) - if err != nil { - t.Fatal(err) - } - bchan := bserv.GetBlocks(context.Background(), []cid.Cid{b1.Cid(), b2.Cid()}) - var gotBlocks []blocks.Block - for b := range bchan { - gotBlocks = append(gotBlocks, b) - } - if len(gotBlocks) != 2 { - t.Fatalf("expected to retrieve 2 blocks, got %d", len(gotBlocks)) - } - if bstore.PutCounter != 3 { - t.Fatalf("expected 3 Put call, have: %d", bstore.PutCounter) + for name, fetcher := range map[string]BlockGetter{ + "blockservice": bserv, + "session": NewSession(context.Background(), bserv), + } { + t.Run(name, func(t *testing.T) { + // GetBlock + block := bgen.Next() + err := exchbstore.Put(context.Background(), block) + if err != nil { + t.Fatal(err) + } + got, err := fetcher.GetBlock(context.Background(), block.Cid()) + if err != nil { + t.Fatal(err) + } + if got.Cid() != block.Cid() { + t.Fatalf("GetBlock returned unexpected block") + } + if bstore.PutCounter != 1 { + t.Fatalf("expected one Put call, have: %d", bstore.PutCounter) + } + if exch.notifyCount != 1 { + t.Fatalf("expected one NotifyNewBlocks call, have: %d", exch.notifyCount) + } + + // GetBlocks + b1 := bgen.Next() + err = exchbstore.Put(context.Background(), b1) + if err != nil { + t.Fatal(err) + } + b2 := bgen.Next() + err = exchbstore.Put(context.Background(), b2) + if err != nil { + t.Fatal(err) + } + bchan := fetcher.GetBlocks(context.Background(), []cid.Cid{b1.Cid(), b2.Cid()}) + var gotBlocks []blocks.Block + for b := range bchan { + gotBlocks = append(gotBlocks, b) + } + if len(gotBlocks) != 2 { + t.Fatalf("expected to retrieve 2 blocks, got %d", len(gotBlocks)) + } + if bstore.PutCounter != 3 { + t.Fatalf("expected 3 Put call, have: %d", bstore.PutCounter) + } + if exch.notifyCount != 3 { + t.Fatalf("expected one NotifyNewBlocks call, have: %d", exch.notifyCount) + } + + // reset counts + bstore.PutCounter = 0 + exch.notifyCount = 0 + }) } } @@ -168,6 +188,18 @@ func (bs *PutCountingBlockstore) PutMany(ctx context.Context, blocks []blocks.Bl return bs.Blockstore.PutMany(ctx, blocks) } +var _ exchange.Interface = (*notifyCountingExchange)(nil) + +type notifyCountingExchange struct { + exchange.Interface + notifyCount int +} + +func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + n.notifyCount += len(blocks) + return n.Interface.NotifyNewBlocks(ctx, blocks...) +} + var _ exchange.SessionExchange = (*fakeSessionExchange)(nil) type fakeSessionExchange struct {