Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Bootstrap metastore for wal segments #13550

Merged
merged 19 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,33 @@ compactor_grpc_client:
# a ring unless otherwise specified in the component's configuration section.
[memberlist: <memberlist>]

metastore:
# CLI flag: -metastore.data-dir
[data_dir: <string> | default = "./data-metastore/data"]

raft:
# CLI flag: -metastore.raft.dir
[dir: <string> | default = "./data-metastore/raft"]

# CLI flag: -metastore.raft.bootstrap-peers
[bootstrap_peers: <list of strings> | default = []]

# CLI flag: -metastore.raft.server-id
[server_id: <string> | default = "localhost:9099"]

# CLI flag: -metastore.raft.bind-address
[bind_address: <string> | default = "localhost:9099"]

# CLI flag: -metastore.raft.advertise-address
[advertise_address: <string> | default = "localhost:9099"]

metastore_client:
# CLI flag: -metastore.address
[address: <string> | default = "localhost:9095"]

# Configures the gRPC client used to communicate with the metastore.
[grpc_client_config: <grpc_client>]

# Configuration for 'runtime config' module, responsible for reloading runtime
# configuration file.
[runtime_config: <runtime_config>]
Expand Down Expand Up @@ -2598,6 +2625,8 @@ The `frontend_worker` configures the worker - running within the Loki querier -

# Configures the querier gRPC client used to communicate with the
# query-scheduler. This can't be used in conjunction with 'grpc_client_config'.
# The CLI flags prefix for this block configuration is:
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
# metastore.grpc-client-config
[query_scheduler_grpc_client: <grpc_client>]
```

Expand Down Expand Up @@ -2656,6 +2685,7 @@ The `grpc_client` block configures the gRPC client used to communicate between a
- `frontend.grpc-client-config`
- `ingester-rf1.client`
- `ingester.client`
- `metastore.grpc-client-config`
- `pattern-ingester.client`
- `querier.frontend-client`
- `querier.frontend-grpc-client`
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ require (
github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d
github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/hashicorp/raft v1.7.0
github.com/hashicorp/raft-wal v0.4.1
github.com/heroku/x v0.0.61
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
Expand All @@ -151,9 +153,13 @@ require (
)

require (
github.com/benbjohnson/immutable v0.4.0 // indirect
github.com/coreos/etcd v3.3.27+incompatible // indirect
github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf // indirect
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
Expand Down Expand Up @@ -266,7 +272,7 @@ require (
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-msgpack v1.1.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
Expand Down
18 changes: 16 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,8 @@ github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiU
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/immutable v0.4.0 h1:CTqXbEerYso8YzVPxmWxh2gnoRQbbB9X1quUC8+vGZA=
github.com/benbjohnson/immutable v0.4.0/go.mod h1:iAr8OjJGLnLmVUr9MZ/rz4PWUy6Ouc2JLYuMArmvAJM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -463,6 +465,8 @@ github.com/coredns/coredns v1.6.6/go.mod h1:Bdcnka9HmKGYj12ZIDF3lpQSfDHSsMc85Wj9
github.com/coredns/federation v0.0.0-20190818181423-e032b096babe/go.mod h1:MoqTEFX8GlnKkyq8eBCF94VzkNAOgjdlCJ+Pz/oCLPk=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.27+incompatible h1:QIudLb9KeBsE5zyYxd1mjzRSkzLg9Wf9QlRwFgd6oTA=
github.com/coreos/etcd v3.3.27+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand All @@ -478,6 +482,8 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf h1:GOPo6vn/vTN+3IwZBvXX0y5doJfSC7My0cdzelyOCsQ=
github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/couchbase/go-couchbase v0.0.0-20180501122049-16db1f1fe037/go.mod h1:TWI8EKQMs5u5jLKW/tsb9VwauIrMIxQG1r5fMsswK5U=
github.com/couchbase/gomemcached v0.0.0-20180502221210-0da75df14530/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c=
github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs=
Expand Down Expand Up @@ -1096,9 +1102,12 @@ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-kms-wrapping/entropy v0.1.0/go.mod h1:d1g9WGtAunDNpek8jUIEJnBlbgKS1N2Q61QkHiZyR1g=
github.com/hashicorp/go-memdb v1.3.4/go.mod h1:uBTr1oQbtuMgd1SSGoR8YV27eT3sBHbYiNm53bMpgSg=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack v1.1.5 h1:9byZdVjKTe5mce63pRVNP1L7UAmdHOTEMGehn6KvJWs=
github.com/hashicorp/go-msgpack v1.1.5/go.mod h1:gWVc3sv/wbDmR3rQsj1CAktEZzoz1YNK9NfGLXJ69/4=
github.com/hashicorp/go-msgpack/v2 v2.0.0/go.mod h1:JIxYkkFJRDDRSoWQBSh7s9QAVThq+82iWmUpmE4jKak=
github.com/hashicorp/go-msgpack/v2 v2.1.1 h1:xQEY9yB2wnHitoSzk/B9UjXWRQ67QKu5AOm8aFp8N3I=
github.com/hashicorp/go-msgpack/v2 v2.1.1/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
Expand Down Expand Up @@ -1154,11 +1163,15 @@ github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7H
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.3.11/go.mod h1:J8naEwc6XaaCfts7+28whSeRvCqTd6e20BlCU3LtEO4=
github.com/hashicorp/raft v1.7.0 h1:4u24Qn6lQ6uwziM++UgsyiT64Q8GyRn43CV41qPiz1o=
github.com/hashicorp/raft v1.7.0/go.mod h1:N1sKh6Vn47mrWvEArQgILTyng8GoDRNYlgKyK7PMjs0=
github.com/hashicorp/raft-autopilot v0.1.6/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/raft-boltdb v0.0.0-20210409134258-03c10cc3d4ea/go.mod h1:qRd6nFJYYS6Iqnc/8HcUmko2/2Gw8qTFEmxDLii6W5I=
github.com/hashicorp/raft-boltdb v0.0.0-20211202195631-7d34b9fb3f42/go.mod h1:wcXL8otVu5cpJVLjcmq7pmfdRCdaP+xnvu7WQcKJAhs=
github.com/hashicorp/raft-boltdb/v2 v2.2.2/go.mod h1:N8YgaZgNJLpZC+h+by7vDu5rzsRgONThTEeUS3zWbfY=
github.com/hashicorp/raft-wal v0.4.1 h1:aU8XZ6x8R9BAIB/83Z1dTDtXvDVmv9YVYeXxd/1QBSA=
github.com/hashicorp/raft-wal v0.4.1/go.mod h1:A6vP5o8hGOs1LHfC1Okh9xPwWDcmb6Vvuz/QyqUXlOE=
github.com/hashicorp/serf v0.8.1/go.mod h1:h/Ru6tmZazX7WO/GDmwdpS975F019L4t5ng5IgwbNrE=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY=
Expand Down Expand Up @@ -2398,8 +2411,9 @@ golang.zx2c4.com/wireguard v0.0.20200121/go.mod h1:P2HsVp8SKwZEufsnezXZA4GRX/T49
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4/go.mod h1:UdS9frhv65KTfwxME1xE8+rHYoFpbm36gOud1GhBe9c=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.6.2 h1:4r+yNT0+8SWcOkXP+63H2zQbN+USnC73cjGUxnDF94Q=
gonum.org/v1/gonum v0.6.2/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU=
gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o=
gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
Expand Down
12 changes: 10 additions & 2 deletions pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/oklog/ulid"
"golang.org/x/net/context"

"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/storage/wal"
)

Expand Down Expand Up @@ -100,7 +101,7 @@ func (i *Ingester) flushItem(l log.Logger, j int, it *wal.PendingItem) error {
// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
// segments to have another opportunity to be flushed.
func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error {
id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()

start := time.Now()
defer func() {
Expand All @@ -115,10 +116,17 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
}

i.metrics.flushesTotal.Add(1)
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id.String()), buf); err != nil {
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("store put chunk: %w", err)
}

if _, err := i.metastoreClient.AddBlock(ctx, &metastorepb.AddBlockRequest{
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
Block: w.Meta(id),
}); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("metastore add block: %w", err)
}

return nil
}
41 changes: 16 additions & 25 deletions pkg/ingester-rf1/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/opentracing/opentracing-go"

"github.com/grafana/loki/v3/pkg/ingester-rf1/clientpool"
"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/loghttp/push"
Expand Down Expand Up @@ -181,6 +182,7 @@ type Ingester struct {
lifecyclerWatcher *services.FailureWatcher

store Storage
metastoreClient metastorepb.MetastoreServiceClient
periodicConfigs []config.PeriodConfig

loopQuit chan struct{}
Expand Down Expand Up @@ -222,6 +224,7 @@ func New(cfg Config, clientConfig client.Config,
storageConfig storage.Config,
clientMetrics storage.ClientMetrics,
limits Limits, configs *runtime.TenantConfigs,
metastoreClient metastorepb.MetastoreServiceClient,
registerer prometheus.Registerer,
writeFailuresCfg writefailures.Cfg,
metricsNamespace string,
Expand All @@ -247,18 +250,18 @@ func New(cfg Config, clientConfig client.Config,
}

i := &Ingester{
cfg: cfg,
logger: logger,
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: storage,
periodicConfigs: periodConfigs,
flushWorkersDone: sync.WaitGroup{},
loopQuit: make(chan struct{}),
tailersQuit: make(chan struct{}),
metrics: metrics,
// flushOnShutdownSwitch: &OnceSwitch{},
cfg: cfg,
logger: logger,
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: storage,
periodicConfigs: periodConfigs,
flushWorkersDone: sync.WaitGroup{},
loopQuit: make(chan struct{}),
tailersQuit: make(chan struct{}),
metrics: metrics,
metastoreClient: metastoreClient,
terminateOnShutdown: false,
streamRateCalculator: NewStreamRateCalculator(),
writeLogManager: writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester_rf1"),
Expand All @@ -284,19 +287,7 @@ func New(cfg Config, clientConfig client.Config,

i.setupAutoForget()

//if i.cfg.ChunkFilterer != nil {
// i.SetChunkFilterer(i.cfg.ChunkFilterer)
//}
//
//if i.cfg.PipelineWrapper != nil {
// i.SetPipelineWrapper(i.cfg.PipelineWrapper)
//}
//
//if i.cfg.SampleExtractorWrapper != nil {
// i.SetExtractorWrapper(i.cfg.SampleExtractorWrapper)
//}
//
//i.recalculateOwnedStreams = newRecalculateOwnedStreams(i.getInstances, i.lifecycler.ID, i.readRing, cfg.OwnedStreamsCheckInterval, util_log.Logger)
// i.recalculateOwnedStreams = newRecalculateOwnedStreams(i.getInstances, i.lifecycler.ID, i.readRing, cfg.OwnedStreamsCheckInterval, util_log.Logger)

return i, nil
}
Expand Down
82 changes: 82 additions & 0 deletions pkg/ingester-rf1/metastore/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package metastoreclient

import (
"flag"
"fmt"

"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
"google.golang.org/grpc"

metastorepb "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
)

type Config struct {
MetastoreAddress string `yaml:"address"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the metastore."`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.MetastoreAddress, "metastore.address", "localhost:9095", "")
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("metastore.grpc-client-config", f)
}

func (cfg *Config) Validate() error {
if cfg.MetastoreAddress == "" {
return fmt.Errorf("metastore.address is required")
}
return cfg.GRPCClientConfig.Validate()
}

type Client struct {
metastorepb.MetastoreServiceClient
service services.Service
conn *grpc.ClientConn
config Config
}

func New(config Config) (c *Client, err error) {
c = &Client{config: config}
c.conn, err = dial(c.config)
if err != nil {
return nil, err
}
c.MetastoreServiceClient = metastorepb.NewMetastoreServiceClient(c.conn)
c.service = services.NewIdleService(nil, c.stopping)
return c, nil
}

func (c *Client) stopping(error) error { return c.conn.Close() }

func (c *Client) Service() services.Service { return c.service }

func dial(cfg Config) (*grpc.ClientConn, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
options, err := cfg.GRPCClientConfig.DialOption(nil, nil)
if err != nil {
return nil, err
}
// TODO: https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
options = append(options, grpc.WithDefaultServiceConfig(grpcServiceConfig))
return grpc.Dial(cfg.MetastoreAddress, options...)
}

const grpcServiceConfig = `{
"healthCheckConfig": {
"serviceName": "metastore.v1.MetastoreService.RaftLeader"
},
"loadBalancingPolicy":"round_robin",
"methodConfig": [{
"name": [{"service": "metastore.v1.MetastoreService"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]
}`
33 changes: 33 additions & 0 deletions pkg/ingester-rf1/metastore/health/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package health

import (
"github.com/grafana/dskit/services"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)

type Service interface {
SetServingStatus(string, grpc_health_v1.HealthCheckResponse_ServingStatus)
}

type noopService struct{}

var NoOpService = noopService{}

func (noopService) SetServingStatus(string, grpc_health_v1.HealthCheckResponse_ServingStatus) {}

func NewGRPCHealthService() *GRPCHealthService {
s := health.NewServer()
return &GRPCHealthService{
Server: s,
Service: services.NewIdleService(nil, func(error) error {
s.Shutdown()
return nil
}),
}
}

type GRPCHealthService struct {
services.Service
*health.Server
}
Loading
Loading