Skip to content

Commit

Permalink
add a SkipNext method on block reader (#338)
Browse files Browse the repository at this point in the history
* add a `SkipNext` method on block reader

Co-authored-by: Rod Vagg <rod@vagg.org>
  • Loading branch information
willscott and rvagg authored Oct 20, 2022
1 parent 02d658f commit 50e029b
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 7 deletions.
97 changes: 95 additions & 2 deletions v2/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipld/go-car/v2/internal/carv1"
"github.com/ipld/go-car/v2/internal/carv1/util"
internalio "github.com/ipld/go-car/v2/internal/io"
"github.com/multiformats/go-varint"
)

// BlockReader facilitates iteration over CAR blocks for both CARv1 and CARv2.
Expand All @@ -20,8 +21,10 @@ type BlockReader struct {
Roots []cid.Cid

// Used internally only, by BlockReader.Next during iteration over blocks.
r io.Reader
opts Options
r io.Reader
offset uint64
readerSize int64
opts Options
}

// NewBlockReader instantiates a new BlockReader facilitating iteration over blocks in CARv1 or
Expand Down Expand Up @@ -52,6 +55,8 @@ func NewBlockReader(r io.Reader, opts ...Option) (*BlockReader, error) {
// Simply populate br.Roots and br.r without modifying r.
br.Roots = pragmaOrV1Header.Roots
br.r = r
br.readerSize = -1
br.offset, _ = carv1.HeaderSize(pragmaOrV1Header)
case 2:
// If the version is 2:
// 1. Read CARv2 specific header to locate the inner CARv1 data payload offset and size.
Expand All @@ -75,6 +80,8 @@ func NewBlockReader(r io.Reader, opts ...Option) (*BlockReader, error) {
if _, err := rs.Seek(int64(v2h.DataOffset)-PragmaSize-HeaderSize, io.SeekCurrent); err != nil {
return nil, err
}
br.offset = uint64(v2h.DataOffset)
br.readerSize = int64(v2h.DataOffset + v2h.DataSize)

// Set br.r to a LimitReader reading from r limited to dataSize.
br.r = io.LimitReader(r, int64(v2h.DataSize))
Expand Down Expand Up @@ -122,5 +129,91 @@ func (br *BlockReader) Next() (blocks.Block, error) {
return nil, fmt.Errorf("mismatch in content integrity, expected: %s, got: %s", c, hashed)
}

ss := uint64(c.ByteLen()) + uint64(len(data))
br.offset += uint64(varint.UvarintSize(ss)) + ss
return blocks.NewBlockWithCid(data, c)
}

type BlockMetadata struct {
cid.Cid
Offset uint64
Size uint64
}

// SkipNext jumps over the next block, returning metadata about what it is (the CID, offset, and size).
// Like Next it will return an io.EOF once it has reached the end.
//
// If the underlying reader used by the BlockReader is actually a ReadSeeker, this method will attempt to
// seek over the underlying data rather than reading it into memory.
func (br *BlockReader) SkipNext() (*BlockMetadata, error) {
sctSize, err := util.LdReadSize(br.r, br.opts.ZeroLengthSectionAsEOF, br.opts.MaxAllowedSectionSize)
if err != nil {
return nil, err
}

if sctSize == 0 {
_, _, err := cid.CidFromBytes([]byte{})
return nil, err
}

cidSize, c, err := cid.CidFromReader(io.LimitReader(br.r, int64(sctSize)))
if err != nil {
return nil, err
}

blkSize := sctSize - uint64(cidSize)
if brs, ok := br.r.(io.ReadSeeker); ok {
// carv1 and we don't know the size, so work it out and cache it
if br.readerSize == -1 {
cur, err := brs.Seek(0, io.SeekCurrent)
if err != nil {
return nil, err
}
end, err := brs.Seek(0, io.SeekEnd)
if err != nil {
return nil, err
}
br.readerSize = end
if _, err = brs.Seek(cur, io.SeekStart); err != nil {
return nil, err
}
}
// seek.
finalOffset, err := brs.Seek(int64(blkSize), io.SeekCurrent)
if err != nil {
return nil, err
}
if finalOffset != int64(br.offset)+int64(sctSize)+int64(varint.UvarintSize(sctSize)) {
return nil, fmt.Errorf("unexpected length")
}
if finalOffset > br.readerSize {
return nil, io.ErrUnexpectedEOF
}
br.offset = uint64(finalOffset)
return &BlockMetadata{
c,
uint64(finalOffset) - sctSize - uint64(varint.UvarintSize(sctSize)),
blkSize,
}, nil
}

// read to end.
readCnt, err := io.CopyN(io.Discard, br.r, int64(blkSize))
if err != nil {
if err == io.EOF {
return nil, io.ErrUnexpectedEOF
}
return nil, err
}
if readCnt != int64(blkSize) {
return nil, fmt.Errorf("unexpected length")
}
origOffset := br.offset
br.offset += uint64(varint.UvarintSize(sctSize)) + sctSize

return &BlockMetadata{
c,
origOffset,
blkSize,
}, nil
}
36 changes: 36 additions & 0 deletions v2/block_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package car_test
import (
"bytes"
"encoding/hex"
"fmt"
"io"
"os"
"testing"
Expand Down Expand Up @@ -106,6 +107,41 @@ func TestBlockReader_WithCarV1Consistency(t *testing.T) {
}
}
})
t.Run(tt.name+"-skipping-reads", func(t *testing.T) {
r := requireReaderFromPath(t, tt.path)
subject, err := carv2.NewBlockReader(r, carv2.ZeroLengthSectionAsEOF(tt.zerLenAsEOF))
require.NoError(t, err)

require.Equal(t, tt.wantVersion, subject.Version)

var wantReader *carv1.CarReader
switch tt.wantVersion {
case 1:
wantReader = requireNewCarV1ReaderFromV1File(t, tt.path, tt.zerLenAsEOF)
case 2:
wantReader = requireNewCarV1ReaderFromV2File(t, tt.path, tt.zerLenAsEOF)
default:
require.Failf(t, "invalid test-case", "unknown wantVersion %v", tt.wantVersion)
}
require.Equal(t, wantReader.Header.Roots, subject.Roots)

for {
gotBlock, gotErr := subject.SkipNext()
wantBlock, wantErr := wantReader.Next()
if wantErr != nil && gotErr == nil {
fmt.Printf("want was %+v\n", wantReader)
fmt.Printf("want was err, got was %+v / %d\n", gotBlock, gotBlock.Size)
}
require.Equal(t, wantErr, gotErr)
if gotErr == io.EOF {
break
}
if gotErr == nil {
require.Equal(t, wantBlock.Cid(), gotBlock.Cid)
require.Equal(t, uint64(len(wantBlock.RawData())), gotBlock.Size)
}
}
})
}
}

Expand Down
18 changes: 13 additions & 5 deletions v2/internal/carv1/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,28 @@ func LdSize(d ...[]byte) uint64 {
return sum + uint64(s)
}

func LdRead(r io.Reader, zeroLenAsEOF bool, maxReadBytes uint64) ([]byte, error) {
func LdReadSize(r io.Reader, zeroLenAsEOF bool, maxReadBytes uint64) (uint64, error) {
l, err := varint.ReadUvarint(internalio.ToByteReader(r))
if err != nil {
// If the length of bytes read is non-zero when the error is EOF then signal an unclean EOF.
if l > 0 && err == io.EOF {
return nil, io.ErrUnexpectedEOF
return 0, io.ErrUnexpectedEOF
}
return nil, err
return 0, err
} else if l == 0 && zeroLenAsEOF {
return nil, io.EOF
return 0, io.EOF
}

if l > maxReadBytes { // Don't OOM
return nil, ErrSectionTooLarge
return 0, ErrSectionTooLarge
}
return l, nil
}

func LdRead(r io.Reader, zeroLenAsEOF bool, maxReadBytes uint64) ([]byte, error) {
l, err := LdReadSize(r, zeroLenAsEOF, maxReadBytes)
if err != nil {
return nil, err
}

buf := make([]byte, l)
Expand Down

0 comments on commit 50e029b

Please sign in to comment.