Skip to content

Commit

Permalink
test: mock out ring in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Aug 1, 2024
1 parent 07d57c3 commit 06269b0
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 121 deletions.
102 changes: 19 additions & 83 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"

"github.com/grafana/loki/pkg/push"

"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
Expand Down Expand Up @@ -520,40 +519,46 @@ func Test_SortLabelsOnPush(t *testing.T) {
topVal := ingester.Peek()
require.Equal(t, `{a="b", buzz="f", service_name="foo"}`, topVal.Streams[0].Labels)
})
}

t.Run("with service_name added during ingestion", func(t *testing.T) {
func Test_TruncateLogLines(t *testing.T) {
setup := func() (*validation.Limits, *mockIngester) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
ingester := &mockIngester{}

limits.MaxLineSize = 5
limits.MaxLineSizeTruncate = true
return limits, &mockIngester{}
}

t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) {
limits, ingester := setup()
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

request := makeWriteRequest(10, 10)
request.Streams[0].Labels = `{buzz="f", x="y", a="b"}`
_, err := distributors[0].Push(ctx, request)
_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
require.NoError(t, err)
topVal := ingester.Peek()
require.Equal(t, `{a="b", buzz="f", service_name="unknown_service", x="y"}`, topVal.Streams[0].Labels)
require.Len(t, topVal.Streams[0].Entries[0].Line, 5)
})
}

func Test_TruncateLogLines(t *testing.T) {
func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) {
setup := func() (*validation.Limits, *mockIngester) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)

limits.MaxLineSize = 5
limits.MaxLineSizeTruncate = true
return limits, &mockIngester{}
}

t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) {
t.Run("it discards invalid entries and discards resulting empty streams completely", func(t *testing.T) {
limits, ingester := setup()
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
require.NoError(t, err)
require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\"}", 10)))
topVal := ingester.Peek()
require.Len(t, topVal.Streams[0].Entries[0].Line, 5)
require.Nil(t, topVal)
})
}

Expand Down Expand Up @@ -833,53 +838,9 @@ func TestParseStreamLabels(t *testing.T) {
expectedErr error
generateLimits func() *validation.Limits
}{
{
name: "service name label mapping disabled",
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
return limits
},
origLabels: `{foo="bar"}`,
expectedLabels: labels.Labels{
{
Name: "foo",
Value: "bar",
},
},
},
{
name: "no labels defined - service name label mapping disabled",
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
return limits
},
origLabels: `{}`,
expectedErr: fmt.Errorf(validation.MissingLabelsErrorMsg),
},
{
name: "service name label enabled",
origLabels: `{foo="bar"}`,
generateLimits: func() *validation.Limits {
return defaultLimit
},
expectedLabels: labels.Labels{
{
Name: "foo",
Value: "bar",
},
{
Name: loghttp_push.LabelServiceName,
Value: loghttp_push.ServiceUnknown,
},
},
},
{
name: "service name label should not get counted against max labels count",
origLabels: `{foo="bar"}`,
origLabels: `{foo="bar", service_name="unknown_service"}`,
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
Expand All @@ -897,31 +858,6 @@ func TestParseStreamLabels(t *testing.T) {
},
},
},
{
name: "use label service as service name",
origLabels: `{container="nginx", foo="bar", service="auth"}`,
generateLimits: func() *validation.Limits {
return defaultLimit
},
expectedLabels: labels.Labels{
{
Name: "container",
Value: "nginx",
},
{
Name: "foo",
Value: "bar",
},
{
Name: "service",
Value: "auth",
},
{
Name: loghttp_push.LabelServiceName,
Value: "auth",
},
},
},
} {
limits := tc.generateLimits()
distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil)
Expand Down
106 changes: 69 additions & 37 deletions pkg/pattern/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
Expand All @@ -23,7 +24,21 @@ import (
)

func TestSweepInstance(t *testing.T) {
ringClient := &fakeRingClient{}
replicationSet := ring.ReplicationSet{
Instances: []ring.InstanceDesc{
{Id: "localhost", Addr: "ingester0"},
{Id: "remotehost", Addr: "ingester1"},
{Id: "otherhost", Addr: "ingester2"},
},
}

fakeRing := &fakeRing{}
fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(replicationSet, nil)

ringClient := &fakeRingClient{
ring: fakeRing,
}

ing, err := New(defaultIngesterTestConfig(t), ringClient, "foo", nil, log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
Expand Down Expand Up @@ -98,7 +113,9 @@ func defaultIngesterTestConfig(t testing.TB) Config {
return cfg
}

type fakeRingClient struct{}
type fakeRingClient struct {
ring ring.ReadRing
}

func (f *fakeRingClient) Pool() *ring_client.Pool {
panic("not implemented")
Expand Down Expand Up @@ -133,82 +150,97 @@ func (f *fakeRingClient) AddListener(_ services.Listener) {
}

func (f *fakeRingClient) Ring() ring.ReadRing {
return &fakeRing{}
return f.ring
}

type fakeRing struct{}
type fakeRing struct {
mock.Mock
}

// InstancesWithTokensCount returns the number of instances in the ring that have tokens.
func (f *fakeRing) InstancesWithTokensCount() int {
panic("not implemented") // TODO: Implement
args := f.Called()
return args.Int(0)
}

// InstancesInZoneCount returns the number of instances in the ring that are registered in given zone.
func (f *fakeRing) InstancesInZoneCount(_ string) int {
panic("not implemented") // TODO: Implement
func (f *fakeRing) InstancesInZoneCount(zone string) int {
args := f.Called(zone)
return args.Int(0)
}

// InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens.
func (f *fakeRing) InstancesWithTokensInZoneCount(_ string) int {
panic("not implemented") // TODO: Implement
func (f *fakeRing) InstancesWithTokensInZoneCount(zone string) int {
args := f.Called(zone)
return args.Int(0)
}

// ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring.
func (f *fakeRing) ZonesCount() int {
panic("not implemented") // TODO: Implement
args := f.Called()
return args.Int(0)
}

func (f *fakeRing) Get(
_ uint32,
_ ring.Operation,
_ []ring.InstanceDesc,
_ []string,
_ []string,
key uint32,
op ring.Operation,
bufInstances []ring.InstanceDesc,
bufStrings1, bufStrings2 []string,
) (ring.ReplicationSet, error) {
panic("not implemented")
args := f.Called(key, op, bufInstances, bufStrings1, bufStrings2)
return args.Get(0).(ring.ReplicationSet), args.Error(1)
}

func (f *fakeRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{}, nil
func (f *fakeRing) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) {
args := f.Called(op)
return args.Get(0).(ring.ReplicationSet), args.Error(1)
}

func (f *fakeRing) GetReplicationSetForOperation(_ ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{}, nil
func (f *fakeRing) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) {
args := f.Called(op)
return args.Get(0).(ring.ReplicationSet), args.Error(1)
}

func (f *fakeRing) ReplicationFactor() int {
panic("not implemented")
args := f.Called()
return args.Int(0)
}

func (f *fakeRing) InstancesCount() int {
panic("not implemented")
args := f.Called()
return args.Int(0)
}

func (f *fakeRing) ShuffleShard(_ string, _ int) ring.ReadRing {
panic("not implemented")
func (f *fakeRing) ShuffleShard(identifier string, size int) ring.ReadRing {
args := f.Called(identifier, size)
return args.Get(0).(ring.ReadRing)
}

func (f *fakeRing) GetInstanceState(_ string) (ring.InstanceState, error) {
panic("not implemented")
func (f *fakeRing) GetInstanceState(instanceID string) (ring.InstanceState, error) {
args := f.Called(instanceID)
return args.Get(0).(ring.InstanceState), args.Error(1)
}

func (f *fakeRing) ShuffleShardWithLookback(
_ string,
_ int,
_ time.Duration,
_ time.Time,
identifier string,
size int,
lookbackPeriod time.Duration,
now time.Time,
) ring.ReadRing {
panic("not implemented")
args := f.Called(identifier, size, lookbackPeriod, now)
return args.Get(0).(ring.ReadRing)
}

func (f *fakeRing) HasInstance(_ string) bool {
panic("not implemented")
func (f *fakeRing) HasInstance(instanceID string) bool {
args := f.Called(instanceID)
return args.Bool(0)
}

func (f *fakeRing) CleanupShuffleShardCache(_ string) {
panic("not implemented")
func (f *fakeRing) CleanupShuffleShardCache(identifier string) {
f.Called(identifier)
}

func (f *fakeRing) GetTokenRangesForInstance(_ string) (ring.TokenRanges, error) {
panic("not implemented")
func (f *fakeRing) GetTokenRangesForInstance(identifier string) (ring.TokenRanges, error) {
args := f.Called(identifier)
return args.Get(0).(ring.TokenRanges), args.Error(1)
}
18 changes: 17 additions & 1 deletion pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/ring"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/pattern/iter"

Expand All @@ -21,8 +22,23 @@ import (

func TestInstancePushQuery(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
ringClient := &fakeRingClient{}

ingesterID := "foo"
replicationSet := ring.ReplicationSet{
Instances: []ring.InstanceDesc{
{Id: ingesterID, Addr: "ingester0"},
{Id: "bar", Addr: "ingester1"},
{Id: "baz", Addr: "ingester2"},
},
}

fakeRing := &fakeRing{}
fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(replicationSet, nil)

ringClient := &fakeRingClient{
ring: fakeRing,
}

mockWriter := &mockEntryWriter{}
mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything)
Expand Down

0 comments on commit 06269b0

Please sign in to comment.