Skip to content

Commit

Permalink
Fixes test ordering flakyness.
Browse files Browse the repository at this point in the history
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Jan 20, 2021
1 parent 4686806 commit 38170c4
Showing 1 changed file with 53 additions and 77 deletions.
130 changes: 53 additions & 77 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
fmt "fmt"
"io/ioutil"
"os"
"sort"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -268,6 +270,35 @@ func buildStreams() []logproto.Stream {
return streams
}

var (
stream1 = logproto.Stream{
Labels: labels.Labels{labels.Label{Name: "stream", Value: "1"}}.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "1",
},
{
Timestamp: time.Unix(0, 2),
Line: "2",
},
},
}
stream2 = logproto.Stream{
Labels: labels.Labels{labels.Label{Name: "stream", Value: "2"}}.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "3",
},
{
Timestamp: time.Unix(0, 2),
Line: "4",
},
},
}
)

func Test_SeriesIterator(t *testing.T) {
var instances []*instance

Expand All @@ -281,46 +312,8 @@ func Test_SeriesIterator(t *testing.T) {

for i := 0; i < 3; i++ {
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil)

require.NoError(t,
inst.Push(context.Background(), &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: labels.Labels{labels.Label{Name: "stream1", Value: fmt.Sprintf("%d", i)}}.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "1",
},
{
Timestamp: time.Unix(0, 2),
Line: "2",
},
},
},
},
}),
)
require.NoError(t,
inst.Push(context.Background(), &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: labels.Labels{labels.Label{Name: "stream2", Value: fmt.Sprintf("%d", i)}}.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 3),
Line: "3",
},
{
Timestamp: time.Unix(0, 4),
Line: "4",
},
},
},
},
}),
)

require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
instances = append(instances, inst)
}

Expand All @@ -329,43 +322,26 @@ func Test_SeriesIterator(t *testing.T) {
}))

for i := 0; i < 3; i++ {
iter.Next()
assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().UserID)
assert.Equal(t, "stream1", iter.Stream().Labels[0].Name)
assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().Labels[0].Value)

memchunk, err := chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, 0, 0)
require.NoError(t, err)
it, err := memchunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil))
require.NoError(t, err)
assert.Equal(t, true, it.Next())
assert.Equal(t, "1", it.Entry().Line)
assert.Equal(t, int64(1), it.Entry().Timestamp.UnixNano())
assert.Equal(t, true, it.Next())
assert.Equal(t, "2", it.Entry().Line)
assert.Equal(t, int64(2), it.Entry().Timestamp.UnixNano())

assert.Equal(t, false, it.Next())
require.NoError(t, it.Error())

iter.Next()
assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().UserID)
assert.Equal(t, "stream2", iter.Stream().Labels[0].Name)
assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().Labels[0].Value)

memchunk, err = chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, 0, 0)
require.NoError(t, err)
it, err = memchunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil))
require.NoError(t, err)
assert.Equal(t, true, it.Next())
assert.Equal(t, "3", it.Entry().Line)
assert.Equal(t, int64(3), it.Entry().Timestamp.UnixNano())
assert.Equal(t, true, it.Next())
assert.Equal(t, "4", it.Entry().Line)
assert.Equal(t, int64(4), it.Entry().Timestamp.UnixNano())

assert.Equal(t, false, it.Next())
require.NoError(t, it.Error())
var streams []logproto.Stream
for j := 0; j < 2; j++ {
iter.Next()
assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().UserID)
memchunk, err := chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, 0, 0)
require.NoError(t, err)
it, err := memchunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil))
require.NoError(t, err)
stream := logproto.Stream{
Labels: cortex_client.FromLabelAdaptersToLabels(iter.Stream().Labels).String(),
}
for it.Next() {
stream.Entries = append(stream.Entries, it.Entry())
}
require.NoError(t, it.Close())
streams = append(streams, stream)
}
sort.Slice(streams, func(i, j int) bool { return streams[i].Labels < streams[j].Labels })
require.Equal(t, stream1, streams[0])
require.Equal(t, stream2, streams[1])
}

require.False(t, iter.Next())
Expand Down

0 comments on commit 38170c4

Please sign in to comment.