Skip to content

Commit

Permalink
Allow in-memory kv-client to support multiple codec (#132)
Browse files Browse the repository at this point in the history
* Allow in-memory kv-client to support multiple codec

* update changelog.

* Remove clone and logger in test.

* Fixup comment according to the change
  • Loading branch information
cyriltovena authored Feb 9, 2022
1 parent 7ab9d37 commit ea22a8f
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
* [BUGFIX] Allow in-memory kv-client to support multiple codec #132
9 changes: 6 additions & 3 deletions kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ func (r *role) Labels() prometheus.Labels {
// The NewInMemoryKVClient returned by NewClient() is a singleton, so
// that distributors and ingesters started in the same process can
// find themselves.
var inmemoryStoreInit sync.Once
var inmemoryStore Client
var (
inmemoryStoreInit sync.Once
inmemoryStore *consul.Client
)

// StoreConfig is a configuration used for building single store client, either
// Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep
Expand Down Expand Up @@ -146,7 +148,8 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
inmemoryStoreInit.Do(func() {
inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg)
})
client = inmemoryStore
// however we swap the codec so that we can encode different type of values.
client = inmemoryStore.WithCodec(codec)

case "memberlist":
kv, err := cfg.MemberlistKV()
Expand Down
41 changes: 38 additions & 3 deletions kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -96,7 +97,6 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) {
require.Equal(t, "primary", actual["multi"])
require.Equal(t, "primary", actual["inmemory"])
require.Equal(t, "secondary", actual["mock"])

}

func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogramWithRoleLabels string) map[string]string {
Expand Down Expand Up @@ -124,6 +124,7 @@ func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogr
}
return result
}

func newConfigsForTest() (cfg StoreConfig, c codec.Codec) {
cfg = StoreConfig{
Multi: MultiConfig{
Expand Down Expand Up @@ -153,8 +154,7 @@ func (m *mockMessage) ProtoMessage() {
panic("do not use")
}

type testLogger struct {
}
type testLogger struct{}

func (l testLogger) Log(keyvals ...interface{}) error {
return nil
Expand All @@ -170,3 +170,38 @@ func TestDefaultStoreValue(t *testing.T) {
cfg2.RegisterFlagsWithPrefix("", "", flag.NewFlagSet("test", flag.PanicOnError))
assert.Equal(t, "memberlist", cfg2.Store)
}

type stringCodec struct {
value string
}

func (c stringCodec) Decode([]byte) (interface{}, error) {
return c.value, nil
}

func (c stringCodec) Encode(interface{}) ([]byte, error) {
return []byte(c.value), nil
}
func (c stringCodec) CodecID() string { return c.value }

func TestMultipleInMemoryClient(t *testing.T) {
logger := log.NewNopLogger()
foo, err := NewClient(Config{
Store: "inmemory",
}, stringCodec{value: "foo"}, prometheus.NewRegistry(), logger)
require.NoError(t, err)
bar, err := NewClient(Config{
Store: "inmemory",
}, stringCodec{value: "bar"}, prometheus.NewRegistry(), logger)
require.NoError(t, err)

require.NoError(t, foo.CAS(context.TODO(), "foo", func(in interface{}) (out interface{}, retry bool, err error) { return "foo", false, nil }))
fooKey, err := foo.Get(ctx, "foo")
require.NoError(t, err)
require.Equal(t, "foo", fooKey.(string))

require.NoError(t, bar.CAS(context.TODO(), "bar", func(in interface{}) (out interface{}, retry bool, err error) { return "bar", false, nil }))
barKey, err := bar.Get(ctx, "bar")
require.NoError(t, err)
require.Equal(t, "bar", barKey.(string))
}
8 changes: 7 additions & 1 deletion kv/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
}

kvp, meta, err := c.kv.Get(key, queryOptions.WithContext(ctx))

// Don't backoff if value is not found (kvp == nil). In that case, Consul still returns index value,
// and next call to Get will block as expected. We handle missing value below.
if err != nil {
Expand Down Expand Up @@ -397,3 +396,10 @@ func (c *Client) createRateLimiter() *rate.Limiter {
}
return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst)
}

// WithCodec Clones and changes the codec of the consul client.
func (c *Client) WithCodec(codec codec.Codec) *Client {
new := *c
new.codec = codec
return &new
}

0 comments on commit ea22a8f

Please sign in to comment.