Speed up EntrySortIterator by 20%. (#5318)
**What this PR does / why we need it**:
This patch replaces the underlying heap of the `EntrySortIterator` with a sorted slice. Instead of maintaining a min- or max-heap, the entry iterators are sorted once at the beginning. During iteration, only the first entry iterator is re-positioned, based on its current value. This yields the same runtime complexity as the heap-based solution. However, because the new algorithm only ever reads the first element, that element's timestamp and labels can be cached, so the number of calls to `Entry().Timestamp` is roughly halved.

```
› benchstat before.txt after.txt
name                        old time/op  new time/op  delta
SortIterator/merge_sort-16  4.40ms ± 4%  4.38ms ± 3%     ~     (p=0.400 n=9+10)
SortIterator/sort-16        3.65ms ± 2%  2.80ms ± 3%  -23.25%  (p=0.000 n=9+10)
```
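
To make the new approach concrete, here is a minimal, runnable Go sketch of the same idea. It is not the Loki code itself: plain `int64` timestamps stand in for entries, only ascending order is handled, and label tie-breaking is omitted. The `iter` type and all names are illustrative.

```go
package main

import (
	"fmt"
	"sort"
)

// iter stands in for Loki's EntryIterator, yielding int64 timestamps.
type iter struct {
	cur  int64
	rest []int64
}

func (it *iter) next() bool {
	if len(it.rest) == 0 {
		return false
	}
	it.cur, it.rest = it.rest[0], it.rest[1:]
	return true
}

func main() {
	// Each input iterator is already sorted; prime each with next().
	is := []*iter{
		{rest: []int64{1, 4, 7}},
		{rest: []int64{2, 5, 8}},
		{rest: []int64{3, 6, 9}},
	}
	for _, it := range is {
		it.next()
	}
	// Sort the iterators once up front (the PR does this in init()).
	sort.Slice(is, func(a, b int) bool { return is[a].cur < is[b].cur })

	for len(is) > 0 {
		head := is[0]
		fmt.Println(head.cur) // the head's value is read exactly once per step
		if !head.next() {
			is = is[1:] // head is exhausted: drop it
			continue
		}
		// Reposition only the head, via binary search over the rest
		// (this is what the PR's fix() method does).
		v := head.cur
		rest := is[1:]
		idx := sort.Search(len(rest), func(j int) bool { return v < rest[j].cur })
		if idx == len(rest) {
			is = append(rest, head)
		} else {
			is = append(rest[:idx+1], rest[idx:]...)
			is[idx] = head
		}
	}
}
```

This prints 1 through 9 in order while keeping the slice of iterators sorted by their current value: each reposition costs O(log n) comparisons (plus a slice shift), and the head's entry is read only once per step rather than on every heap comparison.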

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [ ] Documentation added
- [ ] Tests updated
- [ ] Add an entry in the `CHANGELOG.md` about the changes.
jeschkies authored Feb 9, 2022
1 parent a3bacd5 commit 8e9009a
Showing 3 changed files with 68 additions and 25 deletions.
3 changes: 2 additions & 1 deletion pkg/chunkenc/util_test.go
```diff
@@ -66,6 +66,7 @@ func fillChunkClose(c Chunk, close bool) int64 {
 func fillChunkRandomOrder(c Chunk, close bool) {
     ub := int64(1 << 30)
     i := int64(0)
+    random := rand.New(rand.NewSource(42))
     entry := &logproto.Entry{
         Timestamp: time.Unix(0, 0),
         Line:      testdata.LogString(i),
@@ -77,7 +78,7 @@ func fillChunkRandomOrder(c Chunk, close bool) {
             panic(err)
         }
         i++
-        entry.Timestamp = time.Unix(0, rand.Int63n(ub))
+        entry.Timestamp = time.Unix(0, random.Int63n(ub))
         entry.Line = testdata.LogString(i)
 
     }
```
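
As an aside on the test change above: switching from the package-level `rand` functions to a locally seeded `*rand.Rand` makes the generated timestamps reproducible across runs, since the private source is isolated from every other user of the global one. A small sketch of the pattern (illustrative only, not from the PR):

```go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// A private, explicitly seeded source: the sequence below is the
	// same on every run, and no other goroutine or package can advance it.
	random := rand.New(rand.NewSource(42))
	for i := 0; i < 3; i++ {
		fmt.Println(random.Int63n(1 << 30))
	}
}
```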
89 changes: 65 additions & 24 deletions pkg/iter/entry_iterator.go
```diff
@@ -4,6 +4,7 @@ import (
     "container/heap"
     "context"
     "io"
+    "sort"
     "sync"
     "time"
 
@@ -295,15 +296,11 @@ func (i *mergeEntryIterator) Len() int {
 }
 
 type entrySortIterator struct {
-    heap interface {
-        heap.Interface
-        Peek() EntryIterator
-    }
-    is         []EntryIterator
-    prefetched bool
-
-    currEntry entryWithLabels
-    errs      []error
+    is              []EntryIterator
+    prefetched      bool
+    byAscendingTime bool
+    currEntry       entryWithLabels
+    errs            []error
 }
 
 // NewSortEntryIterator returns a new EntryIterator that sorts entries from the input iterators by timestamp (depending on the direction).
@@ -320,25 +317,48 @@ func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) EntryIterator {
     result := &entrySortIterator{is: is}
     switch direction {
     case logproto.BACKWARD:
-        result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: false, byAlphabetical: true}
+        result.byAscendingTime = false
     case logproto.FORWARD:
-        result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: true, byAlphabetical: true}
+        result.byAscendingTime = true
     default:
         panic("bad direction")
     }
     return result
 }
 
-// init initialize the underlaying heap
+func (i *entrySortIterator) lessByIndex(k, j int) bool {
+    t1, t2 := i.is[k].Entry().Timestamp.UnixNano(), i.is[j].Entry().Timestamp.UnixNano()
+    if t1 == t2 {
+        return i.is[k].Labels() < i.is[j].Labels()
+    }
+    if i.byAscendingTime {
+        return t1 < t2
+    }
+    return t1 > t2
+}
+
+func (i *entrySortIterator) lessByValue(t1 int64, l1 string, index int) bool {
+    t2 := i.is[index].Entry().Timestamp.UnixNano()
+    if t1 == t2 {
+        return l1 < i.is[index].Labels()
+    }
+    if i.byAscendingTime {
+        return t1 < t2
+    }
+    return t1 > t2
+}
+
+// init throws out empty iterators and sorts them.
 func (i *entrySortIterator) init() {
     if i.prefetched {
         return
     }
 
     i.prefetched = true
+    tmp := make([]EntryIterator, 0, len(i.is))
     for _, it := range i.is {
         if it.Next() {
-            i.heap.Push(it)
+            tmp = append(tmp, it)
             continue
         }
 
@@ -347,36 +367,57 @@ func (i *entrySortIterator) init() {
         }
         util.LogError("closing iterator", it.Close)
     }
-    heap.Init(i.heap)
-
-    // We can now clear the list of input iterators to merge, given they have all
-    // been processed and the non empty ones have been pushed to the heap
-    i.is = nil
+    i.is = tmp
+    sort.Slice(i.is, i.lessByIndex)
 }
 
+func (i *entrySortIterator) fix() {
+    head := i.is[0]
+    t1 := head.Entry().Timestamp.UnixNano()
+    l1 := head.Labels()
+
+    // shortcut
+    if len(i.is) <= 1 || i.lessByValue(t1, l1, 1) {
+        return
+    }
+
+    // First element is out of place. So we reposition it.
+    i.is = i.is[1:] // drop head
+    index := sort.Search(len(i.is), func(in int) bool { return i.lessByValue(t1, l1, in) })
+
+    if index == len(i.is) {
+        i.is = append(i.is, head)
+    } else {
+        i.is = append(i.is[:index+1], i.is[index:]...)
+        i.is[index] = head
+    }
+}
+
 func (i *entrySortIterator) Next() bool {
     i.init()
 
-    if i.heap.Len() == 0 {
+    if len(i.is) == 0 {
         return false
     }
 
-    next := i.heap.Peek()
+    next := i.is[0]
     i.currEntry.entry = next.Entry()
     i.currEntry.labels = next.Labels()
     i.currEntry.streamHash = next.StreamHash()
     // if the top iterator is empty, we remove it.
     if !next.Next() {
-        heap.Pop(i.heap)
+        i.is = i.is[1:]
         if err := next.Error(); err != nil {
             i.errs = append(i.errs, err)
         }
         util.LogError("closing iterator", next.Close)
         return true
     }
-    if i.heap.Len() > 1 {
-        heap.Fix(i.heap, 0)
+
+    if len(i.is) > 1 {
+        i.fix()
     }
 
     return true
 }
 
@@ -404,8 +445,8 @@ func (i *entrySortIterator) Error() error {
 }
 
 func (i *entrySortIterator) Close() error {
-    for i.heap.Len() > 0 {
-        if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
+    for _, entryIterator := range i.is {
+        if err := entryIterator.Close(); err != nil {
             return err
         }
     }
```
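
The new `fix` method leans on the contract of `sort.Search` from the standard library: it returns the smallest index in `[0, n)` at which the predicate is true, or `n` if the predicate is never true, which is why the code above needs the explicit append-at-end branch when `index == len(i.is)`. A standalone illustration with plain ints:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	s := []int{10, 20, 30, 40}

	// Smallest index where the predicate holds: 25 belongs before s[2].
	fmt.Println(sort.Search(len(s), func(i int) bool { return 25 < s[i] })) // 2

	// If the predicate is never true, sort.Search returns len(s):
	// the value belongs at the very end, hence fix()'s append branch.
	fmt.Println(sort.Search(len(s), func(i int) bool { return 99 < s[i] })) // 4
}
```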
1 change: 1 addition & 0 deletions pkg/iter/entry_iterator_test.go
```diff
@@ -735,6 +735,7 @@ func BenchmarkSortIterator(b *testing.B) {
     })
 
     b.Run("sort", func(b *testing.B) {
+        b.ReportAllocs()
         b.ResetTimer()
         for i := 0; i < b.N; i++ {
             b.StopTimer()
```
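
The `b.ReportAllocs()` call added above makes the benchmark report `allocs/op` and `B/op` next to `ns/op`, which helps confirm that the sorted-slice iterator does not allocate more than the heap it replaces. In isolation (a hypothetical benchmark, not part of the PR):

```go
package iter_test

import "testing"

var sink []byte // package-level sink so the allocation is not optimized away

func BenchmarkExample(b *testing.B) {
	b.ReportAllocs() // report allocs/op and B/op for this benchmark
	for i := 0; i < b.N; i++ {
		sink = make([]byte, 64) // stand-in for the work being measured
	}
}
```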
