Skip to content

Commit

Permalink
feat: traversal.Config#OnBlockIn for ongoing block tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Sep 19, 2023
1 parent 444036b commit cf35f5a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
11 changes: 9 additions & 2 deletions traversal/traversal.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Config struct {
ExpectDuplicatesIn bool // Handles whether the incoming stream has duplicates
WriteDuplicatesOut bool // Handles whether duplicates should be written a second time as blocks
MaxBlocks uint64 // set a budget for the traversal
OnBlockIn func(uint64) // a callback whenever a block is read the incoming source, recording the number of bytes in the block data
}

// TraversalResult provides the results of a successful traversal. Byte counting
Expand Down Expand Up @@ -151,7 +152,7 @@ func (cfg Config) VerifyBlockStream(
bs BlockStream,
lsys linking.LinkSystem,
) (TraversalResult, error) {
bt := &writeTracker{}
bt := &writeTracker{onBlockIn: cfg.OnBlockIn}
lsys.TrustedStorage = true // we can rely on the CAR decoder to check CID integrity
unixfsnode.AddUnixFSReificationToLinkSystem(&lsys)
lsys.StorageReadOpener = cfg.nextBlockReadOpener(ctx, bs, bt, lsys)
Expand Down Expand Up @@ -362,6 +363,8 @@ func readNextBlock(ctx context.Context, bs BlockStream, expected cid.Cid) ([]byt
}

type writeTracker struct {
onBlockIn func(uint64)

blocksIn uint64
blocksOut uint64
bytesIn uint64
Expand All @@ -370,7 +373,11 @@ type writeTracker struct {

func (bt *writeTracker) recordBlockIn(data []byte) {
bt.blocksIn++
bt.bytesIn += uint64(len(data))
bc := uint64(len(data))
bt.bytesIn += bc
if bt.onBlockIn != nil {
bt.onBlockIn(bc)
}
}

func (bt *writeTracker) recordBlockOut(data []byte) {
Expand Down
12 changes: 11 additions & 1 deletion traversal/traversal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,15 @@ func TestVerifyCar(t *testing.T) {
}

carStream, errorCh := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.expectErr != "", testCase.incomingHasDups, testCase.streamErr, testCase.carAsRawBlocks, testCase.carAsCIDv0)
result, err := testCase.cfg.VerifyCar(ctx, carStream, lsys)
cfg := testCase.cfg
var blockCount, byteCount uint64
if cfg.OnBlockIn == nil {
cfg.OnBlockIn = func(bytes uint64) {
blockCount++
byteCount += bytes
}
}
result, err := cfg.VerifyCar(ctx, carStream, lsys)

// read the rest of data
io.ReadAll(carStream)
Expand All @@ -825,6 +833,7 @@ func TestVerifyCar(t *testing.T) {
} else {
req.Equal(count(testCase.blocks), result.BlocksIn)
}
req.Equal(result.BlocksIn, blockCount)
if testCase.expectBlocksOut > 0 {
req.Equal(testCase.expectBlocksOut, result.BlocksOut)
} else {
Expand All @@ -835,6 +844,7 @@ func TestVerifyCar(t *testing.T) {
} else {
req.Equal(sizeOf(testCase.blocks), result.BytesIn)
}
req.Equal(result.BytesIn, byteCount)
if testCase.expectBytesOut > 0 {
req.Equal(testCase.expectBytesOut, result.BytesOut)
} else {
Expand Down

0 comments on commit cf35f5a

Please sign in to comment.