From 8e9009a7e3c245dba913b44d204d60cc9b2642da Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Wed, 9 Feb 2022 10:29:59 +0100 Subject: [PATCH] Speed up `EntrySortIterator` by 20%. (#5318) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **What this PR does / why we need it**: This patch replace the underlying heap of the `EntrySortIterator`. Instead of using a min or max heap the entry iterators are sorted at the beginning. Once the iteration starts only the first entry iterator is re-positioned depending on its current value. This will yield the same runtime complexity as the current heap based solution. However, since the new algorithm accesses the first element it can be cached. Thus the number of access calls to `Entry().Timestamp` are halved. ``` › benchstat before.txt after.txt name old time/op new time/op delta SortIterator/merge_sort-16 4.40ms ± 4% 4.38ms ± 3% ~ (p=0.400 n=9+10) SortIterator/sort-16 3.65ms ± 2% 2.80ms ± 3% -23.25% (p=0.000 n=9+10) ``` **Which issue(s) this PR fixes**: Fixes # **Special notes for your reviewer**: **Checklist** - [ ] Documentation added - [ ] Tests updated - [ ] Add an entry in the `CHANGELOG.md` about the changes. --- pkg/chunkenc/util_test.go | 3 +- pkg/iter/entry_iterator.go | 89 ++++++++++++++++++++++++--------- pkg/iter/entry_iterator_test.go | 1 + 3 files changed, 68 insertions(+), 25 deletions(-) diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index c4e3de043dc1..a65bdbcae105 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -66,6 +66,7 @@ func fillChunkClose(c Chunk, close bool) int64 { func fillChunkRandomOrder(c Chunk, close bool) { ub := int64(1 << 30) i := int64(0) + random := rand.New(rand.NewSource(42)) entry := &logproto.Entry{ Timestamp: time.Unix(0, 0), Line: testdata.LogString(i), @@ -77,7 +78,7 @@ func fillChunkRandomOrder(c Chunk, close bool) { panic(err) } i++ - entry.Timestamp = time.Unix(0, rand.Int63n(ub)) + entry.Timestamp = time.Unix(0, random.Int63n(ub)) entry.Line = testdata.LogString(i) } diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go index 988e0051218b..25c160920ea3 100644 --- a/pkg/iter/entry_iterator.go +++ b/pkg/iter/entry_iterator.go @@ -4,6 +4,7 @@ import ( "container/heap" "context" "io" + "sort" "sync" "time" @@ -295,15 +296,11 @@ func (i *mergeEntryIterator) Len() int { } type entrySortIterator struct { - heap interface { - heap.Interface - Peek() EntryIterator - } - is []EntryIterator - prefetched bool - - currEntry entryWithLabels - errs []error + is []EntryIterator + prefetched bool + byAscendingTime bool + currEntry entryWithLabels + errs []error } // NewSortEntryIterator returns a new EntryIterator that sorts entries by timestamp (depending on the direction) the input iterators. @@ -320,25 +317,48 @@ func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) Entr result := &entrySortIterator{is: is} switch direction { case logproto.BACKWARD: - result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: false, byAlphabetical: true} + result.byAscendingTime = false case logproto.FORWARD: - result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: true, byAlphabetical: true} + result.byAscendingTime = true default: panic("bad direction") } return result } -// init initialize the underlaying heap +func (i *entrySortIterator) lessByIndex(k, j int) bool { + t1, t2 := i.is[k].Entry().Timestamp.UnixNano(), i.is[j].Entry().Timestamp.UnixNano() + if t1 == t2 { + return i.is[k].Labels() < i.is[j].Labels() + } + if i.byAscendingTime { + return t1 < t2 + } + return t1 > t2 +} + +func (i *entrySortIterator) lessByValue(t1 int64, l1 string, index int) bool { + t2 := i.is[index].Entry().Timestamp.UnixNano() + if t1 == t2 { + return l1 < i.is[index].Labels() + } + if i.byAscendingTime { + return t1 < t2 + } + return t1 > t2 +} + +// init throws out empty iterators and sorts them. func (i *entrySortIterator) init() { if i.prefetched { return } i.prefetched = true + tmp := make([]EntryIterator, 0, len(i.is)) for _, it := range i.is { if it.Next() { - i.heap.Push(it) + tmp = append(tmp, it) continue } @@ -347,36 +367,57 @@ func (i *entrySortIterator) init() { } util.LogError("closing iterator", it.Close) } - heap.Init(i.heap) + i.is = tmp + sort.Slice(i.is, i.lessByIndex) +} - // We can now clear the list of input iterators to merge, given they have all - // been processed and the non empty ones have been pushed to the heap - i.is = nil +func (i *entrySortIterator) fix() { + head := i.is[0] + t1 := head.Entry().Timestamp.UnixNano() + l1 := head.Labels() + + // shortcut + if len(i.is) <= 1 || i.lessByValue(t1, l1, 1) { + return + } + + // First element is out of place. So we reposition it. + i.is = i.is[1:] // drop head + index := sort.Search(len(i.is), func(in int) bool { return i.lessByValue(t1, l1, in) }) + + if index == len(i.is) { + i.is = append(i.is, head) + } else { + i.is = append(i.is[:index+1], i.is[index:]...) + i.is[index] = head + } } func (i *entrySortIterator) Next() bool { i.init() - if i.heap.Len() == 0 { + if len(i.is) == 0 { return false } - next := i.heap.Peek() + next := i.is[0] i.currEntry.entry = next.Entry() i.currEntry.labels = next.Labels() i.currEntry.streamHash = next.StreamHash() // if the top iterator is empty, we remove it. if !next.Next() { - heap.Pop(i.heap) + i.is = i.is[1:] if err := next.Error(); err != nil { i.errs = append(i.errs, err) } util.LogError("closing iterator", next.Close) return true } - if i.heap.Len() > 1 { - heap.Fix(i.heap, 0) + + if len(i.is) > 1 { + i.fix() } + return true } @@ -404,8 +445,8 @@ func (i *entrySortIterator) Error() error { } func (i *entrySortIterator) Close() error { - for i.heap.Len() > 0 { - if err := i.heap.Pop().(EntryIterator).Close(); err != nil { + for _, entryIterator := range i.is { + if err := entryIterator.Close(); err != nil { return err } } diff --git a/pkg/iter/entry_iterator_test.go b/pkg/iter/entry_iterator_test.go index 3db309288763..7777ef674fe6 100644 --- a/pkg/iter/entry_iterator_test.go +++ b/pkg/iter/entry_iterator_test.go @@ -735,6 +735,7 @@ func BenchmarkSortIterator(b *testing.B) { }) b.Run("sort", func(b *testing.B) { + b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer()