diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 2216f6039cc6..e75e092a2349 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -5,10 +5,12 @@ import ( fmt "fmt" "io/ioutil" "os" + "sort" "testing" "time" "github.com/cortexproject/cortex/pkg/chunk" + cortex_client "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util/services" "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" @@ -268,6 +270,35 @@ func buildStreams() []logproto.Stream { return streams } +var ( + stream1 = logproto.Stream{ + Labels: labels.Labels{labels.Label{Name: "stream", Value: "1"}}.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "2", + }, + }, + } + stream2 = logproto.Stream{ + Labels: labels.Labels{labels.Label{Name: "stream", Value: "2"}}.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "3", + }, + { + Timestamp: time.Unix(0, 2), + Line: "4", + }, + }, + } +) + func Test_SeriesIterator(t *testing.T) { var instances []*instance @@ -281,46 +312,8 @@ func Test_SeriesIterator(t *testing.T) { for i := 0; i < 3; i++ { inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil) - - require.NoError(t, - inst.Push(context.Background(), &logproto.PushRequest{ - Streams: []logproto.Stream{ - { - Labels: labels.Labels{labels.Label{Name: "stream1", Value: fmt.Sprintf("%d", i)}}.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(0, 1), - Line: "1", - }, - { - Timestamp: time.Unix(0, 2), - Line: "2", - }, - }, - }, - }, - }), - ) - require.NoError(t, - inst.Push(context.Background(), &logproto.PushRequest{ - Streams: []logproto.Stream{ - { - Labels: labels.Labels{labels.Label{Name: "stream2", Value: fmt.Sprintf("%d", i)}}.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(0, 3), - Line: "3", - }, - { - Timestamp: time.Unix(0, 4), - Line: "4", - }, - }, - }, - }, - }), - ) - + require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}})) + require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}})) instances = append(instances, inst) } @@ -329,43 +322,26 @@ func Test_SeriesIterator(t *testing.T) { })) for i := 0; i < 3; i++ { - iter.Next() - assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().UserID) - assert.Equal(t, "stream1", iter.Stream().Labels[0].Name) - assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().Labels[0].Value) - - memchunk, err := chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, 0, 0) - require.NoError(t, err) - it, err := memchunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil)) - require.NoError(t, err) - assert.Equal(t, true, it.Next()) - assert.Equal(t, "1", it.Entry().Line) - assert.Equal(t, int64(1), it.Entry().Timestamp.UnixNano()) - assert.Equal(t, true, it.Next()) - assert.Equal(t, "2", it.Entry().Line) - assert.Equal(t, int64(2), it.Entry().Timestamp.UnixNano()) - - assert.Equal(t, false, it.Next()) - require.NoError(t, it.Error()) - - iter.Next() - assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().UserID) - assert.Equal(t, "stream2", iter.Stream().Labels[0].Name) - assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().Labels[0].Value) - - memchunk, err = chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, 0, 0) - require.NoError(t, err) - it, err = memchunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil)) - require.NoError(t, err) - assert.Equal(t, true, it.Next()) - assert.Equal(t, "3", it.Entry().Line) - assert.Equal(t, int64(3), it.Entry().Timestamp.UnixNano()) - assert.Equal(t, true, it.Next()) - assert.Equal(t, "4", it.Entry().Line) - assert.Equal(t, int64(4), it.Entry().Timestamp.UnixNano()) - - assert.Equal(t, false, it.Next()) - require.NoError(t, it.Error()) + var streams []logproto.Stream + for j := 0; j < 2; j++ { + iter.Next() + assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().UserID) + memchunk, err := chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, 0, 0) + require.NoError(t, err) + it, err := memchunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil)) + require.NoError(t, err) + stream := logproto.Stream{ + Labels: cortex_client.FromLabelAdaptersToLabels(iter.Stream().Labels).String(), + } + for it.Next() { + stream.Entries = append(stream.Entries, it.Entry()) + } + require.NoError(t, it.Close()) + streams = append(streams, stream) + } + sort.Slice(streams, func(i, j int) bool { return streams[i].Labels < streams[j].Labels }) + require.Equal(t, stream1, streams[0]) + require.Equal(t, stream2, streams[1]) } require.False(t, iter.Next())