diff --git a/v2/block_reader.go b/v2/block_reader.go index 252885c3..0c38412f 100644 --- a/v2/block_reader.go +++ b/v2/block_reader.go @@ -9,6 +9,7 @@ import ( "github.com/ipld/go-car/v2/internal/carv1" "github.com/ipld/go-car/v2/internal/carv1/util" internalio "github.com/ipld/go-car/v2/internal/io" + "github.com/multiformats/go-varint" ) // BlockReader facilitates iteration over CAR blocks for both CARv1 and CARv2. @@ -20,8 +21,10 @@ type BlockReader struct { Roots []cid.Cid // Used internally only, by BlockReader.Next during iteration over blocks. - r io.Reader - opts Options + r io.Reader + offset uint64 + readerSize int64 + opts Options } // NewBlockReader instantiates a new BlockReader facilitating iteration over blocks in CARv1 or @@ -52,6 +55,8 @@ func NewBlockReader(r io.Reader, opts ...Option) (*BlockReader, error) { // Simply populate br.Roots and br.r without modifying r. br.Roots = pragmaOrV1Header.Roots br.r = r + br.readerSize = -1 + br.offset, _ = carv1.HeaderSize(pragmaOrV1Header) case 2: // If the version is 2: // 1. Read CARv2 specific header to locate the inner CARv1 data payload offset and size. @@ -75,6 +80,8 @@ func NewBlockReader(r io.Reader, opts ...Option) (*BlockReader, error) { if _, err := rs.Seek(int64(v2h.DataOffset)-PragmaSize-HeaderSize, io.SeekCurrent); err != nil { return nil, err } + br.offset = uint64(v2h.DataOffset) + br.readerSize = int64(v2h.DataOffset + v2h.DataSize) // Set br.r to a LimitReader reading from r limited to dataSize. br.r = io.LimitReader(r, int64(v2h.DataSize)) @@ -122,5 +129,91 @@ func (br *BlockReader) Next() (blocks.Block, error) { return nil, fmt.Errorf("mismatch in content integrity, expected: %s, got: %s", c, hashed) } + ss := uint64(c.ByteLen()) + uint64(len(data)) + br.offset += uint64(varint.UvarintSize(ss)) + ss return blocks.NewBlockWithCid(data, c) } + +type BlockMetadata struct { + cid.Cid + Offset uint64 + Size uint64 +} + +// SkipNext jumps over the next block, returning metadata about what it is (the CID, offset, and size). +// Like Next it will return an io.EOF once it has reached the end. +// +// If the underlying reader used by the BlockReader is actually a ReadSeeker, this method will attempt to +// seek over the underlying data rather than reading it into memory. +func (br *BlockReader) SkipNext() (*BlockMetadata, error) { + sctSize, err := util.LdReadSize(br.r, br.opts.ZeroLengthSectionAsEOF, br.opts.MaxAllowedSectionSize) + if err != nil { + return nil, err + } + + if sctSize == 0 { + _, _, err := cid.CidFromBytes([]byte{}) + return nil, err + } + + cidSize, c, err := cid.CidFromReader(io.LimitReader(br.r, int64(sctSize))) + if err != nil { + return nil, err + } + + blkSize := sctSize - uint64(cidSize) + if brs, ok := br.r.(io.ReadSeeker); ok { + // carv1 and we don't know the size, so work it out and cache it + if br.readerSize == -1 { + cur, err := brs.Seek(0, io.SeekCurrent) + if err != nil { + return nil, err + } + end, err := brs.Seek(0, io.SeekEnd) + if err != nil { + return nil, err + } + br.readerSize = end + if _, err = brs.Seek(cur, io.SeekStart); err != nil { + return nil, err + } + } + // seek. + finalOffset, err := brs.Seek(int64(blkSize), io.SeekCurrent) + if err != nil { + return nil, err + } + if finalOffset != int64(br.offset)+int64(sctSize)+int64(varint.UvarintSize(sctSize)) { + return nil, fmt.Errorf("unexpected length") + } + if finalOffset > br.readerSize { + return nil, io.ErrUnexpectedEOF + } + br.offset = uint64(finalOffset) + return &BlockMetadata{ + c, + uint64(finalOffset) - sctSize - uint64(varint.UvarintSize(sctSize)), + blkSize, + }, nil + } + + // read to end. + readCnt, err := io.CopyN(io.Discard, br.r, int64(blkSize)) + if err != nil { + if err == io.EOF { + return nil, io.ErrUnexpectedEOF + } + return nil, err + } + if readCnt != int64(blkSize) { + return nil, fmt.Errorf("unexpected length") + } + origOffset := br.offset + br.offset += uint64(varint.UvarintSize(sctSize)) + sctSize + + return &BlockMetadata{ + c, + origOffset, + blkSize, + }, nil +} diff --git a/v2/block_reader_test.go b/v2/block_reader_test.go index 384a6ed8..b5958c7f 100644 --- a/v2/block_reader_test.go +++ b/v2/block_reader_test.go @@ -3,6 +3,7 @@ package car_test import ( "bytes" "encoding/hex" + "fmt" "io" "os" "testing" @@ -106,6 +107,41 @@ func TestBlockReader_WithCarV1Consistency(t *testing.T) { } } }) + t.Run(tt.name+"-skipping-reads", func(t *testing.T) { + r := requireReaderFromPath(t, tt.path) + subject, err := carv2.NewBlockReader(r, carv2.ZeroLengthSectionAsEOF(tt.zerLenAsEOF)) + require.NoError(t, err) + + require.Equal(t, tt.wantVersion, subject.Version) + + var wantReader *carv1.CarReader + switch tt.wantVersion { + case 1: + wantReader = requireNewCarV1ReaderFromV1File(t, tt.path, tt.zerLenAsEOF) + case 2: + wantReader = requireNewCarV1ReaderFromV2File(t, tt.path, tt.zerLenAsEOF) + default: + require.Failf(t, "invalid test-case", "unknown wantVersion %v", tt.wantVersion) + } + require.Equal(t, wantReader.Header.Roots, subject.Roots) + + for { + gotBlock, gotErr := subject.SkipNext() + wantBlock, wantErr := wantReader.Next() + if wantErr != nil && gotErr == nil { + fmt.Printf("want was %+v\n", wantReader) + fmt.Printf("want was err, got was %+v / %d\n", gotBlock, gotBlock.Size) + } + require.Equal(t, wantErr, gotErr) + if gotErr == io.EOF { + break + } + if gotErr == nil { + require.Equal(t, wantBlock.Cid(), gotBlock.Cid) + require.Equal(t, uint64(len(wantBlock.RawData())), gotBlock.Size) + } + } + }) } } diff --git a/v2/internal/carv1/util/util.go b/v2/internal/carv1/util/util.go index 7963812e..00cd885b 100644 --- a/v2/internal/carv1/util/util.go +++ b/v2/internal/carv1/util/util.go @@ -65,20 +65,28 @@ func LdSize(d ...[]byte) uint64 { return sum + uint64(s) } -func LdRead(r io.Reader, zeroLenAsEOF bool, maxReadBytes uint64) ([]byte, error) { +func LdReadSize(r io.Reader, zeroLenAsEOF bool, maxReadBytes uint64) (uint64, error) { l, err := varint.ReadUvarint(internalio.ToByteReader(r)) if err != nil { // If the length of bytes read is non-zero when the error is EOF then signal an unclean EOF. if l > 0 && err == io.EOF { - return nil, io.ErrUnexpectedEOF + return 0, io.ErrUnexpectedEOF } - return nil, err + return 0, err } else if l == 0 && zeroLenAsEOF { - return nil, io.EOF + return 0, io.EOF } if l > maxReadBytes { // Don't OOM - return nil, ErrSectionTooLarge + return 0, ErrSectionTooLarge + } + return l, nil +} + +func LdRead(r io.Reader, zeroLenAsEOF bool, maxReadBytes uint64) ([]byte, error) { + l, err := LdReadSize(r, zeroLenAsEOF, maxReadBytes) + if err != nil { + return nil, err } buf := make([]byte, l)