Skip to content

Commit

Permalink
Implement options to handle IDENTITY CIDs gracefully
Browse files Browse the repository at this point in the history
Implement two additional options that allow a CARv2 file to 1) include
IDENTITY CIDs, and 2) specify a maximum allowed CID length, with 2KiB
as a sufficiently large default.

Configure ReadWrite blockstore to persist given blocks with IDENTITY
CIDs.

Introduce a new Characteristics field that signals whether an index in
a CAR file contains a full catalog of CIDs for backward compatibility
purposes. Note, this is a new addition and will need to be added to the
spec in a separate PR.

Relates to #215
  • Loading branch information
masih committed Sep 20, 2021
1 parent 23ca7db commit 023c842
Show file tree
Hide file tree
Showing 16 changed files with 396 additions and 111 deletions.
2 changes: 1 addition & 1 deletion v2/blockstore/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
// * blockstore.Has will always return true.
// * blockstore.Get will always succeed, returning the multihash digest of the given CID.
// * blockstore.GetSize will always succeed, returning the multihash digest length of the given CID.
// * blockstore.Put and blockstore.PutMany will always succeed without performing any operation.
// * blockstore.Put and blockstore.PutMany will always succeed without performing any operation unless car.IncludeIdentityCIDs is enabled.
//
// See: https://pkg.go.dev/github.com/ipfs/go-ipfs-blockstore#NewIdStore
package blockstore
20 changes: 11 additions & 9 deletions v2/blockstore/readonly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,48 +32,50 @@ func TestReadOnly(t *testing.T) {
name string
v1OrV2path string
opts []carv2.Option
v1r *carv1.CarReader
}{
{
"OpenedWithCarV1",
"../testdata/sample-v1.car",
[]carv2.Option{UseWholeCIDs(true)},
newV1ReaderFromV1File(t, "../testdata/sample-v1.car", false),
[]carv2.Option{UseWholeCIDs(true), carv2.IncludeIdentityCIDs(true)},
},
{
"OpenedWithCarV2",
"../testdata/sample-wrapped-v2.car",
[]carv2.Option{UseWholeCIDs(true)},
newV1ReaderFromV2File(t, "../testdata/sample-wrapped-v2.car", false),
[]carv2.Option{UseWholeCIDs(true), carv2.IncludeIdentityCIDs(true)},
},
{
"OpenedWithCarV1ZeroLenSection",
"../testdata/sample-v1-with-zero-len-section.car",
[]carv2.Option{UseWholeCIDs(true), carv2.ZeroLengthSectionAsEOF(true)},
newV1ReaderFromV1File(t, "../testdata/sample-v1-with-zero-len-section.car", true),
},
{
"OpenedWithAnotherCarV1ZeroLenSection",
"../testdata/sample-v1-with-zero-len-section2.car",
[]carv2.Option{UseWholeCIDs(true), carv2.ZeroLengthSectionAsEOF(true)},
newV1ReaderFromV1File(t, "../testdata/sample-v1-with-zero-len-section2.car", true),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
subject, err := OpenReadOnly(tt.v1OrV2path, tt.opts...)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, subject.Close()) })

f, err := os.Open(tt.v1OrV2path)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })

reader, err := carv2.NewBlockReader(f, tt.opts...)
require.NoError(t, err)

// Assert roots match v1 payload.
wantRoots := tt.v1r.Header.Roots
wantRoots := reader.Roots
gotRoots, err := subject.Roots()
require.NoError(t, err)
require.Equal(t, wantRoots, gotRoots)

var wantCids []cid.Cid
for {
wantBlock, err := tt.v1r.Next()
wantBlock, err := reader.Next()
if err == io.EOF {
break
}
Expand Down
23 changes: 18 additions & 5 deletions v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,23 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {
for _, bl := range blks {
c := bl.Cid()

// Check for IDENTITY CID. If IDENTITY, ignore and move to the next block.
if _, ok, err := isIdentity(c); err != nil {
return err
} else if ok {
continue
// If IncludeIdentityCIDs option is disabled then treat IDENTITY CIDs like IdStore.
if !b.opts.IncludeIdentityCIDs {
// Check for IDENTITY CID. If IDENTITY, ignore and move to the next block.
if _, ok, err := isIdentity(c); err != nil {
return err
} else if ok {
continue
}
}

// Check if its size is too big.
// If larger than maximum allowed size, return error.
// Note, we need to check this regardless of whether we have IDENTITY CID or not.
// Since multihash codes other than IDENTITY can result in large digests.
cSize := uint64(len(c.Bytes()))
if cSize > b.opts.MaxIndexCidSize {
return &carv2.ErrCidTooLarge{MaxSize: b.opts.MaxIndexCidSize, CurrentSize: cSize}
}

if !b.opts.BlockstoreAllowDuplicatePuts {
Expand Down Expand Up @@ -351,6 +363,7 @@ func (b *ReadWrite) Finalize() error {

// TODO check if add index option is set and don't write the index then set index offset to zero.
b.header = b.header.WithDataSize(uint64(b.dataWriter.Position()))
b.header.Characteristics.SetFullyIndexed(b.opts.IncludeIdentityCIDs)

// Note that we can't use b.Close here, as that tries to grab the same
// mutex we're holding here.
Expand Down
181 changes: 171 additions & 10 deletions v2/blockstore/readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockstore_test

import (
"context"
"crypto/sha512"
"fmt"
"io"
"io/ioutil"
Expand All @@ -12,21 +13,19 @@ import (
"testing"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipfsblockstore "github.com/ipfs/go-ipfs-blockstore"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/ipfs/go-merkledag"

carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
"github.com/ipld/go-car/v2/index"
"github.com/stretchr/testify/assert"

"github.com/ipld/go-car/v2/internal/carv1"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ipfsblockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipld/go-car/v2/blockstore"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2/internal/carv1"
)

var (
Expand Down Expand Up @@ -688,3 +687,165 @@ func TestReadWriteErrorAfterClose(t *testing.T) {
// in progress.
}
}

// TestOpenReadWrite_WritesIdentityCIDsWhenOptionIsEnabled verifies that, with
// carv2.IncludeIdentityCIDs enabled, a block with an IDENTITY CID can be read back from the
// blockstore, is present in the written CAR payload, and is recorded in the index.
func TestOpenReadWrite_WritesIdentityCIDsWhenOptionIsEnabled(t *testing.T) {
	carPath := filepath.Join(t.TempDir(), "readwrite-with-id-enabled.car")
	subject, err := blockstore.OpenReadWrite(carPath, []cid.Cid{}, carv2.IncludeIdentityCIDs(true))
	require.NoError(t, err)

	// Build a block addressed by an IDENTITY multihash of its own payload.
	payload := []byte("fish")
	idmh, err := multihash.Sum(payload, multihash.IDENTITY, -1)
	require.NoError(t, err)
	idCid := cid.NewCidV1(uint64(multicodec.Raw), idmh)
	idBlock, err := blocks.NewBlockWithCid(payload, idCid)
	require.NoError(t, err)

	require.NoError(t, subject.Put(idBlock))

	// The block must be visible through Has, Get and AllKeysChan.
	found, err := subject.Has(idCid)
	require.NoError(t, err)
	require.True(t, found)

	gotBlock, err := subject.Get(idCid)
	require.NoError(t, err)
	require.Equal(t, idBlock, gotBlock)

	keys, err := subject.AllKeysChan(context.Background())
	require.NoError(t, err)
	keyCount := 0
	for key := range keys {
		keyCount++
		require.Equal(t, idCid, key)
	}
	require.Equal(t, 1, keyCount)

	require.NoError(t, subject.Finalize())

	// Assert resulting CAR file indeed has the IDENTITY block, and nothing else.
	carFile, err := os.Open(carPath)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, carFile.Close()) })

	blockReader, err := carv2.NewBlockReader(carFile)
	require.NoError(t, err)

	gotBlock, err = blockReader.Next()
	require.NoError(t, err)
	require.Equal(t, idBlock, gotBlock)

	trailing, err := blockReader.Next()
	require.Equal(t, io.EOF, err)
	require.Nil(t, trailing)

	// Assert the id is indexed.
	v2Reader, err := carv2.OpenReader(carPath)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, v2Reader.Close()) })
	require.True(t, v2Reader.Header.HasIndex())

	indexReader := v2Reader.IndexReader()
	require.NotNil(t, indexReader)

	gotIdx, err := index.ReadFrom(indexReader)
	require.NoError(t, err)

	// Expected offset is the length of the CBOR-encoded CARv1 header plus one.
	v1Header, err := carv1.ReadHeader(v2Reader.DataReader())
	require.NoError(t, err)
	encodedHeader, err := cbor.DumpObject(v1Header)
	require.NoError(t, err)
	wantOffset := uint64(len(encodedHeader) + 1)

	// Assert index is iterable and has exactly one record with expected multihash and offset.
	iterable, ok := gotIdx.(index.IterableIndex)
	if !ok {
		require.Failf(t, "unexpected index type", "wanted %v but got %v", multicodec.CarMultihashIndexSorted, gotIdx.Codec())
	}
	recordCount := 0
	err = iterable.ForEach(func(mh multihash.Multihash, offset uint64) error {
		recordCount++
		require.Equal(t, idmh, mh)
		require.Equal(t, wantOffset, offset)
		return nil
	})
	require.NoError(t, err)
	require.Equal(t, 1, recordCount)
}

// TestOpenReadWrite_ErrorsWhenWritingTooLargeOfACid asserts that putting a block whose CID
// byte length exceeds the limit configured via carv2.MaxIndexCidSize fails with a
// carv2.ErrCidTooLarge carrying both the limit and the offending CID size.
func TestOpenReadWrite_ErrorsWhenWritingTooLargeOfACid(t *testing.T) {
	maxAllowedCidSize := uint64(2)
	path := filepath.Join(t.TempDir(), "readwrite-with-id-enabled-too-large.car")
	subject, err := blockstore.OpenReadWrite(path, []cid.Cid{}, carv2.MaxIndexCidSize(maxAllowedCidSize))
	// Check the error before registering the cleanup: when opening fails, subject must not be
	// used, and a Discard on it during cleanup would obscure the original failure.
	require.NoError(t, err)
	t.Cleanup(subject.Discard)

	data := []byte("monsterlobster")
	mh, err := multihash.Sum(data, multihash.SHA2_256, -1)
	require.NoError(t, err)
	bigCid := cid.NewCidV1(uint64(multicodec.Raw), mh)
	bigCidLen := uint64(bigCid.ByteLen())
	// Sanity check that the CID built above actually exceeds the configured limit.
	require.True(t, bigCidLen > maxAllowedCidSize)

	bigBlock, err := blocks.NewBlockWithCid(data, bigCid)
	require.NoError(t, err)
	err = subject.Put(bigBlock)
	require.Equal(t, &carv2.ErrCidTooLarge{MaxSize: maxAllowedCidSize, CurrentSize: bigCidLen}, err)
}

// TestReadWrite_ReWritingCARv1WithIdentityCidIsIdenticalToOriginalWithOptionsEnabled copies
// every block of the sample CARv1 into a ReadWrite blockstore opened with
// carv2.IncludeIdentityCIDs enabled, then asserts that the fully-indexed characteristic is set
// and that the generated inner CARv1 payload has the same length and SHA512 sum as the original.
func TestReadWrite_ReWritingCARv1WithIdentityCidIsIdenticalToOriginalWithOptionsEnabled(t *testing.T) {
	originalCARv1Path := "../testdata/sample-v1.car"
	originalCarV1, err := os.Open(originalCARv1Path)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, originalCarV1.Close()) })

	source, err := carv2.NewBlockReader(originalCarV1)
	require.NoError(t, err)

	rewrittenPath := filepath.Join(t.TempDir(), "readwrite-from-carv1-with-id-enabled.car")
	subject, err := blockstore.OpenReadWrite(rewrittenPath, source.Roots, carv2.IncludeIdentityCIDs(true))
	require.NoError(t, err)

	// Copy all blocks across, counting how many of them have an IDENTITY CID.
	identityCount := 0
	for {
		blk, err := source.Next()
		if err == io.EOF {
			break
		}
		require.NoError(t, err)
		if blk.Cid().Prefix().MhType == multihash.IDENTITY {
			identityCount++
		}
		require.NoError(t, subject.Put(blk))
	}
	// The sample must contain at least one IDENTITY CID for this test to be meaningful.
	require.NotZero(t, identityCount)
	require.NoError(t, subject.Finalize())

	v2r, err := carv2.OpenReader(rewrittenPath)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, v2r.Close()) })

	// Assert characteristics bit is set.
	require.True(t, v2r.Header.Characteristics.IsFullyIndexed())

	// Assert original CARv1 and generated inner CARv1 payload have the same SHA512 hash.
	// Note, we hash instead of comparing bytes to avoid excessive memory usage when sample CARv1 is large.
	gotHasher := sha512.New()
	gotWritten, err := io.Copy(gotHasher, v2r.DataReader())
	require.NoError(t, err)
	gotSum := gotHasher.Sum(nil)

	// Rewind the original, since the block reader above has already consumed it.
	_, err = originalCarV1.Seek(0, io.SeekStart)
	require.NoError(t, err)
	wantHasher := sha512.New()
	wantWritten, err := io.Copy(wantHasher, originalCarV1)
	require.NoError(t, err)
	wantSum := wantHasher.Sum(nil)

	require.Equal(t, wantWritten, gotWritten)
	require.Equal(t, wantSum, gotSum)
}
34 changes: 34 additions & 0 deletions v2/car.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type (
}
)

// fullyIndexedCharPos is the position of the Characteristics.Hi bit that specifies whether the index is a catalog of all CIDs or not.
const fullyIndexedCharPos = 7 // left-most bit

// WriteTo writes this characteristics to the given w.
func (c Characteristics) WriteTo(w io.Writer) (n int64, err error) {
buf := make([]byte, 16)
Expand All @@ -64,6 +67,37 @@ func (c *Characteristics) ReadFrom(r io.Reader) (int64, error) {
return n, nil
}

// IsFullyIndexed reports whether the index of a CARv2 represents a catalog of all CID segments,
// i.e. whether the bit at fullyIndexedCharPos of c.Hi is set.
// See IncludeIdentityCIDs.
func (c *Characteristics) IsFullyIndexed() bool {
	hi := c.Hi
	return isBitSet(hi, fullyIndexedCharPos)
}

// SetFullyIndexed sets whether the index of a CARv2 represents a catalog of all CID segments.
// It sets the bit at fullyIndexedCharPos of c.Hi when b is true, and clears it otherwise.
func (c *Characteristics) SetFullyIndexed(b bool) {
	if !b {
		c.Hi = unsetBit(c.Hi, fullyIndexedCharPos)
		return
	}
	c.Hi = setBit(c.Hi, fullyIndexedCharPos)
}

// setBit returns n with the bit at position pos set to one; pos zero is the least
// significant bit.
func setBit(n uint64, pos uint) uint64 {
	return n | 1<<pos
}

// unsetBit returns n with the bit at position pos cleared; pos zero is the least
// significant bit.
func unsetBit(n uint64, pos uint) uint64 {
	// AND NOT clears exactly the bits that are set in the right-hand operand.
	return n &^ (1 << pos)
}

// isBitSet reports whether the bit at position pos of n is one; pos zero is the least
// significant bit.
func isBitSet(n uint64, pos uint) bool {
	return n&(1<<pos) != 0
}

// NewHeader instantiates a new CARv2 header, given the data size.
func NewHeader(dataSize uint64) Header {
header := Header{
Expand Down
Loading

0 comments on commit 023c842

Please sign in to comment.