From 857050ab28044168654867a184daca953f2cbb5b Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 8 Oct 2020 22:49:36 +0200 Subject: [PATCH] Optimize chain and message sync Signed-off-by: Jakub Sztandera --- chain/sub/incoming.go | 17 +++++----- chain/sub/incoming_test.go | 63 ++++++++++++++++++++++++++++++++++++++ chain/sync.go | 6 ++++ 3 files changed, 77 insertions(+), 9 deletions(-) create mode 100644 chain/sub/incoming_test.go diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 07b3343d24c..d51c481d18a 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -172,25 +172,24 @@ func fetchCids( cids []cid.Cid, cb func(int, blocks.Block) error, ) error { - fetchedBlocks := bserv.GetBlocks(ctx, cids) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() cidIndex := make(map[cid.Cid]int) for i, c := range cids { cidIndex[c] = i } + if len(cids) != len(cidIndex) { + return fmt.Errorf("duplicate CIDs in fetchCids input") + } + + fetchedBlocks := bserv.GetBlocks(ctx, cids) for i := 0; i < len(cids); i++ { select { case block, ok := <-fetchedBlocks: if !ok { - // Closed channel, no more blocks fetched, check if we have all - // of the CIDs requested. - // FIXME: Review this check. We don't call the callback on the - // last index? - if i == len(cids)-1 { - break - } - return fmt.Errorf("failed to fetch all messages") } diff --git a/chain/sub/incoming_test.go b/chain/sub/incoming_test.go new file mode 100644 index 00000000000..21543920919 --- /dev/null +++ b/chain/sub/incoming_test.go @@ -0,0 +1,63 @@ +package sub + +import ( + "context" + "testing" + + address "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" +) + +type getter struct { + msgs []*types.Message +} + +func (g *getter) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { panic("NYI") } + +func (g *getter) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { + ch := make(chan blocks.Block, len(g.msgs)) + for _, m := range g.msgs { + by, err := m.Serialize() + if err != nil { + panic(err) + } + b, err := blocks.NewBlockWithCid(by, m.Cid()) + if err != nil { + panic(err) + } + ch <- b + } + close(ch) + return ch +} + +func TestFetchCidsWithDedup(t *testing.T) { + msgs := []*types.Message{} + for i := 0; i < 10; i++ { + msgs = append(msgs, &types.Message{ + From: address.TestAddress, + To: address.TestAddress, + + Nonce: uint64(i), + }) + } + cids := []cid.Cid{} + for _, m := range msgs { + cids = append(cids, m.Cid()) + } + g := &getter{msgs} + + // the cids have a duplicate + res, err := FetchMessagesByCids(context.TODO(), g, append(cids, cids[0])) + + t.Logf("err: %+v", err) + t.Logf("res: %+v", res) + if err == nil { + t.Errorf("there should be an error") + } + if err == nil && (res[0] == nil || res[len(res)-1] == nil) { + t.Fatalf("there is a nil message: first %p, last %p", res[0], res[len(res)-1]) + } +} diff --git a/chain/sync.go b/chain/sync.go index 240d1edef36..126c7300ea8 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -217,6 +217,12 @@ func (syncer *Syncer) Stop() { // This should be called when connecting to new peers, and additionally // when receiving new blocks from the network func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool { + defer func() { + if err := recover(); err != nil { + log.Errorf("panic in InformNewHead: ", err) + } + }() + ctx := context.Background() if fts == nil { log.Errorf("got nil tipset in InformNewHead")