diff --git a/v2/go.mod b/v2/go.mod index c4af1339..d3493d63 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -11,7 +11,7 @@ require ( github.com/ipfs/go-merkledag v0.3.2 github.com/klauspost/cpuid/v2 v2.0.8 // indirect github.com/mattn/go-colorable v0.1.8 // indirect - github.com/multiformats/go-multicodec v0.2.1-0.20210713081508-b421db6850ae + github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61 github.com/multiformats/go-multihash v0.0.15 github.com/multiformats/go-varint v0.0.6 github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 diff --git a/v2/go.sum b/v2/go.sum index dd4fca37..91b8b7ef 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -374,8 +374,8 @@ github.com/multiformats/go-multiaddr-net v0.0.1/go.mod h1:nw6HSxNmCIQH27XPGBuX+d github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk= github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= -github.com/multiformats/go-multicodec v0.2.1-0.20210713081508-b421db6850ae h1:wfljHPpiR0UDOjeqld9ds0Zxl3Nt/j+0wnvyBc01JgY= -github.com/multiformats/go-multicodec v0.2.1-0.20210713081508-b421db6850ae/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ= +github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61 h1:ZrUuMKNgJ52qHPoQ+bx0h0uBfcWmN7Px+4uKSZeesiI= +github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po= github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= diff --git a/v2/index/index.go b/v2/index/index.go index 3408dfbd..cc9ff70b 100644 --- a/v2/index/index.go +++ b/v2/index/index.go @@ -72,6 +72,8 @@ func New(codec multicodec.Code) (Index, error) { switch codec { case multicodec.CarIndexSorted: return newSorted(), nil + case multicodec.CarMultihashIndexSorted: + return newMultihashSorted(), nil default: return nil, fmt.Errorf("unknwon index codec: %v", codec) } diff --git a/v2/index/mhindexsorted.go b/v2/index/mhindexsorted.go new file mode 100644 index 00000000..95ab6474 --- /dev/null +++ b/v2/index/mhindexsorted.go @@ -0,0 +1,158 @@ +package index + +import ( + "encoding/binary" + "io" + "sort" + + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multicodec" + "github.com/multiformats/go-multihash" +) + +type ( + // multihashIndexSorted maps multihash code (i.e. hashing algorithm) to multiWidthCodedIndex. + // This index ignores any Record with multihash.IDENTITY. + multihashIndexSorted map[uint64]*multiWidthCodedIndex + // multiWidthCodedIndex stores multihash code for each multiWidthIndex. + multiWidthCodedIndex struct { + multiWidthIndex + code uint64 + } +) + +func newMultiWidthCodedIndex() *multiWidthCodedIndex { + return &multiWidthCodedIndex{ + multiWidthIndex: make(multiWidthIndex), + } +} + +func (m *multiWidthCodedIndex) Marshal(w io.Writer) error { + if err := binary.Write(w, binary.LittleEndian, m.code); err != nil { + return err + } + return m.multiWidthIndex.Marshal(w) +} + +func (m *multiWidthCodedIndex) Unmarshal(r io.Reader) error { + if err := binary.Read(r, binary.LittleEndian, &m.code); err != nil { + return err + } + return m.multiWidthIndex.Unmarshal(r) +} + +func (m *multihashIndexSorted) Codec() multicodec.Code { + return multicodec.CarMultihashIndexSorted +} + +func (m *multihashIndexSorted) Marshal(w io.Writer) error { + if err := binary.Write(w, binary.LittleEndian, int32(len(*m))); err != nil { + return err + } + // The codes are unique, but ranging over a map isn't deterministic. + // As per the CARv2 spec, we must order buckets by digest length. + // TODO update CARv2 spec to reflect this for the new index type. + codes := m.sortedMultihashCodes() + + for _, code := range codes { + mwci := (*m)[code] + if err := mwci.Marshal(w); err != nil { + return err + } + } + return nil +} + +func (m *multihashIndexSorted) sortedMultihashCodes() []uint64 { + codes := make([]uint64, 0, len(*m)) + for code := range *m { + codes = append(codes, code) + } + sort.Slice(codes, func(i, j int) bool { + return codes[i] < codes[j] + }) + return codes +} + +func (m *multihashIndexSorted) Unmarshal(r io.Reader) error { + var l int32 + if err := binary.Read(r, binary.LittleEndian, &l); err != nil { + return err + } + for i := 0; i < int(l); i++ { + mwci := newMultiWidthCodedIndex() + if err := mwci.Unmarshal(r); err != nil { + return err + } + m.put(mwci) + } + return nil +} + +func (m *multihashIndexSorted) put(mwci *multiWidthCodedIndex) { + (*m)[mwci.code] = mwci +} + +func (m *multihashIndexSorted) Load(records []Record) error { + // TODO optimize load by avoiding multihash decoding twice. + // This implementation decodes multihashes twice: once here to group by code, and once in the + // internals of multiWidthIndex to group by digest length. The code can be optimized by + // combining the grouping logic into one step where the multihash of a CID is only decoded once. + // The optimization would need refactoring of the IndexSorted compaction logic. + + // Group records by multihash code + byCode := make(map[uint64][]Record) + for _, record := range records { + dmh, err := multihash.Decode(record.Hash()) + if err != nil { + return err + } + code := dmh.Code + // Ignore IDENTITY multihash in the index. + if code == multihash.IDENTITY { + continue + } + recsByCode, ok := byCode[code] + if !ok { + recsByCode = make([]Record, 0) + byCode[code] = recsByCode + } + byCode[code] = append(recsByCode, record) + } + + // Load each record group. + for code, recsByCode := range byCode { + mwci := newMultiWidthCodedIndex() + mwci.code = code + if err := mwci.Load(recsByCode); err != nil { + return err + } + m.put(mwci) + } + return nil +} + +func (m *multihashIndexSorted) GetAll(cid cid.Cid, f func(uint64) bool) error { + hash := cid.Hash() + dmh, err := multihash.Decode(hash) + if err != nil { + return err + } + mwci, err := m.get(dmh) + if err != nil { + return err + } + return mwci.GetAll(cid, f) +} + +func (m *multihashIndexSorted) get(dmh *multihash.DecodedMultihash) (*multiWidthCodedIndex, error) { + if codedIdx, ok := (*m)[dmh.Code]; ok { + return codedIdx, nil + } + return nil, ErrNotFound +} + +func newMultihashSorted() Index { + index := make(multihashIndexSorted) + return &index +} diff --git a/v2/index/mhindexsorted_test.go b/v2/index/mhindexsorted_test.go new file mode 100644 index 00000000..ced8a921 --- /dev/null +++ b/v2/index/mhindexsorted_test.go @@ -0,0 +1,107 @@ +package index_test + +import ( + "bytes" + "fmt" + "math/rand" + "testing" + + "github.com/multiformats/go-multicodec" + + "github.com/ipfs/go-cid" + "github.com/ipld/go-car/v2/index" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +func TestMutilhashSortedIndex_Codec(t *testing.T) { + subject, err := index.New(multicodec.CarMultihashIndexSorted) + require.NoError(t, err) + require.Equal(t, multicodec.CarMultihashIndexSorted, subject.Codec()) +} + +func TestMultiWidthCodedIndex_LoadDoesNotLoadIdentityMultihash(t *testing.T) { + rng := rand.New(rand.NewSource(1413)) + identityRecords := generateIndexRecords(t, multihash.IDENTITY, rng) + nonIdentityRecords := generateIndexRecords(t, multihash.SHA2_256, rng) + records := append(identityRecords, nonIdentityRecords...) + + subject, err := index.New(multicodec.CarMultihashIndexSorted) + require.NoError(t, err) + err = subject.Load(records) + require.NoError(t, err) + + // Assert index does not contain any records with IDENTITY multihash code. + for _, r := range identityRecords { + wantCid := r.Cid + err = subject.GetAll(wantCid, func(o uint64) bool { + require.Fail(t, "subject should not contain any records with IDENTITY multihash code") + return false + }) + require.Equal(t, index.ErrNotFound, err) + } + + // Assert however, index does contain the non IDENTITY records. + requireContainsAll(t, subject, nonIdentityRecords) +} + +func TestMultiWidthCodedIndex_MarshalUnmarshal(t *testing.T) { + rng := rand.New(rand.NewSource(1413)) + records := generateIndexRecords(t, multihash.SHA2_256, rng) + + // Create a new mh sorted index and load randomly generated records into it. + subject, err := index.New(multicodec.CarMultihashIndexSorted) + require.NoError(t, err) + err = subject.Load(records) + require.NoError(t, err) + + // Marshal the index. + buf := new(bytes.Buffer) + err = subject.Marshal(buf) + require.NoError(t, err) + + // Unmarshal it back to another instance of mh sorted index. + umSubject, err := index.New(multicodec.CarMultihashIndexSorted) + require.NoError(t, err) + err = umSubject.Unmarshal(buf) + require.NoError(t, err) + + // Assert original records are present in both index instances with expected offset. + requireContainsAll(t, subject, records) + requireContainsAll(t, umSubject, records) +} + +func generateIndexRecords(t *testing.T, hasherCode uint64, rng *rand.Rand) []index.Record { + var records []index.Record + recordCount := rng.Intn(99) + 1 // Up to 100 records + for i := 0; i < recordCount; i++ { + records = append(records, index.Record{ + Cid: generateCidV1(t, hasherCode, rng), + Offset: rng.Uint64(), + }) + } + return records +} + +func generateCidV1(t *testing.T, hasherCode uint64, rng *rand.Rand) cid.Cid { + data := []byte(fmt.Sprintf("🌊d-%d", rng.Uint64())) + mh, err := multihash.Sum(data, hasherCode, -1) + require.NoError(t, err) + return cid.NewCidV1(cid.Raw, mh) +} + +func requireContainsAll(t *testing.T, subject index.Index, nonIdentityRecords []index.Record) { + for _, r := range nonIdentityRecords { + wantCid := r.Cid + wantOffset := r.Offset + + var gotOffsets []uint64 + err := subject.GetAll(wantCid, func(o uint64) bool { + gotOffsets = append(gotOffsets, o) + return false + }) + require.NoError(t, err) + require.Equal(t, 1, len(gotOffsets)) + require.Equal(t, wantOffset, gotOffsets[0]) + } +} diff --git a/v2/index_gen_test.go b/v2/index_gen_test.go index 058d07e6..635a07cb 100644 --- a/v2/index_gen_test.go +++ b/v2/index_gen_test.go @@ -1,9 +1,19 @@ -package car +package car_test import ( + "io" "os" "testing" + "github.com/multiformats/go-multihash" + + "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/internal/carv1" + internalio "github.com/ipld/go-car/v2/internal/io" + "github.com/multiformats/go-multicodec" + "github.com/multiformats/go-varint" + "github.com/ipld/go-car/v2/index" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -13,19 +23,19 @@ func TestReadOrGenerateIndex(t *testing.T) { tests := []struct { name string carPath string - readOpts []ReadOption + readOpts []carv2.ReadOption wantIndexer func(t *testing.T) index.Index wantErr bool }{ { "CarV1IsIndexedAsExpected", "testdata/sample-v1.car", - []ReadOption{}, + []carv2.ReadOption{}, func(t *testing.T) index.Index { v1, err := os.Open("testdata/sample-v1.car") require.NoError(t, err) defer v1.Close() - want, err := GenerateIndex(v1) + want, err := carv2.GenerateIndex(v1) require.NoError(t, err) return want }, @@ -34,12 +44,12 @@ func TestReadOrGenerateIndex(t *testing.T) { { "CarV2WithIndexIsReturnedAsExpected", "testdata/sample-wrapped-v2.car", - []ReadOption{}, + []carv2.ReadOption{}, func(t *testing.T) index.Index { v2, err := os.Open("testdata/sample-wrapped-v2.car") require.NoError(t, err) defer v2.Close() - reader, err := NewReader(v2) + reader, err := carv2.NewReader(v2) require.NoError(t, err) want, err := index.ReadFrom(reader.IndexReader()) require.NoError(t, err) @@ -50,12 +60,12 @@ func TestReadOrGenerateIndex(t *testing.T) { { "CarV1WithZeroLenSectionIsGeneratedAsExpected", "testdata/sample-v1-with-zero-len-section.car", - []ReadOption{ZeroLengthSectionAsEOF(true)}, + []carv2.ReadOption{carv2.ZeroLengthSectionAsEOF(true)}, func(t *testing.T) index.Index { v1, err := os.Open("testdata/sample-v1-with-zero-len-section.car") require.NoError(t, err) defer v1.Close() - want, err := GenerateIndex(v1, ZeroLengthSectionAsEOF(true)) + want, err := carv2.GenerateIndex(v1, carv2.ZeroLengthSectionAsEOF(true)) require.NoError(t, err) return want }, @@ -64,12 +74,12 @@ func TestReadOrGenerateIndex(t *testing.T) { { "AnotherCarV1WithZeroLenSectionIsGeneratedAsExpected", "testdata/sample-v1-with-zero-len-section2.car", - []ReadOption{ZeroLengthSectionAsEOF(true)}, + []carv2.ReadOption{carv2.ZeroLengthSectionAsEOF(true)}, func(t *testing.T) index.Index { v1, err := os.Open("testdata/sample-v1-with-zero-len-section2.car") require.NoError(t, err) defer v1.Close() - want, err := GenerateIndex(v1, ZeroLengthSectionAsEOF(true)) + want, err := carv2.GenerateIndex(v1, carv2.ZeroLengthSectionAsEOF(true)) require.NoError(t, err) return want }, @@ -78,14 +88,14 @@ func TestReadOrGenerateIndex(t *testing.T) { { "CarV1WithZeroLenSectionWithoutOptionIsError", "testdata/sample-v1-with-zero-len-section.car", - []ReadOption{}, + []carv2.ReadOption{}, func(t *testing.T) index.Index { return nil }, true, }, { "CarOtherThanV1OrV2IsError", "testdata/sample-rootless-v42.car", - []ReadOption{}, + []carv2.ReadOption{}, func(t *testing.T) index.Index { return nil }, true, }, @@ -95,7 +105,7 @@ func TestReadOrGenerateIndex(t *testing.T) { carFile, err := os.Open(tt.carPath) require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, carFile.Close()) }) - got, err := ReadOrGenerateIndex(carFile, tt.readOpts...) + got, err := carv2.ReadOrGenerateIndex(carFile, tt.readOpts...) if tt.wantErr { require.Error(t, err) } else { @@ -121,7 +131,7 @@ func TestGenerateIndexFromFile(t *testing.T) { v1, err := os.Open("testdata/sample-v1.car") require.NoError(t, err) defer v1.Close() - want, err := GenerateIndex(v1) + want, err := carv2.GenerateIndex(v1) require.NoError(t, err) return want }, @@ -142,7 +152,7 @@ func TestGenerateIndexFromFile(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := GenerateIndexFromFile(tt.carPath) + got, err := carv2.GenerateIndexFromFile(tt.carPath) if tt.wantErr { require.Error(t, err) } else { @@ -153,3 +163,91 @@ func TestGenerateIndexFromFile(t *testing.T) { }) } } + +func TestMultihashIndexSortedConsistencyWithIndexSorted(t *testing.T) { + path := "testdata/sample-v1.car" + + sortedIndex, err := carv2.GenerateIndexFromFile(path) + require.NoError(t, err) + require.Equal(t, multicodec.CarIndexSorted, sortedIndex.Codec()) + + f, err := os.Open(path) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, f.Close()) }) + br, err := carv2.NewBlockReader(f) + require.NoError(t, err) + + subject := generateMultihashSortedIndex(t, path) + for { + wantNext, err := br.Next() + if err == io.EOF { + break + } + require.NoError(t, err) + + dmh, err := multihash.Decode(wantNext.Cid().Hash()) + require.NoError(t, err) + if dmh.Code == multihash.IDENTITY { + continue + } + + wantCid := wantNext.Cid() + var wantOffsets []uint64 + err = sortedIndex.GetAll(wantCid, func(o uint64) bool { + wantOffsets = append(wantOffsets, o) + return false + }) + require.NoError(t, err) + + var gotOffsets []uint64 + err = subject.GetAll(wantCid, func(o uint64) bool { + gotOffsets = append(gotOffsets, o) + return false + }) + + require.NoError(t, err) + require.Equal(t, wantOffsets, gotOffsets) + } +} + +func generateMultihashSortedIndex(t *testing.T, path string) index.Index { + f, err := os.Open(path) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, f.Close()) }) + reader := internalio.ToByteReadSeeker(f) + header, err := carv1.ReadHeader(reader) + require.NoError(t, err) + require.Equal(t, uint64(1), header.Version) + + idx, err := index.New(multicodec.CarMultihashIndexSorted) + require.NoError(t, err) + records := make([]index.Record, 0) + + var sectionOffset int64 + sectionOffset, err = reader.Seek(0, io.SeekCurrent) + require.NoError(t, err) + + for { + sectionLen, err := varint.ReadUvarint(reader) + if err == io.EOF { + break + } + require.NoError(t, err) + + if sectionLen == 0 { + break + } + + cidLen, c, err := cid.CidFromReader(reader) + require.NoError(t, err) + records = append(records, index.Record{Cid: c, Offset: uint64(sectionOffset)}) + remainingSectionLen := int64(sectionLen) - int64(cidLen) + sectionOffset, err = reader.Seek(remainingSectionLen, io.SeekCurrent) + require.NoError(t, err) + } + + err = idx.Load(records) + require.NoError(t, err) + + return idx +}