From cca0a48d627f9b534eebe4566bbb5f465b56db09 Mon Sep 17 00:00:00 2001 From: racytech Date: Tue, 25 Jul 2023 03:49:17 +0600 Subject: [PATCH 1/3] MatchCmp and MatchPrefixCmp added --- compress/compress_test.go | 101 ++++++- compress/decompress.go | 216 +++++++++++++- compress/decompress_bench_test.go | 40 ++- compress/decompress_fuzz_test.go | 96 ++++++ compress/decompress_test.go | 479 +++++++++++++++++++++++++++++- state/aggregator_test.go | 2 +- state/btree_index.go | 4 +- state/domain.go | 2 +- state/history.go | 6 +- state/merge.go | 4 +- 10 files changed, 931 insertions(+), 19 deletions(-) create mode 100644 compress/decompress_fuzz_test.go diff --git a/compress/compress_test.go b/compress/compress_test.go index 7d54c8e56..e78d76323 100644 --- a/compress/compress_test.go +++ b/compress/compress_test.go @@ -123,6 +123,10 @@ func TestCompressDict1(t *testing.T) { require.False(t, g.MatchPrefix([]byte("long"))) require.True(t, g.MatchPrefix([]byte(""))) require.True(t, g.MatchPrefix([]byte{})) + + require.Equal(t, 1, g.MatchPrefixCmp([]byte("long"))) + require.Equal(t, 0, g.MatchPrefixCmp([]byte(""))) + require.Equal(t, 0, g.MatchPrefixCmp([]byte{})) word, _ := g.Next(nil) require.Nil(t, word) @@ -132,6 +136,12 @@ func TestCompressDict1(t *testing.T) { require.False(t, g.MatchPrefix([]byte("wordnotmatch"))) require.False(t, g.MatchPrefix([]byte("longnotmatch"))) require.True(t, g.MatchPrefix([]byte{})) + + require.Equal(t, 0, g.MatchPrefixCmp([]byte("long"))) + require.Equal(t, 1, g.MatchPrefixCmp([]byte("longlong"))) + require.Equal(t, 1, g.MatchPrefixCmp([]byte("wordnotmatch"))) + require.Equal(t, 1, g.MatchPrefixCmp([]byte("longnotmatch"))) + require.Equal(t, 0, g.MatchPrefixCmp([]byte{})) _, _ = g.Next(nil) // next word is `word` @@ -142,6 +152,14 @@ func TestCompressDict1(t *testing.T) { require.True(t, g.MatchPrefix(nil)) require.False(t, g.MatchPrefix([]byte("wordnotmatch"))) require.False(t, g.MatchPrefix([]byte("longnotmatch"))) + + require.Equal(t, -1, g.MatchPrefixCmp([]byte("long"))) + require.Equal(t, -1, g.MatchPrefixCmp([]byte("longlong"))) + require.Equal(t, 0, g.MatchPrefixCmp([]byte("word"))) + require.Equal(t, 0, g.MatchPrefixCmp([]byte(""))) + require.Equal(t, 0, g.MatchPrefixCmp(nil)) + require.Equal(t, 1, g.MatchPrefixCmp([]byte("wordnotmatch"))) + require.Equal(t, -1, g.MatchPrefixCmp([]byte("longnotmatch"))) _, _ = g.Next(nil) // next word is `longlongword %d` @@ -154,8 +172,89 @@ func TestCompressDict1(t *testing.T) { require.False(t, g.MatchPrefix([]byte("wordnotmatch"))) require.False(t, g.MatchPrefix([]byte("longnotmatch"))) require.True(t, g.MatchPrefix([]byte{})) - word, _ = g.Next(nil) + + require.Equal(t, 0, g.MatchPrefixCmp([]byte(fmt.Sprintf("%d", i)))) + require.Equal(t, 0, g.MatchPrefixCmp([]byte(expectPrefix))) + require.Equal(t, 0, g.MatchPrefixCmp([]byte(expectPrefix+"long"))) + require.Equal(t, 0, g.MatchPrefixCmp([]byte(expectPrefix+"longword "))) + require.Equal(t, 1, g.MatchPrefixCmp([]byte("wordnotmatch"))) + require.Equal(t, 1, g.MatchPrefixCmp([]byte("longnotmatch"))) + require.Equal(t, 0, g.MatchPrefixCmp([]byte{})) + savePos := g.dataP + word, nextPos := g.Next(nil) + expected := fmt.Sprintf("%d longlongword %d", i, i) + g.Reset(savePos) + require.Equal(t, 0, g.MatchCmp([]byte(expected))) + g.Reset(nextPos) + if string(word) != expected { + t.Errorf("expected %s, got (hex) [%s]", expected, word) + } + i++ + } + + if cs := checksum(d.filePath); cs != 3153486123 { + // it's ok if hash changed, but need re-generate all existing snapshot hashes + // in https://github.com/ledgerwatch/erigon-snapshot + t.Errorf("result file hash changed, %d", cs) + } +} + +func TestCompressDictCmp(t *testing.T) { + d := prepareDict(t) + defer d.Close() + g := d.MakeGetter() + i := 0 + g.Reset(0) + for g.HasNext() { + // next word is `nil` + savePos := g.dataP + require.Equal(t, 1, g.MatchCmp([]byte("long"))) + require.Equal(t, 0, g.MatchCmp([]byte(""))) // moves offset + g.Reset(savePos) + require.Equal(t, 0, g.MatchCmp([]byte{})) // moves offset + g.Reset(savePos) + + word, _ := g.Next(nil) + require.Nil(t, word) + + // next word is `long` + savePos = g.dataP + require.Equal(t, 0, g.MatchCmp([]byte("long"))) // moves offset + g.Reset(savePos) + require.Equal(t, 1, g.MatchCmp([]byte("longlong"))) + require.Equal(t, 1, g.MatchCmp([]byte("wordnotmatch"))) + require.Equal(t, 1, g.MatchCmp([]byte("longnotmatch"))) + require.Equal(t, -1, g.MatchCmp([]byte{})) + _, _ = g.Next(nil) + + // next word is `word` + savePos = g.dataP + require.Equal(t, -1, g.MatchCmp([]byte("long"))) + require.Equal(t, -1, g.MatchCmp([]byte("longlong"))) + require.Equal(t, 0, g.MatchCmp([]byte("word"))) // moves offset + g.Reset(savePos) + require.Equal(t, -1, g.MatchCmp([]byte(""))) + require.Equal(t, -1, g.MatchCmp(nil)) + require.Equal(t, 1, g.MatchCmp([]byte("wordnotmatch"))) + require.Equal(t, -1, g.MatchCmp([]byte("longnotmatch"))) + _, _ = g.Next(nil) + + // next word is `longlongword %d` + expectPrefix := fmt.Sprintf("%d long", i) + + require.Equal(t, -1, g.MatchCmp([]byte(fmt.Sprintf("%d", i)))) + require.Equal(t, -1, g.MatchCmp([]byte(expectPrefix))) + require.Equal(t, -1, g.MatchCmp([]byte(expectPrefix+"long"))) + require.Equal(t, -1, g.MatchCmp([]byte(expectPrefix+"longword "))) + require.Equal(t, 1, g.MatchCmp([]byte("wordnotmatch"))) + require.Equal(t, 1, g.MatchCmp([]byte("longnotmatch"))) + require.Equal(t, -1, g.MatchCmp([]byte{})) + savePos = g.dataP + word, nextPos := g.Next(nil) expected := fmt.Sprintf("%d longlongword %d", i, i) + g.Reset(savePos) + require.Equal(t, 0, g.MatchCmp([]byte(expected))) + g.Reset(nextPos) if string(word) != expected { t.Errorf("expected %s, got (hex) [%s]", expected, word) } diff --git a/compress/decompress.go b/compress/decompress.go index 7e0c5b481..f03e61f1a 100644 --- a/compress/decompress.go +++ b/compress/decompress.go @@ -613,8 +613,8 @@ func (g *Getter) NextUncompressed() ([]byte, uint64) { return g.data[pos:g.dataP], g.dataP } -// Skip moves offset to the next word and returns the new offset. -func (g *Getter) Skip() uint64 { +// Skip moves offset to the next word and returns the new offset and the length of the word. +func (g *Getter) Skip() (uint64, int) { l := g.nextPos(true) l-- // because when create huffman tree we do ++ , because 0 is terminator if l == 0 { @@ -622,7 +622,7 @@ func (g *Getter) Skip() uint64 { g.dataP++ g.dataBit = 0 } - return g.dataP + return g.dataP, 0 } wordLen := int(l) @@ -648,10 +648,10 @@ func (g *Getter) Skip() uint64 { } // Uncovered characters g.dataP += add - return g.dataP + return g.dataP, wordLen } -func (g *Getter) SkipUncompressed() uint64 { +func (g *Getter) SkipUncompressed() (uint64, int) { wordLen := g.nextPos(true) wordLen-- // because when create huffman tree we do ++ , because 0 is terminator if wordLen == 0 { @@ -659,7 +659,7 @@ func (g *Getter) SkipUncompressed() uint64 { g.dataP++ g.dataBit = 0 } - return g.dataP + return g.dataP, 0 } g.nextPos(false) if g.dataBit > 0 { @@ -667,7 +667,7 @@ func (g *Getter) SkipUncompressed() uint64 { g.dataBit = 0 } g.dataP += wordLen - return g.dataP + return g.dataP, int(wordLen) } // Match returns true and next offset if the word at current offset fully matches the buf @@ -817,3 +817,205 @@ func (g *Getter) MatchPrefix(prefix []byte) bool { } return true } + +// Match returns true and next offset if the word at current offset fully matches the buf +// returns false and current offset otherwise. +func (g *Getter) MatchCmp(buf []byte) int { + savePos := g.dataP + wordLen := g.nextPos(true) + wordLen-- // because when create huffman tree we do ++ , because 0 is terminator + lenBuf := len(buf) + if wordLen == 0 && lenBuf != 0 { + g.dataP, g.dataBit = savePos, 0 + return 1 + } + if wordLen == 0 && lenBuf == 0 { + if g.dataBit > 0 { + g.dataP++ + g.dataBit = 0 + } + return 0 + } + // if wordLen == 0 { + // if g.dataBit > 0 { + // g.dataP++ + // g.dataBit = 0 + // } + // if lenBuf != 0 { + // g.dataP, g.dataBit = savePos, 0 + // return 1 + // } else { + // return 0 + // } + // } + + decoded := make([]byte, wordLen) + var bufPos int + // In the first pass, we only check patterns + for pos := g.nextPos(false /* clean */); pos != 0; pos = g.nextPos(false) { + bufPos += int(pos) - 1 + pattern := g.nextPattern() + copy(decoded[bufPos:], pattern) + } + if g.dataBit > 0 { + g.dataP++ + g.dataBit = 0 + } + postLoopPos := g.dataP + g.dataP, g.dataBit = savePos, 0 + g.nextPos(true /* clean */) // Reset the state of huffman decoder + // Second pass - we check spaces not covered by the patterns + var lastUncovered int + bufPos = 0 + for pos := g.nextPos(false /* clean */); pos != 0; pos = g.nextPos(false) { + bufPos += int(pos) - 1 + // fmt.Printf("BUF POS: %d, POS: %d, lastUncovered: %d\n", bufPos, pos, lastUncovered) + if bufPos > lastUncovered { + dif := uint64(bufPos - lastUncovered) + copy(decoded[lastUncovered:bufPos], g.data[postLoopPos:postLoopPos+dif]) + postLoopPos += dif + } + lastUncovered = bufPos + len(g.nextPattern()) + } + + if int(wordLen) > lastUncovered { + dif := wordLen - uint64(lastUncovered) + copy(decoded[lastUncovered:wordLen], g.data[postLoopPos:postLoopPos+dif]) + postLoopPos += dif + } + cmp := bytes.Compare(buf, decoded) + if cmp == 0 { + g.dataP, g.dataBit = postLoopPos, 0 + } else { + g.dataP, g.dataBit = savePos, 0 + } + return cmp +} + +// MatchPrefix only checks if the word at the current offset has a buf prefix. Does not move offset to the next word. +func (g *Getter) MatchPrefixCmp(prefix []byte) int { + savePos := g.dataP + defer func() { + g.dataP, g.dataBit = savePos, 0 + }() + + wordLen := g.nextPos(true /* clean */) + wordLen-- // because when create huffman tree we do ++ , because 0 is terminator + prefixLen := len(prefix) + if wordLen == 0 && prefixLen != 0 { + return 1 + } + if prefixLen == 0 { + return 0 + } + + decoded := make([]byte, wordLen) + var bufPos int + // In the first pass, we only check patterns + // Only run this loop as far as the prefix goes, there is no need to check further + for pos := g.nextPos(false /* clean */); pos != 0; pos = g.nextPos(false) { + bufPos += int(pos) - 1 + if bufPos > prefixLen { + break + } + pattern := g.nextPattern() + copy(decoded[bufPos:], pattern) + } + + if g.dataBit > 0 { + g.dataP++ + g.dataBit = 0 + } + postLoopPos := g.dataP + g.dataP, g.dataBit = savePos, 0 + g.nextPos(true /* clean */) // Reset the state of huffman decoder + // Second pass - we check spaces not covered by the patterns + var lastUncovered int + bufPos = 0 + for pos := g.nextPos(false /* clean */); pos != 0 && lastUncovered < prefixLen; pos = g.nextPos(false) { + bufPos += int(pos) - 1 + if bufPos > lastUncovered { + dif := uint64(bufPos - lastUncovered) + copy(decoded[lastUncovered:bufPos], g.data[postLoopPos:postLoopPos+dif]) + postLoopPos += dif + } + lastUncovered = bufPos + len(g.nextPattern()) + } + if prefixLen > lastUncovered && int(wordLen) > lastUncovered { + dif := wordLen - uint64(lastUncovered) + copy(decoded[lastUncovered:wordLen], g.data[postLoopPos:postLoopPos+dif]) + postLoopPos += dif + } + var cmp int + if prefixLen > int(wordLen) { + cmp = bytes.Compare(prefix, decoded) + } else { + cmp = bytes.Compare(prefix, decoded[:prefixLen]) + } + + return cmp +} + +// FastNext extracts a compressed word from current offset in the file +// into the given buf, returning a new byte slice which contains extracted word. +// It is important to allocate enough buf size. Could throw an error if word in file is larger then the buf size. +// After extracting next word, it moves to the beginning of the next one +func (g *Getter) FastNext(buf []byte) ([]byte, uint64) { + defer func() { + if rec := recover(); rec != nil { + panic(fmt.Sprintf("file: %s, %s, %s", g.fName, rec, dbg.Stack())) + } + }() + + savePos := g.dataP + wordLen := g.nextPos(true) + wordLen-- // because when create huffman tree we do ++ , because 0 is terminator + // decoded := make([]byte, wordLen) + if wordLen == 0 { + if g.dataBit > 0 { + g.dataP++ + g.dataBit = 0 + } + return buf[:wordLen], g.dataP + } + bufPos := 0 // Tracking position in buf where to insert part of the word + lastUncovered := 0 + + // if int(wordLen) > cap(buf) { + // newBuf := make([]byte, int(wordLen)) + // buf = newBuf + // } + // Loop below fills in the patterns + for pos := g.nextPos(false /* clean */); pos != 0; pos = g.nextPos(false) { + bufPos += int(pos) - 1 // Positions where to insert patterns are encoded relative to one another + pt := g.nextPattern() + copy(buf[bufPos:], pt) + } + if g.dataBit > 0 { + g.dataP++ + g.dataBit = 0 + } + postLoopPos := g.dataP + g.dataP = savePos + g.dataBit = 0 + g.nextPos(true /* clean */) // Reset the state of huffman reader + bufPos = lastUncovered // Restore to the beginning of buf + // Loop below fills the data which is not in the patterns + for pos := g.nextPos(false); pos != 0; pos = g.nextPos(false) { + bufPos += int(pos) - 1 // Positions where to insert patterns are encoded relative to one another + if bufPos > lastUncovered { + dif := uint64(bufPos - lastUncovered) + copy(buf[lastUncovered:bufPos], g.data[postLoopPos:postLoopPos+dif]) + postLoopPos += dif + } + lastUncovered = bufPos + len(g.nextPattern()) + } + if int(wordLen) > lastUncovered { + dif := wordLen - uint64(lastUncovered) + copy(buf[lastUncovered:wordLen], g.data[postLoopPos:postLoopPos+dif]) + postLoopPos += dif + } + g.dataP = postLoopPos + g.dataBit = 0 + return buf[:wordLen], postLoopPos +} diff --git a/compress/decompress_bench_test.go b/compress/decompress_bench_test.go index 950fd2e11..9f6cd4b5d 100644 --- a/compress/decompress_bench_test.go +++ b/compress/decompress_bench_test.go @@ -37,6 +37,20 @@ func BenchmarkDecompressNext(b *testing.B) { } } +func BenchmarkDecompressFastNext(b *testing.B) { + t := new(testing.T) + d := prepareDict(t) + defer d.Close() + g := d.MakeGetter() + buf := make([]byte, 100) + for i := 0; i < b.N; i++ { + _, _ = g.FastNext(buf) + if !g.HasNext() { + g.Reset(0) + } + } +} + func BenchmarkDecompressSkip(b *testing.B) { t := new(testing.T) d := prepareDict(t) @@ -44,7 +58,7 @@ func BenchmarkDecompressSkip(b *testing.B) { g := d.MakeGetter() for i := 0; i < b.N; i++ { - _ = g.Skip() + _, _ = g.Skip() if !g.HasNext() { g.Reset(0) } @@ -61,6 +75,19 @@ func BenchmarkDecompressMatch(b *testing.B) { } } +func BenchmarkDecompressMatchCmp(b *testing.B) { + t := new(testing.T) + d := prepareDict(t) + defer d.Close() + g := d.MakeGetter() + for i := 0; i < b.N; i++ { + _ = g.MatchCmp([]byte("longlongword")) + if !g.HasNext() { + g.Reset(0) + } + } +} + func BenchmarkDecompressMatchPrefix(b *testing.B) { t := new(testing.T) d := prepareDict(t) @@ -72,6 +99,17 @@ func BenchmarkDecompressMatchPrefix(b *testing.B) { } } +func BenchmarkDecompressMatchPrefixCmp(b *testing.B) { + t := new(testing.T) + d := prepareDict(t) + defer d.Close() + g := d.MakeGetter() + + for i := 0; i < b.N; i++ { + _ = g.MatchPrefixCmp([]byte("longlongword")) + } +} + func BenchmarkDecompressTorrent(t *testing.B) { t.Skip() diff --git a/compress/decompress_fuzz_test.go b/compress/decompress_fuzz_test.go new file mode 100644 index 000000000..e127a6240 --- /dev/null +++ b/compress/decompress_fuzz_test.go @@ -0,0 +1,96 @@ +package compress + +import ( + "bytes" + "context" + "fmt" + "math/rand" + "path/filepath" + "testing" + + "github.com/ledgerwatch/erigon-lib/common/cmp" + "github.com/ledgerwatch/log/v3" +) + +func FuzzDecompressMatch(f *testing.F) { + logger := log.New() + f.Fuzz(func(t *testing.T, x []byte, pos []byte, workers int8) { + t.Helper() + t.Parallel() + if len(pos) < 1 || workers < 1 { + t.Skip() + return + } + var a [][]byte + j := 0 + for i := 0; i < len(pos) && j < len(x); i++ { + if pos[i] == 0 { + continue + } + next := cmp.Min(j+int(pos[i]*10), len(x)-1) + bbb := x[j:next] + a = append(a, bbb) + j = next + } + + ctx := context.Background() + tmpDir := t.TempDir() + file := filepath.Join(tmpDir, fmt.Sprintf("compressed-%d", rand.Int31())) + c, err := NewCompressor(ctx, t.Name(), file, tmpDir, 2, int(workers), log.LvlDebug, logger) + if err != nil { + t.Fatal(err) + } + c.DisableFsync() + defer c.Close() + for _, b := range a { + if err = c.AddWord(b); err != nil { + t.Fatal(err) + } + } + if err = c.Compress(); err != nil { + t.Fatal(err) + } + c.Close() + d, err := NewDecompressor(file) + if err != nil { + t.Fatal(err) + } + defer d.Close() + g := d.MakeGetter() + buf := make([]byte, (1 << 16)) + word_idx := 0 + for g.HasNext() { + expected := a[word_idx] + savePos := g.dataP + cmp := g.MatchCmp(expected) + pos1 := g.dataP + if cmp != 0 { + t.Fatalf("MatchCmp: expected match: %v\n", expected) + } + g.Reset(savePos) + ok, _ := g.Match(expected) + pos2 := g.dataP + if !ok { + t.Fatalf("MatchBool: expected match: %v\n", expected) + } + g.Reset(savePos) + word, nexPos := g.Next(nil) + if bytes.Compare(word, expected) != 0 { + t.Fatalf("bytes.Compare: expected match: %v with word %v\n", expected, word) + } + if pos1 != pos2 && pos2 != nexPos { + t.Fatalf("pos1 %v != pos2 %v != nexPos %v\n", pos1, pos2, nexPos) + } + g.Reset(savePos) + word2, nexPos2 := g.FastNext(buf) + if bytes.Compare(word2, expected) != 0 { + t.Fatalf("bytes.Compare: expected match: %v with word %v\n", expected, word) + } + if pos1 != pos2 && pos2 != nexPos && nexPos != nexPos2 { + t.Fatalf("pos1 %v != pos2 %v != nexPos %v\n", pos1, pos2, nexPos) + } + word_idx++ + } + }) + +} diff --git a/compress/decompress_test.go b/compress/decompress_test.go index f3d777453..0becd5bb5 100644 --- a/compress/decompress_test.go +++ b/compress/decompress_test.go @@ -20,10 +20,12 @@ import ( "bytes" "context" "fmt" + "math/rand" "os" "path/filepath" "strings" "testing" + "time" "github.com/ledgerwatch/log/v3" "github.com/stretchr/testify/require" @@ -99,6 +101,30 @@ func TestDecompressMatchOK(t *testing.T) { } } +func TestDecompressMatchCmpOK(t *testing.T) { + d := prepareLoremDict(t) + defer d.Close() + g := d.MakeGetter() + i := 0 + for g.HasNext() { + w := loremStrings[i] + if i%2 != 0 { + expected := fmt.Sprintf("%s %d", w, i) + result := g.MatchCmp([]byte(expected)) + if result != 0 { + t.Errorf("expexted match with %s", expected) + } + } else { + word, _ := g.Next(nil) + expected := fmt.Sprintf("%s %d", w, i) + if string(word) != expected { + t.Errorf("expected %s, got (hex) %s", expected, word) + } + } + i++ + } +} + func prepareStupidDict(t *testing.T, size int) *Decompressor { t.Helper() logger := log.New() @@ -160,7 +186,6 @@ func TestDecompressMatchNotOK(t *testing.T) { for g.HasNext() { w := loremStrings[i] expected := fmt.Sprintf("%s %d", w, i+1) - ok, _ := g.Match([]byte(expected)) if ok { t.Errorf("not expexted match with %s", expected) @@ -214,6 +239,47 @@ func TestDecompressMatchPrefix(t *testing.T) { } } +func TestDecompressMatchPrefixCmp(t *testing.T) { + d := prepareLoremDict(t) + defer d.Close() + g := d.MakeGetter() + i := 0 + skipCount := 0 + for g.HasNext() { + w := loremStrings[i] + expected := []byte(fmt.Sprintf("%s %d", w, i+1)) + expected = expected[:len(expected)/2] + cmp := g.MatchPrefixCmp(expected) + if cmp != 0 { + t.Errorf("expexted match with %s", expected) + } + g.Skip() + skipCount++ + i++ + } + if skipCount != i { + t.Errorf("something wrong with match logic") + } + g.Reset(0) + skipCount = 0 + i = 0 + for g.HasNext() { + w := loremStrings[i] + expected := []byte(fmt.Sprintf("%s %d", w, i+1)) + expected = expected[:len(expected)/2] + if len(expected) > 0 { + expected[len(expected)-1]++ + cmp := g.MatchPrefixCmp(expected) + if cmp == 0 { + t.Errorf("not expexted match with %s", expected) + } + } + g.Skip() + skipCount++ + i++ + } +} + func prepareLoremDictUncompressed(t *testing.T) *Decompressor { t.Helper() logger := log.New() @@ -288,3 +354,414 @@ func TestDecompressTorrent(t *testing.T) { require.NotZero(t, sz) } } + +const N = 100 + +var WORDS = [N][]byte{} +var WORD_FLAGS = [N]bool{} // false - uncompressed word, true - compressed word +var INPUT_FLAGS = []int{} // []byte or nil input + +func randWord() []byte { + size := rand.Intn(256) // size of the word + word := make([]byte, size) + for i := 0; i < size; i++ { + word[i] = byte(rand.Intn(256)) + } + return word +} + +func generateRandWords() { + for i := 0; i < N-2; i++ { + WORDS[i] = randWord() + } + // make sure we have at least 2 emtpy []byte + WORDS[N-2] = []byte{} + WORDS[N-1] = []byte{} +} + +func randIntInRange(min, max int) int { + return (rand.Intn(max-min) + min) +} + +func clearPrevDict() { + WORDS = [N][]byte{} + WORD_FLAGS = [N]bool{} + INPUT_FLAGS = []int{} +} + +func prepareRandomDict(t *testing.T) *Decompressor { + t.Helper() + logger := log.New() + tmpDir := t.TempDir() + file := filepath.Join(tmpDir, "complex") + t.Name() + c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 1, 2, log.LvlDebug, logger) + if err != nil { + t.Fatal(err) + } + // c.DisableFsync() + defer c.Close() + clearPrevDict() + rand.Seed(time.Now().UnixNano()) + generateRandWords() + + idx := 0 + for idx < N { + n := rand.Intn(2) + switch n { + case 0: // input case + word := WORDS[idx] + m := rand.Intn(2) + if m == 1 { + if err = c.AddWord(word); err != nil { + t.Fatal(err) + } + WORD_FLAGS[idx] = true + } else { + if err = c.AddUncompressedWord(word); err != nil { + t.Fatal(err) + } + } + idx++ + INPUT_FLAGS = append(INPUT_FLAGS, n) + case 1: // nil word + if err = c.AddWord(nil); err != nil { + t.Fatal(err) + } + INPUT_FLAGS = append(INPUT_FLAGS, n) + default: + t.Fatal(fmt.Errorf("case %d\n", n)) + } + } + + if err = c.Compress(); err != nil { + t.Fatal(err) + } + var d *Decompressor + if d, err = NewDecompressor(file); err != nil { + t.Fatal(err) + } + return d +} + +func TestDecompressRandomMatchCmp(t *testing.T) { + d := prepareRandomDict(t) + defer d.Close() + + if d.wordsCount != uint64(len(INPUT_FLAGS)) { + t.Fatalf("TestDecompressRandomDict: d.wordsCount != len(INPUT_FLAGS)") + } + + g := d.MakeGetter() + + word_idx := 0 + input_idx := 0 + total := 0 + // check for existing and non existing keys + for g.HasNext() { + pos := g.dataP + if INPUT_FLAGS[input_idx] == 0 { // []byte input + notExpected := string(WORDS[word_idx]) + "z" + cmp := g.MatchCmp([]byte(notExpected)) + if cmp == 0 { + t.Fatalf("not expected match: %v\n got: %v\n", []byte(notExpected), WORDS[word_idx]) + } + + expected := WORDS[word_idx] + cmp = g.MatchCmp(expected) // move offset to the next pos + if cmp != 0 { + savePos := g.dataP + g.Reset(pos) + word, nextPos := g.Next(nil) + if nextPos != savePos { + t.Fatalf("nextPos %d != savePos %d\n", nextPos, savePos) + } + if bytes.Compare(expected, word) != cmp { + fmt.Printf("1 expected: %v, acutal %v, cmp %d\n", expected, word, cmp) + } + t.Fatalf("expected match: %v\n got: %v\n", expected, word) + } + word_idx++ + } else { // nil input + notExpected := []byte{0} + cmp := g.MatchCmp(notExpected) + if cmp == 0 { + t.Fatal("not expected match []byte{0} with nil\n") + } + + expected := []byte{} + cmp = g.MatchCmp(nil) + if cmp != 0 { + savePos := g.dataP + g.Reset(pos) + word, nextPos := g.Next(nil) + if nextPos != savePos { + t.Fatalf("nextPos %d != savePos %d\n", nextPos, savePos) + } + if bytes.Compare(expected, word) != cmp { + fmt.Printf("2 expected: %v, acutal %v, cmp %d\n", expected, word, cmp) + } + t.Fatalf("expected match: %v\n got: %v\n", expected, word) + } + } + input_idx++ + total++ + } + if total != int(d.wordsCount) { + t.Fatalf("expected word count: %d, got %d\n", int(d.wordsCount), total) + } +} + +func TestDecompressRandomMatchBool(t *testing.T) { + d := prepareRandomDict(t) + defer d.Close() + + if d.wordsCount != uint64(len(INPUT_FLAGS)) { + t.Fatalf("TestDecompressRandomDict: d.wordsCount != len(INPUT_FLAGS)") + } + + g := d.MakeGetter() + + word_idx := 0 + input_idx := 0 + total := 0 + // check for existing and non existing keys + for g.HasNext() { + pos := g.dataP + if INPUT_FLAGS[input_idx] == 0 { // []byte input + notExpected := string(WORDS[word_idx]) + "z" + ok, _ := g.Match([]byte(notExpected)) + if ok { + t.Fatalf("not expected match: %v\n got: %v\n", []byte(notExpected), WORDS[word_idx]) + } + + expected := WORDS[word_idx] + ok, _ = g.Match(expected) + if !ok { + g.Reset(pos) + word, _ := g.Next(nil) + if bytes.Compare(expected, word) != 0 { + fmt.Printf("1 expected: %v, acutal %v, ok %v\n", expected, word, ok) + } + t.Fatalf("expected match: %v\n got: %v\n", expected, word) + } + word_idx++ + } else { // nil input + notExpected := []byte{0} + ok, _ := g.Match(notExpected) + if ok { + t.Fatal("not expected match []byte{0} with nil\n") + } + + expected := []byte{} + ok, _ = g.Match(nil) + if !ok { + g.Reset(pos) + word, _ := g.Next(nil) + if bytes.Compare(expected, word) != 0 { + fmt.Printf("2 expected: %v, acutal %v, ok %v\n", expected, word, ok) + } + t.Fatalf("expected match: %v\n got: %v\n", expected, word) + } + } + input_idx++ + total++ + } + if total != int(d.wordsCount) { + t.Fatalf("expected word count: %d, got %d\n", int(d.wordsCount), total) + } +} + +func TestDecompressRandomFastNext(t *testing.T) { + d := prepareRandomDict(t) + defer d.Close() + + if d.wordsCount != uint64(len(INPUT_FLAGS)) { + t.Fatalf("TestDecompressRandomDict: d.wordsCount != len(INPUT_FLAGS)") + } + + g := d.MakeGetter() + + word_idx := 0 + input_idx := 0 + total := 0 + buf := make([]byte, (1 << 23)) + // check for existing and non existing keys + for g.HasNext() { + if INPUT_FLAGS[input_idx] == 0 { // []byte input + expected := WORDS[word_idx] + word, _ := g.FastNext(buf) + if bytes.Compare(expected, word) != 0 { + t.Fatalf("1 expected: %v, got %v\n", expected, word) + } + word_idx++ + } else { // nil input + expected := []byte{} + word, _ := g.FastNext(buf) + if bytes.Compare(expected, word) != 0 { + t.Fatalf("2 expected: %v, got %v\n", expected, word) + } + } + input_idx++ + total++ + } + if total != int(d.wordsCount) { + t.Fatalf("expected word count: %d, got %d\n", int(d.wordsCount), total) + } +} + +// func TestDecompressRandomDict(t *testing.T) { +// d := prepareRandomDict(t) +// defer d.Close() + +// if d.wordsCount != uint64(len(INPUT_FLAGS)) { +// t.Fatalf("TestDecompressRandomDict: d.wordsCount != len(INPUT_FLAGS)") +// } + +// g := d.MakeGetter() + +// word_idx := 0 +// input_idx := 0 +// total := 0 +// // check for existing and non existing keys +// for g.HasNext() { +// pos := g.dataP +// if INPUT_FLAGS[input_idx] == 0 { // []byte input +// notExpected := string(WORDS[word_idx]) + "z" +// ok, _ := g.Match([]byte(notExpected)) +// if ok { +// t.Fatalf("not expected match: %s\n got: %s\n", notExpected, WORDS[word_idx]) +// } + +// expected := WORDS[word_idx] +// ok, _ = g.Match(expected) +// if !ok { +// g.Reset(pos) +// word, _ := g.Next(nil) +// t.Fatalf("expected match: %s\n got: %s\n", expected, word) +// } +// word_idx++ +// } else { // nil input +// notExpected := []byte{0} +// ok, _ := g.Match(notExpected) +// if ok { +// t.Fatal("not expected match []byte{0} with nil\n") +// } + +// expected := []byte{} +// ok, _ = g.Match(nil) +// if !ok { +// g.Reset(pos) +// word, _ := g.Next(nil) +// t.Fatalf("expected match: %s\n got: %s\n", expected, word) +// } +// } +// input_idx++ +// total++ +// } +// if total != int(d.wordsCount) { +// t.Fatalf("expected word count: %d, got %d\n", int(d.wordsCount), total) +// } + +// // TODO: check for non existing keys, suffixes, prefixes +// g.Reset(0) + +// word_idx = 0 +// input_idx = 0 +// // check for existing and non existing prefixes +// var notExpected = []byte{2, 3, 4} +// for g.HasNext() { + +// if INPUT_FLAGS[input_idx] == 0 { // []byte input +// expected := WORDS[word_idx] +// prefix_size := len(expected) / 2 +// if len(expected)/2 > 3 { +// prefix_size = randIntInRange(3, len(expected)/2) +// } +// expected = expected[:prefix_size] +// if len(expected) > 0 { +// if !g.MatchPrefix(expected) { +// t.Errorf("expected match with %s", expected) +// } +// expected[len(expected)-1]++ +// if g.MatchPrefix(expected) { +// t.Errorf("not expected match with %s", expected) +// } +// } else { +// if !g.MatchPrefix([]byte{}) { +// t.Error("expected match with empty []byte") +// } +// if g.MatchPrefix(notExpected) { +// t.Error("not expected empty []byte to match with []byte{2, 3, 4}") +// } +// } +// word_idx++ +// } else { // nil input +// if !g.MatchPrefix(nil) { +// t.Error("expected match with nil") +// } +// if g.MatchPrefix(notExpected) { +// t.Error("not expected nil to match with []byte{2, 3, 4}") +// } +// } + +// g.Skip() +// input_idx++ +// } + +// g.Reset(0) + +// word_idx = 0 +// input_idx = 0 +// // check for existing and non existing suffixes +// notExpected = []byte{2, 3, 4} +// for g.HasNext() { + +// if INPUT_FLAGS[input_idx] == 0 { // []byte input +// suffix := WORDS[word_idx] +// if len(suffix) > 1 { +// prefix := suffix[:len(suffix)/2] +// suffix = suffix[len(suffix)/2:] +// equal := reflect.DeepEqual(prefix, suffix) +// // check existing suffixes +// if g.MatchPrefix(suffix) { // suffix has to be equal to prefix +// if !equal { +// t.Fatalf("MatchPrefix(suffix) expected match: prefix is unequal to suffix %v != %v, full slice %v\n", prefix, suffix, WORDS[word_idx]) +// } +// } else { // suffix has not to be the same as prefix +// if equal { +// t.Fatalf("MatchPrefix(suffix) expected unmatch: prefix is equal to suffix %v != %v, full slice %v\n", prefix, suffix, WORDS[word_idx]) +// } +// } + +// if len(suffix) > 0 { +// suffix[0]++ +// if g.MatchPrefix(suffix) && reflect.DeepEqual(prefix, suffix) { +// t.Fatalf("MatchPrefix(suffix) not expected match: prefix is unequal to suffix %v != %v, full slice %v\n", prefix, suffix, WORDS[word_idx]) +// } +// } + +// g.Skip() +// } else { +// ok, _ := g.Match(suffix) +// if !ok { +// t.Fatal("Match(suffix): expected match suffix") +// } +// } +// word_idx++ +// } else { // nil input +// if !g.MatchPrefix(nil) { +// t.Error("MatchPrefix(suffix): expected match with nil") +// } +// if g.MatchPrefix(notExpected) { +// t.Error("MatchPrefix(suffix): not expected nil to match with []byte{2, 3, 4}") +// } +// ok, _ := g.Match(nil) +// if !ok { +// t.Errorf("Match(suffix): expected to match with nil") +// } +// } + +// input_idx++ +// } +// } diff --git a/state/aggregator_test.go b/state/aggregator_test.go index e36313f63..9fc43fb3a 100644 --- a/state/aggregator_test.go +++ b/state/aggregator_test.go @@ -606,7 +606,7 @@ func generateCompressedKV(tb testing.TB, tmp string, keySize, valueSize, keyCoun keys, _ := getter.Next(key[:0]) err = iw.AddKey(keys[:], pos) - pos = getter.Skip() + pos, _ = getter.Skip() require.NoError(tb, err) } decomp.Close() diff --git a/state/btree_index.go b/state/btree_index.go index 4a26a5eb8..80c122997 100644 --- a/state/btree_index.go +++ b/state/btree_index.go @@ -873,7 +873,7 @@ func BuildBtreeIndexWithDecompressor(indexPath string, kv *compress.Decompressor return err } - pos = getter.Skip() + pos, _ = getter.Skip() if pos-kp == 1 { ks[len(key)]++ emptys++ @@ -922,7 +922,7 @@ func BuildBtreeIndex(dataPath, indexPath string, logger log.Logger) error { return err } - pos = getter.Skip() + pos, _ = getter.Skip() } decomp.Close() diff --git a/state/domain.go b/state/domain.go index b73214856..5c8c5e311 100644 --- a/state/domain.go +++ b/state/domain.go @@ -1140,7 +1140,7 @@ func buildIndex(ctx context.Context, d *compress.Decompressor, idxPath, tmpdir s } } // Skip value - keyPos = g.Skip() + keyPos, _ = g.Skip() p.Processed.Add(1) } diff --git a/state/history.go b/state/history.go index 2da4df3d5..6cbe6c42e 100644 --- a/state/history.go +++ b/state/history.go @@ -456,9 +456,9 @@ func buildVi(ctx context.Context, historyItem, iiItem *filesItem, historyIdxPath return err } if compressVals { - valOffset = g2.Skip() + valOffset, _ = g2.Skip() } else { - valOffset = g2.SkipUncompressed() + valOffset, _ = g2.SkipUncompressed() } } @@ -937,7 +937,7 @@ func (h *History) buildFiles(ctx context.Context, step uint64, collation History if err = rs.AddKey(historyKey, valOffset); err != nil { return HistoryFiles{}, fmt.Errorf("add %s history idx [%x]: %w", h.filenameBase, historyKey, err) } - valOffset = g.Skip() + valOffset, _ = g.Skip() } } if err = rs.Build(); err != nil { diff --git a/state/merge.go b/state/merge.go index 45b06284e..1f8f22623 100644 --- a/state/merge.go +++ b/state/merge.go @@ -979,9 +979,9 @@ func (h *History) mergeFiles(ctx context.Context, indexFiles, historyFiles []*fi return nil, nil, err } if h.compressVals { - valOffset = g2.Skip() + valOffset, _ = g2.Skip() } else { - valOffset = g2.SkipUncompressed() + valOffset, _ = g2.SkipUncompressed() } } p.Processed.Add(1) From 32cad7ddb6a36e6b55772beb911d0bd02d0c3b89 Mon Sep 17 00:00:00 2001 From: racytech Date: Tue, 25 Jul 2023 03:58:34 +0600 Subject: [PATCH 2/3] comments + MatchPrefixUncompressed --- compress/decompress.go | 49 +++++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/compress/decompress.go b/compress/decompress.go index f03e61f1a..a660dd2fd 100644 --- a/compress/decompress.go +++ b/compress/decompress.go @@ -818,8 +818,8 @@ func (g *Getter) MatchPrefix(prefix []byte) bool { return true } -// Match returns true and next offset if the word at current offset fully matches the buf -// returns false and current offset otherwise. +// MatchCmp lexicographically compares given buf with the word at the current offset in the file. +// returns 0 if buf == word, -1 if buf < word, 1 if buf > word func (g *Getter) MatchCmp(buf []byte) int { savePos := g.dataP wordLen := g.nextPos(true) @@ -836,18 +836,6 @@ func (g *Getter) MatchCmp(buf []byte) int { } return 0 } - // if wordLen == 0 { - // if g.dataBit > 0 { - // g.dataP++ - // g.dataBit = 0 - // } - // if lenBuf != 0 { - // g.dataP, g.dataBit = savePos, 0 - // return 1 - // } else { - // return 0 - // } - // } decoded := make([]byte, wordLen) var bufPos int @@ -892,7 +880,8 @@ func (g *Getter) MatchCmp(buf []byte) int { return cmp } -// MatchPrefix only checks if the word at the current offset has a buf prefix. Does not move offset to the next word. +// MatchPrefixCmp lexicographically compares given prefix with the word at the current offset in the file. +// returns 0 if buf == word, -1 if buf < word, 1 if buf > word func (g *Getter) MatchPrefixCmp(prefix []byte) int { savePos := g.dataP defer func() { @@ -948,6 +937,9 @@ func (g *Getter) MatchPrefixCmp(prefix []byte) int { } var cmp int if prefixLen > int(wordLen) { + // TODO(racytech): handle this case + // e.g: prefix = 'aaacb' + // word = 'aaa' cmp = bytes.Compare(prefix, decoded) } else { cmp = bytes.Compare(prefix, decoded[:prefixLen]) @@ -956,6 +948,33 @@ func (g *Getter) MatchPrefixCmp(prefix []byte) int { return cmp } +func (g *Getter) MatchPrefixUncompressed(prefix []byte) int { + savePos := g.dataP + defer func() { + g.dataP, g.dataBit = savePos, 0 + }() + + wordLen := g.nextPos(true /* clean */) + wordLen-- // because when create huffman tree we do ++ , because 0 is terminator + prefixLen := len(prefix) + if wordLen == 0 && prefixLen != 0 { + return 1 + } + if prefixLen == 0 { + return 0 + } + + g.nextPos(true) + + if prefixLen > int(wordLen) { + // TODO(racytech): handle this case + // e.g: prefix = 'aaacb' + // word = 'aaa' + } + + return bytes.Compare(prefix, g.data[g.dataP:g.dataP+wordLen]) +} + // FastNext extracts a compressed word from current offset in the file // into the given buf, returning a new byte slice which contains extracted word. // It is important to allocate enough buf size. Could throw an error if word in file is larger then the buf size. From c12c4f05a45acd8bb909f165e65dc8731c666448 Mon Sep 17 00:00:00 2001 From: racytech Date: Tue, 25 Jul 2023 04:13:59 +0600 Subject: [PATCH 3/3] lint errors removed --- compress/decompress.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/compress/decompress.go b/compress/decompress.go index a660dd2fd..80aa27015 100644 --- a/compress/decompress.go +++ b/compress/decompress.go @@ -933,7 +933,7 @@ func (g *Getter) MatchPrefixCmp(prefix []byte) int { if prefixLen > lastUncovered && int(wordLen) > lastUncovered { dif := wordLen - uint64(lastUncovered) copy(decoded[lastUncovered:wordLen], g.data[postLoopPos:postLoopPos+dif]) - postLoopPos += dif + // postLoopPos += dif } var cmp int if prefixLen > int(wordLen) { @@ -966,11 +966,11 @@ func (g *Getter) MatchPrefixUncompressed(prefix []byte) int { g.nextPos(true) - if prefixLen > int(wordLen) { - // TODO(racytech): handle this case - // e.g: prefix = 'aaacb' - // word = 'aaa' - } + // if prefixLen > int(wordLen) { + // // TODO(racytech): handle this case + // // e.g: prefix = 'aaacb' + // // word = 'aaa' + // } return bytes.Compare(prefix, g.data[g.dataP:g.dataP+wordLen]) }