Skip to content

Commit

Permalink
Fixes ingester sample deduping (#5470)
Browse files Browse the repository at this point in the history
* Fixes ingester sample deduping

* Call get hash only once

* Update pkg/iter/entry_iterator.go

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
cyriltovena and owen-d authored Feb 25, 2022
1 parent 8f6d3d9 commit 6dad54f
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 69 deletions.
39 changes: 22 additions & 17 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,8 +998,8 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
// but the tradeoff is that queries to near-realtime data would be much lower than
// cutting of blocks.
stats.AddHeadChunkLines(int64(len(hb.entries)))
streams := map[uint64]*logproto.Stream{}

streams := map[string]*logproto.Stream{}
baseHash := pipeline.BaseLabels().Hash()
process := func(e entry) {
// apply time filtering
if e.t < mint || e.t >= maxt {
Expand All @@ -1011,13 +1011,13 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
return
}
var stream *logproto.Stream
lhash := parsedLbs.Hash()
if stream, ok = streams[lhash]; !ok {
labels := parsedLbs.Labels().String()
if stream, ok = streams[labels]; !ok {
stream = &logproto.Stream{
Labels: parsedLbs.String(),
Hash: lhash,
Labels: labels,
Hash: baseHash,
}
streams[lhash] = stream
streams[labels] = stream
}
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Unix(0, e.t),
Expand Down Expand Up @@ -1051,29 +1051,34 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra
}
stats := stats.FromContext(ctx)
stats.AddHeadChunkLines(int64(len(hb.entries)))
series := map[uint64]*logproto.Series{}
series := map[string]*logproto.Series{}
baseHash := extractor.BaseLabels().Hash()

for _, e := range hb.entries {
stats.AddHeadChunkBytes(int64(len(e.s)))
value, parsedLabels, ok := extractor.ProcessString(e.s)
if !ok {
continue
}
var found bool
var s *logproto.Series
lhash := parsedLabels.Hash()
if s, found = series[lhash]; !found {
var (
found bool
s *logproto.Series
)

lbs := parsedLabels.String()
if s, found = series[lbs]; !found {
s = &logproto.Series{
Labels: parsedLabels.String(),
Labels: lbs,
Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
StreamHash: lhash,
StreamHash: baseHash,
}
series[lhash] = s
series[lbs] = s
}
h := xxhash.Sum64(unsafeGetBytes(e.s))

s.Samples = append(s.Samples, logproto.Sample{
Timestamp: e.t,
Value: value,
Hash: h,
Hash: xxhash.Sum64(unsafeGetBytes(e.s)),
})
}

Expand Down
39 changes: 20 additions & 19 deletions pkg/chunkenc/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ func (hb *unorderedHeadBlock) Iterator(
// the alternate would be that we allocate a new b.entries everytime we cut a block,
// but the tradeoff is that queries to near-realtime data would be much lower than
// cutting of blocks.
streams := map[uint64]*logproto.Stream{}

streams := map[string]*logproto.Stream{}
baseHash := pipeline.BaseLabels().Hash()
_ = hb.forEntries(
ctx,
direction,
Expand All @@ -233,13 +233,13 @@ func (hb *unorderedHeadBlock) Iterator(
}

var stream *logproto.Stream
lhash := parsedLbs.Hash()
if stream, ok = streams[lhash]; !ok {
labels := parsedLbs.String()
if stream, ok = streams[labels]; !ok {
stream = &logproto.Stream{
Labels: parsedLbs.String(),
Hash: lhash,
Labels: labels,
Hash: baseHash,
}
streams[lhash] = stream
streams[labels] = stream
}

stream.Entries = append(stream.Entries, logproto.Entry{
Expand Down Expand Up @@ -267,8 +267,8 @@ func (hb *unorderedHeadBlock) SampleIterator(
maxt int64,
extractor log.StreamSampleExtractor,
) iter.SampleIterator {
series := map[uint64]*logproto.Series{}

series := map[string]*logproto.Series{}
baseHash := extractor.BaseLabels().Hash()
_ = hb.forEntries(
ctx,
logproto.FORWARD,
Expand All @@ -279,23 +279,24 @@ func (hb *unorderedHeadBlock) SampleIterator(
if !ok {
return nil
}
var found bool
var s *logproto.Series
lhash := parsedLabels.Hash()
if s, found = series[lhash]; !found {
var (
found bool
s *logproto.Series
)
lbs := parsedLabels.String()
s, found = series[lbs]
if !found {
s = &logproto.Series{
Labels: parsedLabels.String(),
Labels: lbs,
Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
StreamHash: parsedLabels.Hash(),
StreamHash: baseHash,
}
series[lhash] = s
series[lbs] = s
}

h := xxhash.Sum64(unsafeGetBytes(line))
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: ts,
Value: value,
Hash: h,
Hash: xxhash.Sum64(unsafeGetBytes(line)),
})
return nil
},
Expand Down
Loading

0 comments on commit 6dad54f

Please sign in to comment.