diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go new file mode 100644 index 000000000000..b0ea3f0d8ba2 --- /dev/null +++ b/pkg/ingester/transfer_test.go @@ -0,0 +1,210 @@ +package ingester + +import ( + "fmt" + "io" + "io/ioutil" + "sort" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/logproto" + "golang.org/x/net/context" +) + +func TestTransferOut(t *testing.T) { + f := newTestIngesterFactory(t) + + ing := f.getIngester(time.Duration(0)) + + // Push some data into our original ingester + ctx := user.InjectOrgID(context.Background(), "test") + _, err := ing.Push(ctx, &logproto.PushRequest{ + Streams: []*logproto.Stream{ + { + Entries: []logproto.Entry{ + {Line: "line 0", Timestamp: time.Unix(0, 0)}, + {Line: "line 1", Timestamp: time.Unix(1, 0)}, + }, + Labels: `{foo="bar",bar="baz1"}`, + }, + { + Entries: []logproto.Entry{ + {Line: "line 2", Timestamp: time.Unix(2, 0)}, + {Line: "line 3", Timestamp: time.Unix(3, 0)}, + }, + Labels: `{foo="bar",bar="baz2"}`, + }, + }, + }) + require.NoError(t, err) + + assert.Len(t, ing.instances, 1) + if assert.Contains(t, ing.instances, "test") { + assert.Len(t, ing.instances["test"].streams, 2) + } + + // Create a new ingester and trasfer data to it + ing2 := f.getIngester(time.Second * 60) + ing.Shutdown() + + assert.Len(t, ing2.instances, 1) + if assert.Contains(t, ing2.instances, "test") { + assert.Len(t, ing2.instances["test"].streams, 2) + + lines := []string{} + + // Get all the lines back and make sure the blocks transferred successfully + for _, stream := range ing2.instances["test"].streams { + it, err := stream.Iterator( + time.Unix(0, 0), + time.Unix(10, 0), + logproto.FORWARD, + func([]byte) bool { return true }, + ) + if !assert.NoError(t, err) { + continue + } + + for it.Next() { + entry := it.Entry() + lines = append(lines, entry.Line) + } + } + + sort.Strings(lines) + + assert.Equal( + t, + []string{"line 0", "line 1", "line 2", "line 3"}, + lines, + ) + } +} + +type testIngesterFactory struct { + t *testing.T + store ring.KVClient + n int + ingesters map[string]*Ingester +} + +func newTestIngesterFactory(t *testing.T) *testIngesterFactory { + return &testIngesterFactory{ + t: t, + store: ring.NewInMemoryKVClient(ring.ProtoCodec{Factory: ring.ProtoDescFactory}), + ingesters: make(map[string]*Ingester), + } +} + +func (f *testIngesterFactory) getIngester(joinAfter time.Duration) *Ingester { + f.n++ + + cfg := defaultIngesterTestConfig() + cfg.MaxTransferRetries = 1 + cfg.LifecyclerConfig.ClaimOnRollout = true + cfg.LifecyclerConfig.ID = fmt.Sprintf("localhost-%d", f.n) + cfg.LifecyclerConfig.RingConfig.KVStore.Mock = f.store + cfg.LifecyclerConfig.JoinAfter = joinAfter + cfg.LifecyclerConfig.Addr = cfg.LifecyclerConfig.ID + + cfg.ingesterClientFactory = func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error) { + ingester, ok := f.ingesters[addr] + if !ok { + return nil, fmt.Errorf("no ingester %s", addr) + } + + return struct { + logproto.PusherClient + logproto.QuerierClient + logproto.IngesterClient + grpc_health_v1.HealthClient + io.Closer + }{ + PusherClient: nil, + QuerierClient: nil, + IngesterClient: &testIngesterClient{t: f.t, i: ingester}, + Closer: ioutil.NopCloser(nil), + }, nil + } + + _, ing := newTestStore(f.t, cfg) + f.ingesters[fmt.Sprintf("%s:0", cfg.LifecyclerConfig.ID)] = ing + + // NB there's some kind of race condition with the in-memory KV client when + // we don't give the ingester a little bit of time to initialize. a 100ms + // wait time seems effective. + time.Sleep(time.Millisecond * 100) + return ing +} + +type testIngesterClient struct { + t *testing.T + i *Ingester +} + +func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption) (logproto.Ingester_TransferChunksClient, error) { + chunkCh := make(chan *logproto.TimeSeriesChunk) + respCh := make(chan *logproto.TransferChunksResponse) + + client := &testTransferChunksClient{ch: chunkCh, resp: respCh} + go func() { + server := &testTransferChunksServer{ch: chunkCh, resp: respCh} + err := c.i.TransferChunks(server) + require.NoError(c.t, err) + }() + + return client, nil +} + +type testTransferChunksClient struct { + ch chan *logproto.TimeSeriesChunk + resp chan *logproto.TransferChunksResponse + + grpc.ClientStream +} + +func (c *testTransferChunksClient) Send(chunk *logproto.TimeSeriesChunk) error { + c.ch <- chunk + return nil +} + +func (c *testTransferChunksClient) CloseAndRecv() (*logproto.TransferChunksResponse, error) { + close(c.ch) + resp := <-c.resp + close(c.resp) + return resp, nil +} + +type testTransferChunksServer struct { + ch chan *logproto.TimeSeriesChunk + resp chan *logproto.TransferChunksResponse + + grpc.ServerStream +} + +func (s *testTransferChunksServer) Context() context.Context { + return context.Background() +} + +func (s *testTransferChunksServer) SendAndClose(resp *logproto.TransferChunksResponse) error { + s.resp <- resp + return nil +} + +func (s *testTransferChunksServer) Recv() (*logproto.TimeSeriesChunk, error) { + chunk, ok := <-s.ch + if !ok { + return nil, io.EOF + } + return chunk, nil +}