diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 7a9fdebeb099..548840032703 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -998,8 +998,8 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, // but the tradeoff is that queries to near-realtime data would be much lower than // cutting of blocks. stats.AddHeadChunkLines(int64(len(hb.entries))) - streams := map[uint64]*logproto.Stream{} - + streams := map[string]*logproto.Stream{} + baseHash := pipeline.BaseLabels().Hash() process := func(e entry) { // apply time filtering if e.t < mint || e.t >= maxt { @@ -1011,13 +1011,13 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, return } var stream *logproto.Stream - lhash := parsedLbs.Hash() - if stream, ok = streams[lhash]; !ok { + labels := parsedLbs.Labels().String() + if stream, ok = streams[labels]; !ok { stream = &logproto.Stream{ - Labels: parsedLbs.String(), - Hash: lhash, + Labels: labels, + Hash: baseHash, } - streams[lhash] = stream + streams[labels] = stream } stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: time.Unix(0, e.t), @@ -1051,29 +1051,34 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra } stats := stats.FromContext(ctx) stats.AddHeadChunkLines(int64(len(hb.entries))) - series := map[uint64]*logproto.Series{} + series := map[string]*logproto.Series{} + baseHash := extractor.BaseLabels().Hash() + for _, e := range hb.entries { stats.AddHeadChunkBytes(int64(len(e.s))) value, parsedLabels, ok := extractor.ProcessString(e.s) if !ok { continue } - var found bool - var s *logproto.Series - lhash := parsedLabels.Hash() - if s, found = series[lhash]; !found { + var ( + found bool + s *logproto.Series + ) + + lbs := parsedLabels.String() + if s, found = series[lbs]; !found { s = &logproto.Series{ - Labels: parsedLabels.String(), + Labels: lbs, Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0], - StreamHash: lhash, + StreamHash: baseHash, } - series[lhash] = s + series[lbs] = s } - h := xxhash.Sum64(unsafeGetBytes(e.s)) + s.Samples = append(s.Samples, logproto.Sample{ Timestamp: e.t, Value: value, - Hash: h, + Hash: xxhash.Sum64(unsafeGetBytes(e.s)), }) } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 346f85dda56b..73cabd12662f 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -219,8 +219,8 @@ func (hb *unorderedHeadBlock) Iterator( // the alternate would be that we allocate a new b.entries everytime we cut a block, // but the tradeoff is that queries to near-realtime data would be much lower than // cutting of blocks. - streams := map[uint64]*logproto.Stream{} - + streams := map[string]*logproto.Stream{} + baseHash := pipeline.BaseLabels().Hash() _ = hb.forEntries( ctx, direction, @@ -233,13 +233,13 @@ func (hb *unorderedHeadBlock) Iterator( } var stream *logproto.Stream - lhash := parsedLbs.Hash() - if stream, ok = streams[lhash]; !ok { + labels := parsedLbs.String() + if stream, ok = streams[labels]; !ok { stream = &logproto.Stream{ - Labels: parsedLbs.String(), - Hash: lhash, + Labels: labels, + Hash: baseHash, } - streams[lhash] = stream + streams[labels] = stream } stream.Entries = append(stream.Entries, logproto.Entry{ @@ -267,8 +267,8 @@ func (hb *unorderedHeadBlock) SampleIterator( maxt int64, extractor log.StreamSampleExtractor, ) iter.SampleIterator { - series := map[uint64]*logproto.Series{} - + series := map[string]*logproto.Series{} + baseHash := extractor.BaseLabels().Hash() _ = hb.forEntries( ctx, logproto.FORWARD, @@ -279,23 +279,24 @@ func (hb *unorderedHeadBlock) SampleIterator( if !ok { return nil } - var found bool - var s *logproto.Series - lhash := parsedLabels.Hash() - if s, found = series[lhash]; !found { + var ( + found bool + s *logproto.Series + ) + lbs := parsedLabels.String() + s, found = series[lbs] + if !found { s = &logproto.Series{ - Labels: parsedLabels.String(), + Labels: lbs, Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0], - StreamHash: parsedLabels.Hash(), + StreamHash: baseHash, } - series[lhash] = s + series[lbs] = s } - - h := xxhash.Sum64(unsafeGetBytes(line)) s.Samples = append(s.Samples, logproto.Sample{ Timestamp: ts, Value: value, - Hash: h, + Hash: xxhash.Sum64(unsafeGetBytes(line)), }) return nil }, diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index e11bf2e6cbd6..346df6106298 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2,7 +2,10 @@ package ingester import ( "fmt" + "log" + "net" "net/http" + "sort" "sync" "testing" "time" @@ -13,10 +16,12 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/test/bufconn" "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/ingester/client" @@ -594,3 +599,236 @@ func Test_InMemoryLabels(t *testing.T) { require.NoError(t, err) require.Equal(t, []string{"bar", "foo"}, res.Values) } + +func Test_DedupeIngester(t *testing.T) { + var ( + requests = int64(400) + streamCount = int64(20) + streams []labels.Labels + streamHashes []uint64 + ingesterCount = 100 + + ingesterConfig = defaultIngesterTestConfig(t) + ctx, _ = user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), "foo")) + ) + // make sure we will cut blocks and chunks and use head chunks + ingesterConfig.TargetChunkSize = 800 + ingesterConfig.BlockSize = 300 + + // created many different ingesters + ingesterSet, closer := createIngesterSets(t, ingesterConfig, ingesterCount) + defer closer() + + for i := int64(0); i < streamCount; i++ { + s := labels.FromStrings("foo", "bar", "bar", fmt.Sprintf("baz%d", i)) + streams = append(streams, s) + streamHashes = append(streamHashes, s.Hash()) + } + sort.Slice(streamHashes, func(i, j int) bool { return streamHashes[i] < streamHashes[j] }) + + for i := int64(0); i < requests; i++ { + for _, ing := range ingesterSet { + _, err := ing.Push(ctx, buildPushRequest(i, streams)) + require.NoError(t, err) + } + } + + t.Run("backward log", func(t *testing.T) { + iterators := make([]iter.EntryIterator, 0, len(ingesterSet)) + for _, client := range ingesterSet { + stream, err := client.Query(ctx, &logproto.QueryRequest{ + Selector: `{foo="bar"} | label_format bar=""`, // making it difficult to dedupe by removing uncommon label. + Start: time.Unix(0, 0), + End: time.Unix(0, requests+1), + Limit: uint32(requests * streamCount), + Direction: logproto.BACKWARD, + }) + require.NoError(t, err) + iterators = append(iterators, iter.NewQueryClientIterator(stream, logproto.BACKWARD)) + } + it := iter.NewMergeEntryIterator(ctx, iterators, logproto.BACKWARD) + + for i := requests - 1; i >= 0; i-- { + actualHashes := []uint64{} + for j := 0; j < int(streamCount); j++ { + require.True(t, it.Next()) + require.Equal(t, fmt.Sprintf("line %d", i), it.Entry().Line) + require.Equal(t, i, it.Entry().Timestamp.UnixNano()) + require.Equal(t, `{bar="", foo="bar"}`, it.Labels()) + actualHashes = append(actualHashes, it.StreamHash()) + } + sort.Slice(actualHashes, func(i, j int) bool { return actualHashes[i] < actualHashes[j] }) + require.Equal(t, streamHashes, actualHashes) + } + require.False(t, it.Next()) + require.NoError(t, it.Error()) + }) + t.Run("forward log", func(t *testing.T) { + iterators := make([]iter.EntryIterator, 0, len(ingesterSet)) + for _, client := range ingesterSet { + stream, err := client.Query(ctx, &logproto.QueryRequest{ + Selector: `{foo="bar"} | label_format bar=""`, // making it difficult to dedupe by removing uncommon label. + Start: time.Unix(0, 0), + End: time.Unix(0, requests+1), + Limit: uint32(requests * streamCount), + Direction: logproto.FORWARD, + }) + require.NoError(t, err) + iterators = append(iterators, iter.NewQueryClientIterator(stream, logproto.FORWARD)) + } + it := iter.NewMergeEntryIterator(ctx, iterators, logproto.FORWARD) + + for i := int64(0); i < requests; i++ { + actualHashes := []uint64{} + for j := 0; j < int(streamCount); j++ { + require.True(t, it.Next()) + require.Equal(t, fmt.Sprintf("line %d", i), it.Entry().Line) + require.Equal(t, i, it.Entry().Timestamp.UnixNano()) + require.Equal(t, `{bar="", foo="bar"}`, it.Labels()) + actualHashes = append(actualHashes, it.StreamHash()) + } + sort.Slice(actualHashes, func(i, j int) bool { return actualHashes[i] < actualHashes[j] }) + require.Equal(t, streamHashes, actualHashes) + } + require.False(t, it.Next()) + require.NoError(t, it.Error()) + }) + t.Run("sum by metrics", func(t *testing.T) { + iterators := make([]iter.SampleIterator, 0, len(ingesterSet)) + for _, client := range ingesterSet { + stream, err := client.QuerySample(ctx, &logproto.SampleQueryRequest{ + Selector: `sum(rate({foo="bar"}[1m])) by (bar)`, + Start: time.Unix(0, 0), + End: time.Unix(0, requests+1), + }) + require.NoError(t, err) + iterators = append(iterators, iter.NewSampleQueryClientIterator(stream)) + } + it := iter.NewMergeSampleIterator(ctx, iterators) + var expectedLabels []string + for _, s := range streams { + expectedLabels = append(expectedLabels, s.WithoutLabels("foo").String()) + } + sort.Strings(expectedLabels) + for i := int64(0); i < requests; i++ { + labels := []string{} + actualHashes := []uint64{} + for j := 0; j < int(streamCount); j++ { + require.True(t, it.Next()) + require.Equal(t, float64(1), it.Sample().Value) + require.Equal(t, i, it.Sample().Timestamp) + labels = append(labels, it.Labels()) + actualHashes = append(actualHashes, it.StreamHash()) + } + sort.Strings(labels) + sort.Slice(actualHashes, func(i, j int) bool { return actualHashes[i] < actualHashes[j] }) + require.Equal(t, expectedLabels, labels) + require.Equal(t, streamHashes, actualHashes) + } + require.False(t, it.Next()) + require.NoError(t, it.Error()) + }) + t.Run("sum metrics", func(t *testing.T) { + iterators := make([]iter.SampleIterator, 0, len(ingesterSet)) + for _, client := range ingesterSet { + stream, err := client.QuerySample(ctx, &logproto.SampleQueryRequest{ + Selector: `sum(rate({foo="bar"}[1m]))`, + Start: time.Unix(0, 0), + End: time.Unix(0, requests+1), + }) + require.NoError(t, err) + iterators = append(iterators, iter.NewSampleQueryClientIterator(stream)) + } + it := iter.NewMergeSampleIterator(ctx, iterators) + for i := int64(0); i < requests; i++ { + actualHashes := []uint64{} + for j := 0; j < int(streamCount); j++ { + require.True(t, it.Next()) + require.Equal(t, float64(1), it.Sample().Value) + require.Equal(t, i, it.Sample().Timestamp) + require.Equal(t, "{}", it.Labels()) + actualHashes = append(actualHashes, it.StreamHash()) + } + sort.Slice(actualHashes, func(i, j int) bool { return actualHashes[i] < actualHashes[j] }) + require.Equal(t, streamHashes, actualHashes) + } + require.False(t, it.Next()) + require.NoError(t, it.Error()) + }) +} + +type ingesterClient struct { + logproto.PusherClient + logproto.QuerierClient +} + +func createIngesterSets(t *testing.T, config Config, count int) ([]ingesterClient, func()) { + result := make([]ingesterClient, count) + closers := make([]func(), count) + for i := 0; i < count; i++ { + ingester, closer := createIngesterServer(t, config) + result[i] = ingester + closers[i] = closer + } + return result, func() { + for _, closer := range closers { + closer() + } + } +} + +func createIngesterServer(t *testing.T, ingesterConfig Config) (ingesterClient, func()) { + t.Helper() + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil) + require.NoError(t, err) + + listener := bufconn.Listen(1024 * 1024) + + server := grpc.NewServer(grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return middleware.StreamServerUserHeaderInterceptor(srv, ss, info, handler) + }), grpc.ChainUnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return middleware.ServerUserHeaderInterceptor(ctx, req, info, handler) + })) + + logproto.RegisterPusherServer(server, ing) + logproto.RegisterQuerierServer(server, ing) + go func() { + if err := server.Serve(listener); err != nil { + log.Fatal(err) + } + }() + conn, err := grpc.DialContext(context.Background(), "", grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return listener.Dial() + })) + require.NoError(t, err) + + return ingesterClient{ + PusherClient: logproto.NewPusherClient(conn), + QuerierClient: logproto.NewQuerierClient(conn), + }, func() { + _ = services.StopAndAwaitTerminated(context.Background(), ing) + server.Stop() + _ = listener.Close() + } +} + +func buildPushRequest(ts int64, streams []labels.Labels) *logproto.PushRequest { + req := &logproto.PushRequest{} + + for _, stream := range streams { + req.Streams = append(req.Streams, logproto.Stream{ + Labels: stream.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, ts), + Line: fmt.Sprintf("line %d", ts), + }, + }, + }) + } + + return req +} diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go index 25c160920ea3..ca7b8429f662 100644 --- a/pkg/iter/entry_iterator.go +++ b/pkg/iter/entry_iterator.go @@ -75,16 +75,12 @@ func (h *iteratorHeap) Pop() interface{} { type iteratorSortHeap struct { iteratorHeap - byAlphabetical bool byAscendingTime bool } func (h iteratorSortHeap) Less(i, j int) bool { t1, t2 := h.iteratorHeap[i].Entry().Timestamp.UnixNano(), h.iteratorHeap[j].Entry().Timestamp.UnixNano() if t1 == t2 { - if h.byAlphabetical { - return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels() - } return h.iteratorHeap[i].StreamHash() < h.iteratorHeap[j].StreamHash() } if h.byAscendingTime { @@ -329,7 +325,13 @@ func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) Entr func (i *entrySortIterator) lessByIndex(k, j int) bool { t1, t2 := i.is[k].Entry().Timestamp.UnixNano(), i.is[j].Entry().Timestamp.UnixNano() if t1 == t2 { - return i.is[k].Labels() < i.is[j].Labels() + // The underlying stream hash may not be available, such as when merging LokiResponses in the + // frontend which were sharded. Prefer to use the underlying stream hash when available, + // which is needed in deduping code, but defer to label sorting when it's not present. + if i.is[k].StreamHash() == 0 { + return i.is[k].Labels() < i.is[j].Labels() + } + return i.is[k].StreamHash() < i.is[j].StreamHash() } if i.byAscendingTime { return t1 < t2 @@ -337,10 +339,13 @@ func (i *entrySortIterator) lessByIndex(k, j int) bool { return t1 > t2 } -func (i *entrySortIterator) lessByValue(t1 int64, l1 string, index int) bool { +func (i *entrySortIterator) lessByValue(t1 int64, l1 uint64, lb string, index int) bool { t2 := i.is[index].Entry().Timestamp.UnixNano() if t1 == t2 { - return l1 < i.is[index].Labels() + if l1 == 0 { + return lb < i.is[index].Labels() + } + return l1 < i.is[index].StreamHash() } if i.byAscendingTime { return t1 < t2 @@ -374,16 +379,17 @@ func (i *entrySortIterator) init() { func (i *entrySortIterator) fix() { head := i.is[0] t1 := head.Entry().Timestamp.UnixNano() - l1 := head.Labels() + l1 := head.StreamHash() + lb := head.Labels() // shortcut - if len(i.is) <= 1 || i.lessByValue(t1, l1, 1) { + if len(i.is) <= 1 || i.lessByValue(t1, l1, lb, 1) { return } // First element is out of place. So we reposition it. i.is = i.is[1:] // drop head - index := sort.Search(len(i.is), func(in int) bool { return i.lessByValue(t1, l1, in) }) + index := sort.Search(len(i.is), func(in int) bool { return i.lessByValue(t1, l1, lb, in) }) if index == len(i.is) { i.is = append(i.is, head) @@ -792,26 +798,37 @@ func (i *reverseEntryIterator) Close() error { // ReadBatch reads a set of entries off an iterator. func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, error) { - streams := map[string]*logproto.Stream{} - respSize := uint32(0) + var ( + streams = map[uint64]map[string]*logproto.Stream{} + respSize uint32 + streamsCount int + ) for ; respSize < size && i.Next(); respSize++ { labels, hash, entry := i.Labels(), i.StreamHash(), i.Entry() - stream, ok := streams[labels] + mutatedStreams, ok := streams[hash] if !ok { - stream = &logproto.Stream{ + mutatedStreams = map[string]*logproto.Stream{} + streams[hash] = mutatedStreams + } + mutatedStream, ok := mutatedStreams[labels] + if !ok { + streamsCount++ + mutatedStream = &logproto.Stream{ Labels: labels, Hash: hash, } - streams[labels] = stream + mutatedStreams[labels] = mutatedStream } - stream.Entries = append(stream.Entries, entry) + mutatedStream.Entries = append(mutatedStream.Entries, entry) } result := logproto.QueryResponse{ - Streams: make([]logproto.Stream, 0, len(streams)), + Streams: make([]logproto.Stream, 0, streamsCount), } - for _, stream := range streams { - result.Streams = append(result.Streams, *stream) + for _, mutatedStreams := range streams { + for _, s := range mutatedStreams { + result.Streams = append(result.Streams, *s) + } } return &result, respSize, i.Error() } diff --git a/pkg/iter/sample_iterator.go b/pkg/iter/sample_iterator.go index f21ef5cd21b1..1004336ec88c 100644 --- a/pkg/iter/sample_iterator.go +++ b/pkg/iter/sample_iterator.go @@ -118,8 +118,7 @@ func (it *peekingSampleIterator) Error() error { } type sampleIteratorHeap struct { - its []SampleIterator - byAlphabetical bool + its []SampleIterator } func (h sampleIteratorHeap) Len() int { return len(h.its) } @@ -139,7 +138,7 @@ func (h *sampleIteratorHeap) Pop() interface{} { func (h sampleIteratorHeap) Less(i, j int) bool { s1, s2 := h.its[i].Sample(), h.its[j].Sample() if s1.Timestamp == s2.Timestamp { - if h.byAlphabetical { + if h.its[i].StreamHash() == 0 { return h.its[i].Labels() < h.its[j].Labels() } return h.its[i].StreamHash() < h.its[j].StreamHash() @@ -286,7 +285,7 @@ func (i *mergeSampleIterator) Labels() string { } func (i *mergeSampleIterator) StreamHash() uint64 { - return i.curr.Hash + return i.curr.streamHash } func (i *mergeSampleIterator) Error() error { @@ -332,8 +331,7 @@ func NewSortSampleIterator(is []SampleIterator) SampleIterator { return is[0] } h := sampleIteratorHeap{ - its: make([]SampleIterator, 0, len(is)), - byAlphabetical: true, + its: make([]SampleIterator, 0, len(is)), } return &sortSampleIterator{ is: is, @@ -673,26 +671,37 @@ func (i *timeRangedSampleIterator) Next() bool { // ReadBatch reads a set of entries off an iterator. func ReadSampleBatch(i SampleIterator, size uint32) (*logproto.SampleQueryResponse, uint32, error) { - series := map[string]*logproto.Series{} - respSize := uint32(0) + var ( + series = map[uint64]map[string]*logproto.Series{} + respSize uint32 + seriesCount int + ) for ; respSize < size && i.Next(); respSize++ { labels, hash, sample := i.Labels(), i.StreamHash(), i.Sample() - s, ok := series[labels] + streams, ok := series[hash] if !ok { + streams = map[string]*logproto.Series{} + series[hash] = streams + } + s, ok := streams[labels] + if !ok { + seriesCount++ s = &logproto.Series{ Labels: labels, StreamHash: hash, } - series[labels] = s + streams[labels] = s } s.Samples = append(s.Samples, sample) } result := logproto.SampleQueryResponse{ - Series: make([]logproto.Series, 0, len(series)), + Series: make([]logproto.Series, 0, seriesCount), } - for _, s := range series { - result.Series = append(result.Series, *s) + for _, streams := range series { + for _, s := range streams { + result.Series = append(result.Series, *s) + } } return &result, respSize, i.Error() } diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index fe943888f3d3..ca5dd366d6fb 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -104,7 +104,7 @@ func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Strea var found bool s, found = resByStream[out.String()] if !found { - s = &logproto.Stream{Labels: out.String(), Hash: sp.BaseLabels().Hash()} + s = &logproto.Stream{Labels: out.String()} resByStream[out.String()] = s } s.Entries = append(s.Entries, logproto.Entry{