diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 9992fb294caa..5e0a4b807d7b 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -30,12 +30,11 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" - loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push" - "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/client" + loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/runtime" @@ -520,40 +519,46 @@ func Test_SortLabelsOnPush(t *testing.T) { topVal := ingester.Peek() require.Equal(t, `{a="b", buzz="f", service_name="foo"}`, topVal.Streams[0].Labels) }) +} - t.Run("with service_name added during ingestion", func(t *testing.T) { +func Test_TruncateLogLines(t *testing.T) { + setup := func() (*validation.Limits, *mockIngester) { limits := &validation.Limits{} flagext.DefaultValues(limits) - ingester := &mockIngester{} + + limits.MaxLineSize = 5 + limits.MaxLineSizeTruncate = true + return limits, &mockIngester{} + } + + t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) { + limits, ingester := setup() distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) - request := makeWriteRequest(10, 10) - request.Streams[0].Labels = `{buzz="f", x="y", a="b"}` - _, err := distributors[0].Push(ctx, request) + _, err := distributors[0].Push(ctx, makeWriteRequest(1, 10)) require.NoError(t, err) topVal := ingester.Peek() - require.Equal(t, `{a="b", buzz="f", service_name="unknown_service", x="y"}`, topVal.Streams[0].Labels) + require.Len(t, topVal.Streams[0].Entries[0].Line, 5) }) } -func Test_TruncateLogLines(t *testing.T) { +func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) { setup := func() (*validation.Limits, *mockIngester) { limits := &validation.Limits{} flagext.DefaultValues(limits) limits.MaxLineSize = 5 - limits.MaxLineSizeTruncate = true return limits, &mockIngester{} } - t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) { + t.Run("it discards invalid entries and discards resulting empty streams completely", func(t *testing.T) { limits, ingester := setup() distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) _, err := distributors[0].Push(ctx, makeWriteRequest(1, 10)) - require.NoError(t, err) + require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\"}", 10))) topVal := ingester.Peek() - require.Len(t, topVal.Streams[0].Entries[0].Line, 5) + require.Nil(t, topVal) }) } @@ -833,53 +838,9 @@ func TestParseStreamLabels(t *testing.T) { expectedErr error generateLimits func() *validation.Limits }{ - { - name: "service name label mapping disabled", - generateLimits: func() *validation.Limits { - limits := &validation.Limits{} - flagext.DefaultValues(limits) - limits.DiscoverServiceName = nil - return limits - }, - origLabels: `{foo="bar"}`, - expectedLabels: labels.Labels{ - { - Name: "foo", - Value: "bar", - }, - }, - }, - { - name: "no labels defined - service name label mapping disabled", - generateLimits: func() *validation.Limits { - limits := &validation.Limits{} - flagext.DefaultValues(limits) - limits.DiscoverServiceName = nil - return limits - }, - origLabels: `{}`, - expectedErr: fmt.Errorf(validation.MissingLabelsErrorMsg), - }, - { - name: "service name label enabled", - origLabels: `{foo="bar"}`, - generateLimits: func() *validation.Limits { - return defaultLimit - }, - expectedLabels: labels.Labels{ - { - Name: "foo", - Value: "bar", - }, - { - Name: loghttp_push.LabelServiceName, - Value: loghttp_push.ServiceUnknown, - }, - }, - }, { name: "service name label should not get counted against max labels count", - origLabels: `{foo="bar"}`, + origLabels: `{foo="bar", service_name="unknown_service"}`, generateLimits: func() *validation.Limits { limits := &validation.Limits{} flagext.DefaultValues(limits) @@ -897,31 +858,6 @@ func TestParseStreamLabels(t *testing.T) { }, }, }, - { - name: "use label service as service name", - origLabels: `{container="nginx", foo="bar", service="auth"}`, - generateLimits: func() *validation.Limits { - return defaultLimit - }, - expectedLabels: labels.Labels{ - { - Name: "container", - Value: "nginx", - }, - { - Name: "foo", - Value: "bar", - }, - { - Name: "service", - Value: "auth", - }, - { - Name: loghttp_push.LabelServiceName, - Value: "auth", - }, - }, - }, } { limits := tc.generateLimits() distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil) diff --git a/pkg/pattern/flush_test.go b/pkg/pattern/flush_test.go index 14190ecc44cc..719b65209a9d 100644 --- a/pkg/pattern/flush_test.go +++ b/pkg/pattern/flush_test.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logproto" @@ -23,7 +24,21 @@ import ( ) func TestSweepInstance(t *testing.T) { - ringClient := &fakeRingClient{} + replicationSet := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Id: "localhost", Addr: "ingester0"}, + {Id: "remotehost", Addr: "ingester1"}, + {Id: "otherhost", Addr: "ingester2"}, + }, + } + + fakeRing := &fakeRing{} + fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(replicationSet, nil) + + ringClient := &fakeRingClient{ + ring: fakeRing, + } + ing, err := New(defaultIngesterTestConfig(t), ringClient, "foo", nil, log.NewNopLogger()) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck @@ -98,7 +113,9 @@ func defaultIngesterTestConfig(t testing.TB) Config { return cfg } -type fakeRingClient struct{} +type fakeRingClient struct { + ring ring.ReadRing +} func (f *fakeRingClient) Pool() *ring_client.Pool { panic("not implemented") @@ -133,82 +150,97 @@ func (f *fakeRingClient) AddListener(_ services.Listener) { } func (f *fakeRingClient) Ring() ring.ReadRing { - return &fakeRing{} + return f.ring } -type fakeRing struct{} +type fakeRing struct { + mock.Mock +} // InstancesWithTokensCount returns the number of instances in the ring that have tokens. func (f *fakeRing) InstancesWithTokensCount() int { - panic("not implemented") // TODO: Implement + args := f.Called() + return args.Int(0) } // InstancesInZoneCount returns the number of instances in the ring that are registered in given zone. -func (f *fakeRing) InstancesInZoneCount(_ string) int { - panic("not implemented") // TODO: Implement +func (f *fakeRing) InstancesInZoneCount(zone string) int { + args := f.Called(zone) + return args.Int(0) } // InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens. -func (f *fakeRing) InstancesWithTokensInZoneCount(_ string) int { - panic("not implemented") // TODO: Implement +func (f *fakeRing) InstancesWithTokensInZoneCount(zone string) int { + args := f.Called(zone) + return args.Int(0) } // ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring. func (f *fakeRing) ZonesCount() int { - panic("not implemented") // TODO: Implement + args := f.Called() + return args.Int(0) } func (f *fakeRing) Get( - _ uint32, - _ ring.Operation, - _ []ring.InstanceDesc, - _ []string, - _ []string, + key uint32, + op ring.Operation, + bufInstances []ring.InstanceDesc, + bufStrings1, bufStrings2 []string, ) (ring.ReplicationSet, error) { - panic("not implemented") + args := f.Called(key, op, bufInstances, bufStrings1, bufStrings2) + return args.Get(0).(ring.ReplicationSet), args.Error(1) } -func (f *fakeRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) { - return ring.ReplicationSet{}, nil +func (f *fakeRing) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) { + args := f.Called(op) + return args.Get(0).(ring.ReplicationSet), args.Error(1) } -func (f *fakeRing) GetReplicationSetForOperation(_ ring.Operation) (ring.ReplicationSet, error) { - return ring.ReplicationSet{}, nil +func (f *fakeRing) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) { + args := f.Called(op) + return args.Get(0).(ring.ReplicationSet), args.Error(1) } func (f *fakeRing) ReplicationFactor() int { - panic("not implemented") + args := f.Called() + return args.Int(0) } func (f *fakeRing) InstancesCount() int { - panic("not implemented") + args := f.Called() + return args.Int(0) } -func (f *fakeRing) ShuffleShard(_ string, _ int) ring.ReadRing { - panic("not implemented") +func (f *fakeRing) ShuffleShard(identifier string, size int) ring.ReadRing { + args := f.Called(identifier, size) + return args.Get(0).(ring.ReadRing) } -func (f *fakeRing) GetInstanceState(_ string) (ring.InstanceState, error) { - panic("not implemented") +func (f *fakeRing) GetInstanceState(instanceID string) (ring.InstanceState, error) { + args := f.Called(instanceID) + return args.Get(0).(ring.InstanceState), args.Error(1) } func (f *fakeRing) ShuffleShardWithLookback( - _ string, - _ int, - _ time.Duration, - _ time.Time, + identifier string, + size int, + lookbackPeriod time.Duration, + now time.Time, ) ring.ReadRing { - panic("not implemented") + args := f.Called(identifier, size, lookbackPeriod, now) + return args.Get(0).(ring.ReadRing) } -func (f *fakeRing) HasInstance(_ string) bool { - panic("not implemented") +func (f *fakeRing) HasInstance(instanceID string) bool { + args := f.Called(instanceID) + return args.Bool(0) } -func (f *fakeRing) CleanupShuffleShardCache(_ string) { - panic("not implemented") +func (f *fakeRing) CleanupShuffleShardCache(identifier string) { + f.Called(identifier) } -func (f *fakeRing) GetTokenRangesForInstance(_ string) (ring.TokenRanges, error) { - panic("not implemented") +func (f *fakeRing) GetTokenRangesForInstance(identifier string) (ring.TokenRanges, error) { + args := f.Called(identifier) + return args.Get(0).(ring.TokenRanges), args.Error(1) } diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index 6d24ab3b4367..79279708e227 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/grafana/dskit/ring" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/pattern/iter" @@ -21,8 +22,23 @@ import ( func TestInstancePushQuery(t *testing.T) { lbs := labels.New(labels.Label{Name: "test", Value: "test"}) - ringClient := &fakeRingClient{} + ingesterID := "foo" + replicationSet := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Id: ingesterID, Addr: "ingester0"}, + {Id: "bar", Addr: "ingester1"}, + {Id: "baz", Addr: "ingester2"}, + }, + } + + fakeRing := &fakeRing{} + fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(replicationSet, nil) + + ringClient := &fakeRingClient{ + ring: fakeRing, + } mockWriter := &mockEntryWriter{} mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything)