diff --git a/CHANGELOG.md b/CHANGELOG.md index 7701b20d7..d8bab82e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,3 +39,4 @@ * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 +* [BUGFIX] Allow in-memory kv-client to support multiple codec #132 diff --git a/kv/client.go b/kv/client.go index 7131975ee..42bf55954 100644 --- a/kv/client.go +++ b/kv/client.go @@ -33,8 +33,10 @@ func (r *role) Labels() prometheus.Labels { // The NewInMemoryKVClient returned by NewClient() is a singleton, so // that distributors and ingesters started in the same process can // find themselves. -var inmemoryStoreInit sync.Once -var inmemoryStore Client +var ( + inmemoryStoreInit sync.Once + inmemoryStore *consul.Client +) // StoreConfig is a configuration used for building single store client, either // Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep @@ -146,7 +148,8 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co inmemoryStoreInit.Do(func() { inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg) }) - client = inmemoryStore + // however we swap the codec so that we can encode different type of values. + client = inmemoryStore.WithCodec(codec) case "memberlist": kv, err := cfg.MemberlistKV() diff --git a/kv/client_test.go b/kv/client_test.go index e6bc21762..606090c72 100644 --- a/kv/client_test.go +++ b/kv/client_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -96,7 +97,6 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { require.Equal(t, "primary", actual["multi"]) require.Equal(t, "primary", actual["inmemory"]) require.Equal(t, "secondary", actual["mock"]) - } func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogramWithRoleLabels string) map[string]string { @@ -124,6 +124,7 @@ func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogr } return result } + func newConfigsForTest() (cfg StoreConfig, c codec.Codec) { cfg = StoreConfig{ Multi: MultiConfig{ @@ -153,8 +154,7 @@ func (m *mockMessage) ProtoMessage() { panic("do not use") } -type testLogger struct { -} +type testLogger struct{} func (l testLogger) Log(keyvals ...interface{}) error { return nil @@ -170,3 +170,38 @@ func TestDefaultStoreValue(t *testing.T) { cfg2.RegisterFlagsWithPrefix("", "", flag.NewFlagSet("test", flag.PanicOnError)) assert.Equal(t, "memberlist", cfg2.Store) } + +type stringCodec struct { + value string +} + +func (c stringCodec) Decode([]byte) (interface{}, error) { + return c.value, nil +} + +func (c stringCodec) Encode(interface{}) ([]byte, error) { + return []byte(c.value), nil +} +func (c stringCodec) CodecID() string { return c.value } + +func TestMultipleInMemoryClient(t *testing.T) { + logger := log.NewNopLogger() + foo, err := NewClient(Config{ + Store: "inmemory", + }, stringCodec{value: "foo"}, prometheus.NewRegistry(), logger) + require.NoError(t, err) + bar, err := NewClient(Config{ + Store: "inmemory", + }, stringCodec{value: "bar"}, prometheus.NewRegistry(), logger) + require.NoError(t, err) + + require.NoError(t, foo.CAS(context.TODO(), "foo", func(in interface{}) (out interface{}, retry bool, err error) { return "foo", false, nil })) + fooKey, err := foo.Get(ctx, "foo") + require.NoError(t, err) + require.Equal(t, "foo", fooKey.(string)) + + require.NoError(t, bar.CAS(context.TODO(), "bar", func(in interface{}) (out interface{}, retry bool, err error) { return "bar", false, nil })) + barKey, err := bar.Get(ctx, "bar") + require.NoError(t, err) + require.Equal(t, "bar", barKey.(string)) +} diff --git a/kv/consul/client.go b/kv/consul/client.go index d278d8c9e..63114c547 100644 --- a/kv/consul/client.go +++ b/kv/consul/client.go @@ -234,7 +234,6 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b } kvp, meta, err := c.kv.Get(key, queryOptions.WithContext(ctx)) - // Don't backoff if value is not found (kvp == nil). In that case, Consul still returns index value, // and next call to Get will block as expected. We handle missing value below. if err != nil { @@ -397,3 +396,10 @@ func (c *Client) createRateLimiter() *rate.Limiter { } return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst) } + +// WithCodec Clones and changes the codec of the consul client. +func (c *Client) WithCodec(codec codec.Codec) *Client { + new := *c + new.codec = codec + return &new +}