From 6853c387ed7b8197ac0c0559bcc2f1f8d6a52c87 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Tue, 26 Oct 2021 03:11:41 -0400 Subject: [PATCH] Loki: Apply the ingester ring config to all other rings (distributor, ruler, query-scheduler) (#4546) * apply the ingester ring config to all other rings in ApplyDynamicConfig * update the test, only verify ingester config wasn't manipulated by the step to apply the common memberlist config when join_members is defined. --- pkg/loki/config_wrapper.go | 48 +++++++++++++++++++++++++++++++++ pkg/loki/config_wrapper_test.go | 13 +++++++-- 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index e3f90eb13c0a..8ec740203858 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -85,6 +85,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { r.QueryScheduler.UseSchedulerRing = true } + applyIngesterRingConfig(r) applyMemberlistConfig(r) if err := applyStorageConfig(r, &defaults); err != nil { @@ -101,6 +102,53 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { } } +// applyIngesterRingConfig will use whatever config is setup for the ingester ring and use it everywhere else +// we have a ring configured. The reason for centralizing on the ingester ring as this is been set in basically +// all of our provided config files for all of time, usually set to `inmemory` for all the single binary Loki's +// and is the most central ring config for Loki. +func applyIngesterRingConfig(r *ConfigWrapper) { + lc := r.Ingester.LifecyclerConfig + rc := r.Ingester.LifecyclerConfig.RingConfig + s := rc.KVStore.Store + sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig + + // This gets ugly because we use a separate struct for configuring each ring... + + // Distributor + r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout + r.Distributor.DistributorRing.HeartbeatPeriod = lc.HeartbeatPeriod + r.Distributor.DistributorRing.InstancePort = lc.Port + r.Distributor.DistributorRing.InstanceAddr = lc.Addr + r.Distributor.DistributorRing.InstanceID = lc.ID + r.Distributor.DistributorRing.InstanceInterfaceNames = lc.InfNames + r.Distributor.DistributorRing.KVStore.Store = s + r.Distributor.DistributorRing.KVStore.StoreConfig = sc + + // Ruler + r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout + r.Ruler.Ring.HeartbeatPeriod = lc.HeartbeatPeriod + r.Ruler.Ring.InstancePort = lc.Port + r.Ruler.Ring.InstanceAddr = lc.Addr + r.Ruler.Ring.InstanceID = lc.ID + r.Ruler.Ring.InstanceInterfaceNames = lc.InfNames + r.Ruler.Ring.NumTokens = lc.NumTokens + r.Ruler.Ring.KVStore.Store = s + r.Ruler.Ring.KVStore.StoreConfig = sc + + // Query Scheduler + r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout + r.QueryScheduler.SchedulerRing.HeartbeatPeriod = lc.HeartbeatPeriod + r.QueryScheduler.SchedulerRing.InstancePort = lc.Port + r.QueryScheduler.SchedulerRing.InstanceAddr = lc.Addr + r.QueryScheduler.SchedulerRing.InstanceID = lc.ID + r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = lc.InfNames + r.QueryScheduler.SchedulerRing.InstanceZone = lc.Zone + r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.QueryScheduler.SchedulerRing.TokensFilePath = lc.TokensFilePath + r.QueryScheduler.SchedulerRing.KVStore.Store = s + r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc +} + // applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist // if the -memberlist.join_members config is provided. The idea here is that if a user explicitly configured the // memberlist configuration section, they probably want to be using memberlist for all their ring configurations. diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index bffdc857e4ae..c982a65f9b03 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -2,12 +2,15 @@ package loki import ( "flag" + "fmt" "io/ioutil" "net/url" "os" + "reflect" "testing" "time" + "github.com/cortexproject/cortex/pkg/distributor" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -118,8 +121,6 @@ common: config, defaults := testContext(emptyConfigString, nil) assert.EqualValues(t, defaults.Ingester.LifecyclerConfig.RingConfig.KVStore.Store, config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) - assert.EqualValues(t, defaults.Distributor.DistributorRing.KVStore.Store, config.Distributor.DistributorRing.KVStore.Store) - assert.EqualValues(t, defaults.Ruler.Ring.KVStore.Store, config.Ruler.Ring.KVStore.Store) }) t.Run("when top-level memberlist join_members are provided, all applicable rings are defaulted to use memberlist", func(t *testing.T) { @@ -818,3 +819,11 @@ func TestDefaultUnmarshal(t *testing.T) { assert.Equal(t, 9095, config.Server.GRPCListenPort) }) } + +func Test_applyIngesterRingConfig(t *testing.T) { + + msgf := "%s has changed, this is a crude attempt to catch mapping errors missed in config_wrapper.applyIngesterRingConfig when a ring config changes. Please add a new mapping and update the expected value in this test." + + assert.Equal(t, 8, reflect.TypeOf(distributor.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String())) + +}