From bc41ae2df5c0f9012d29df10b663114977b40083 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 2 Feb 2022 17:12:04 +0100 Subject: [PATCH 01/16] Adds leader election process Signed-off-by: Cyril Tovena --- pkg/usagestats/repoerter.go | 187 ++++++++++++++++++++++++++++++++ pkg/usagestats/reporter_test.go | 51 +++++++++ pkg/util/build/build.go | 12 ++ 3 files changed, 250 insertions(+) create mode 100644 pkg/usagestats/repoerter.go create mode 100644 pkg/usagestats/reporter_test.go diff --git a/pkg/usagestats/repoerter.go b/pkg/usagestats/repoerter.go new file mode 100644 index 000000000000..9702b82e9a7d --- /dev/null +++ b/pkg/usagestats/repoerter.go @@ -0,0 +1,187 @@ +package usagestats + +import ( + "bytes" + "context" + "io" + "time" + + "github.com/go-kit/kit/log/level" + "github.com/go-kit/log" + "github.com/google/uuid" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/kv" + jsoniter "github.com/json-iterator/go" + "github.com/prometheus/client_golang/prometheus" + prom "github.com/prometheus/prometheus/web/api/v1" + + "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/util/build" +) + +const ( + ClusterSeedFileName = "loki_cluster_seed.json" + + // attemptNumber how many times we will try to read a corrupted cluster seed before deleting + attemptNumber = 4 + + seedKey = "usagestats_token" +) + +var JSONCodec = jsonCodec{} + +type jsonCodec struct{} + +func (jsonCodec) Decode(data []byte) (interface{}, error) { + var seed ClusterSeed + if err := jsoniter.ConfigFastest.Unmarshal(data, &seed); err != nil { + return nil, err + } + return &seed, nil +} + +func (jsonCodec) Encode(obj interface{}) ([]byte, error) { + return jsoniter.ConfigFastest.Marshal(obj) +} +func (jsonCodec) CodecID() string { return "usagestats.jsonCodec" } + +type ClusterSeed struct { + UID string `json:"UID"` + CreatedAt time.Time `json:"created_at"` + prom.PrometheusVersion `json:"version"` +} + +type Reporter struct { + kvClient kv.Client + logger log.Logger + objectClient chunk.ObjectClient + reg prometheus.Registerer + + cluster *ClusterSeed +} + +func NewReporter(kvConfig kv.Config, objectClient chunk.ObjectClient, logger log.Logger, reg prometheus.Registerer) (*Reporter, error) { + kvClient, err := kv.NewClient(kvConfig, JSONCodec, kv.RegistererWithKVName(reg, "usagestats"), logger) + if err != nil { + return nil, err + } + return &Reporter{ + kvClient: kvClient, + logger: logger, + objectClient: objectClient, + reg: reg, + }, nil +} + +func (rep *Reporter) initLeader(ctx context.Context) error { + // Try to become leader via the kv client + for backoff := backoff.New(ctx, backoff.Config{ + MinBackoff: time.Second, + MaxBackoff: time.Minute, + MaxRetries: 0, + }); ; backoff.Ongoing() { + // create a new cluster seed + seed := ClusterSeed{ + UID: uuid.NewString(), + PrometheusVersion: build.GetVersion(), + CreatedAt: time.Now(), + } + if err := rep.kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) { + // The key is already set, so we don't need to do anything + if in != nil { + if kvSeed, ok := in.(ClusterSeed); ok && kvSeed.UID != seed.UID { + seed = in.(ClusterSeed) + return nil, false, nil + } + } + return seed, true, nil + }); err != nil { + level.Error(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) + continue + } + // Fetch the remote cluster seed. 
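The CAS loop above is a plain "first writer wins" election: every replica proposes a freshly generated UID, and whichever replica lands the compare-and-swap first provides the seed that all the others adopt. A rough, self-contained sketch of that idea, with a toy in-memory store standing in for the dskit kv.Client (all names and types here are illustrative, not part of the patch):

package main

import (
	"fmt"
	"sync"

	"github.com/google/uuid"
)

// toySeedStore stands in for the kv store used by initLeader: a value can
// only be installed while the key is still empty (compare-and-swap on nil).
type toySeedStore struct {
	mu  sync.Mutex
	uid string
}

// claim mirrors the CAS callback above: keep an existing seed if one is
// already stored, otherwise install the UID we proposed.
func (s *toySeedStore) claim(proposed string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.uid != "" {
		return s.uid // another replica won the election, adopt its seed
	}
	s.uid = proposed
	return proposed
}

func main() {
	store := &toySeedStore{}
	results := make(chan string, 10)

	// Ten replicas race to become leader; all of them must end up agreeing
	// on a single UID, which is what Test_LeaderElection below asserts.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			results <- store.claim(uuid.NewString())
		}()
	}
	wg.Wait()
	close(results)

	for uid := range results {
		fmt.Println(uid) // the same winning UID, ten times
	}
}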
+ remoteSeed, err := rep.fetchSeed(ctx, + func(err error) bool { + // we only want to retry if the error is not an object not found error + return !rep.objectClient.IsObjectNotFoundErr(err) + }) + if err != nil { + if rep.objectClient.IsObjectNotFoundErr(err) { + // we are the leader and we need to save the file. + if err := rep.writeSeedFile(ctx, seed); err != nil { + level.Error(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) + continue + } + rep.cluster = &seed + return nil + } + continue + } + rep.cluster = remoteSeed + return nil + } +} + +func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) bool) (*ClusterSeed, error) { + var ( + backoff = backoff.New(ctx, backoff.Config{ + MinBackoff: time.Second, + MaxBackoff: time.Minute, + MaxRetries: 0, + }) + readingErr = 0 + ) + for backoff.Ongoing() { + seed, err := rep.readSeedFile(ctx) + if err != nil { + if !rep.objectClient.IsObjectNotFoundErr(err) { + readingErr++ + } + level.Error(rep.logger).Log("msg", "failed to read cluster seed file", "err", err) + if readingErr > attemptNumber { + if err := rep.objectClient.DeleteObject(ctx, ClusterSeedFileName); err != nil { + level.Error(rep.logger).Log("msg", "failed to delete corrupted cluster seed file, deleting it", "err", err) + } + readingErr = 0 + } + if continueFn == nil || continueFn(err) { + continue + } + return nil, err + } + return seed, nil + } + return nil, backoff.Err() +} + +func (rep *Reporter) readSeedFile(ctx context.Context) (*ClusterSeed, error) { + reader, _, err := rep.objectClient.GetObject(ctx, ClusterSeedFileName) + if err != nil { + return nil, err + } + if err != nil { + return nil, err + } + defer func() { + if err := reader.Close(); err != nil { + level.Error(rep.logger).Log("msg", "failed to close reader", "err", err) + } + }() + data, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + seed, err := JSONCodec.Decode(data) + if err != nil { + return nil, err + } + return seed.(*ClusterSeed), nil +} + +func (rep *Reporter) writeSeedFile(ctx context.Context, seed ClusterSeed) error { + data, err := JSONCodec.Encode(seed) + if err != nil { + return err + } + return rep.objectClient.PutObject(ctx, ClusterSeedFileName, bytes.NewReader(data)) +} diff --git a/pkg/usagestats/reporter_test.go b/pkg/usagestats/reporter_test.go new file mode 100644 index 000000000000..66ccd8e5adf4 --- /dev/null +++ b/pkg/usagestats/reporter_test.go @@ -0,0 +1,51 @@ +package usagestats + +import ( + "context" + "os" + "testing" + + "github.com/go-kit/log" + "github.com/grafana/dskit/kv" + "github.com/grafana/loki/pkg/storage/chunk/local" + "github.com/grafana/loki/pkg/storage/chunk/storage" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func Test_LeaderElection(t *testing.T) { + result := make(chan *ClusterSeed, 10) + objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ + FSConfig: local.FSConfig{ + Directory: t.TempDir(), + }, + }, storage.NewClientMetrics()) + require.NoError(t, err) + for i := 0; i < 10; i++ { + go func() { + r, err := NewReporter(kv.Config{ + Store: "inmemory", + }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) + require.NoError(t, err) + require.NoError(t, r.initLeader(context.Background())) + result <- r.cluster + }() + } + + var UID []string + for i := 0; i < 10; i++ { + cluster := <-result + require.NotNil(t, cluster) + UID = append(UID, cluster.UID) + } + first := UID[0] + for _, uid := 
range UID { + require.Equal(t, first, uid) + } + kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, JSONCodec, prometheus.DefaultRegisterer, log.NewLogfmtLogger(os.Stdout)) + require.NoError(t, err) + + data, err := kvClient.Get(context.Background(), seedKey) + require.NoError(t, err) + require.Equal(t, data.(*ClusterSeed).UID, first) +} diff --git a/pkg/util/build/build.go b/pkg/util/build/build.go index 9a6b75a8bca9..76ff10af81b9 100644 --- a/pkg/util/build/build.go +++ b/pkg/util/build/build.go @@ -4,6 +4,7 @@ import ( "runtime" "github.com/prometheus/common/version" + prom "github.com/prometheus/prometheus/web/api/v1" ) // Version information passed to Prometheus version package. @@ -27,3 +28,14 @@ func init() { version.BuildDate = BuildDate version.GoVersion = runtime.Version() } + +func GetVersion() prom.PrometheusVersion { + return prom.PrometheusVersion{ + Version: version.Version, + Revision: version.Revision, + Branch: version.Branch, + BuildUser: version.BuildUser, + BuildDate: version.BuildDate, + GoVersion: version.GoVersion, + } +} From 555e8ba4f23e11e4feb64bbcfdc18bb1780ec20d Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 2 Feb 2022 17:13:02 +0100 Subject: [PATCH 02/16] fluke Signed-off-by: Cyril Tovena --- pkg/usagestats/{repoerter.go => reporter.go} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename pkg/usagestats/{repoerter.go => reporter.go} (99%) diff --git a/pkg/usagestats/repoerter.go b/pkg/usagestats/reporter.go similarity index 99% rename from pkg/usagestats/repoerter.go rename to pkg/usagestats/reporter.go index 9702b82e9a7d..ac1ad18addb8 100644 --- a/pkg/usagestats/repoerter.go +++ b/pkg/usagestats/reporter.go @@ -22,7 +22,7 @@ import ( const ( ClusterSeedFileName = "loki_cluster_seed.json" - // attemptNumber how many times we will try to read a corrupted cluster seed before deleting + // attemptNumber how many times we will try to read a corrupted cluster seed before deleting it attemptNumber = 4 seedKey = "usagestats_token" From c72c16369e59c93afb2db6db86fe5fd37b8065ce Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 2 Feb 2022 17:19:18 +0100 Subject: [PATCH 03/16] fixes the kv typecheck --- pkg/usagestats/reporter.go | 35 +++++---------------------------- pkg/usagestats/reporter_test.go | 2 +- pkg/usagestats/seed.go | 31 +++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 31 deletions(-) create mode 100644 pkg/usagestats/seed.go diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go index ac1ad18addb8..dc65804a7880 100644 --- a/pkg/usagestats/reporter.go +++ b/pkg/usagestats/reporter.go @@ -11,46 +11,21 @@ import ( "github.com/google/uuid" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/kv" - jsoniter "github.com/json-iterator/go" "github.com/prometheus/client_golang/prometheus" - prom "github.com/prometheus/prometheus/web/api/v1" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/util/build" ) const ( + // File name for the cluster seed file. ClusterSeedFileName = "loki_cluster_seed.json" - - // attemptNumber how many times we will try to read a corrupted cluster seed before deleting it + // attemptNumber how many times we will try to read a corrupted cluster seed before deleting it. attemptNumber = 4 - + // seedKey is the key for the cluster seed to use with the kv store. 
seedKey = "usagestats_token" ) -var JSONCodec = jsonCodec{} - -type jsonCodec struct{} - -func (jsonCodec) Decode(data []byte) (interface{}, error) { - var seed ClusterSeed - if err := jsoniter.ConfigFastest.Unmarshal(data, &seed); err != nil { - return nil, err - } - return &seed, nil -} - -func (jsonCodec) Encode(obj interface{}) ([]byte, error) { - return jsoniter.ConfigFastest.Marshal(obj) -} -func (jsonCodec) CodecID() string { return "usagestats.jsonCodec" } - -type ClusterSeed struct { - UID string `json:"UID"` - CreatedAt time.Time `json:"created_at"` - prom.PrometheusVersion `json:"version"` -} - type Reporter struct { kvClient kv.Client logger log.Logger @@ -89,8 +64,8 @@ func (rep *Reporter) initLeader(ctx context.Context) error { if err := rep.kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) { // The key is already set, so we don't need to do anything if in != nil { - if kvSeed, ok := in.(ClusterSeed); ok && kvSeed.UID != seed.UID { - seed = in.(ClusterSeed) + if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed.UID != seed.UID { + seed = *kvSeed return nil, false, nil } } diff --git a/pkg/usagestats/reporter_test.go b/pkg/usagestats/reporter_test.go index 66ccd8e5adf4..d259a3bc48c9 100644 --- a/pkg/usagestats/reporter_test.go +++ b/pkg/usagestats/reporter_test.go @@ -44,7 +44,7 @@ func Test_LeaderElection(t *testing.T) { } kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, JSONCodec, prometheus.DefaultRegisterer, log.NewLogfmtLogger(os.Stdout)) require.NoError(t, err) - + // verify that the ID found is also correctly stored in the kv store and not overridden by another leader. data, err := kvClient.Get(context.Background(), seedKey) require.NoError(t, err) require.Equal(t, data.(*ClusterSeed).UID, first) diff --git a/pkg/usagestats/seed.go b/pkg/usagestats/seed.go new file mode 100644 index 000000000000..5e07fc5446d0 --- /dev/null +++ b/pkg/usagestats/seed.go @@ -0,0 +1,31 @@ +package usagestats + +import ( + "time" + + jsoniter "github.com/json-iterator/go" + prom "github.com/prometheus/prometheus/web/api/v1" +) + +type ClusterSeed struct { + UID string `json:"UID"` + CreatedAt time.Time `json:"created_at"` + prom.PrometheusVersion `json:"version"` +} + +var JSONCodec = jsonCodec{} + +type jsonCodec struct{} + +func (jsonCodec) Decode(data []byte) (interface{}, error) { + var seed ClusterSeed + if err := jsoniter.ConfigFastest.Unmarshal(data, &seed); err != nil { + return nil, err + } + return &seed, nil +} + +func (jsonCodec) Encode(obj interface{}) ([]byte, error) { + return jsoniter.ConfigFastest.Marshal(obj) +} +func (jsonCodec) CodecID() string { return "usagestats.jsonCodec" } From f097044ab3df5ace9278c64aa76e8e2a7b3e0583 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 4 Feb 2022 00:20:14 +0100 Subject: [PATCH 04/16] wire up the http client --- pkg/usagestats/reporter.go | 100 ++++++++++++++++++++++++++++++-- pkg/usagestats/reporter_test.go | 70 +++++++++++++++++++++- pkg/usagestats/stats.go | 53 +++++++++++++++++ 3 files changed, 215 insertions(+), 8 deletions(-) create mode 100644 pkg/usagestats/stats.go diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go index dc65804a7880..f5fcdd61d236 100644 --- a/pkg/usagestats/reporter.go +++ b/pkg/usagestats/reporter.go @@ -3,14 +3,18 @@ package usagestats import ( "bytes" "context" + "errors" "io" + "math" "time" - "github.com/go-kit/kit/log/level" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/google/uuid" 
"github.com/grafana/dskit/backoff" "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/multierror" + "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/storage/chunk" @@ -26,26 +30,50 @@ const ( seedKey = "usagestats_token" ) +var ( + reportCheckInterval = time.Minute + reportInterval = 2 * time.Hour +) + type Reporter struct { kvClient kv.Client logger log.Logger objectClient chunk.ObjectClient reg prometheus.Registerer - cluster *ClusterSeed + services.Service + + leader bool + cluster *ClusterSeed + lastReport time.Time } -func NewReporter(kvConfig kv.Config, objectClient chunk.ObjectClient, logger log.Logger, reg prometheus.Registerer) (*Reporter, error) { +func NewReporter(kvConfig kv.Config, objectClient chunk.ObjectClient, logger log.Logger, reg prometheus.Registerer, leader bool) (*Reporter, error) { kvClient, err := kv.NewClient(kvConfig, JSONCodec, kv.RegistererWithKVName(reg, "usagestats"), logger) if err != nil { return nil, err } - return &Reporter{ + r := &Reporter{ kvClient: kvClient, logger: logger, objectClient: objectClient, + leader: leader, reg: reg, - }, nil + } + r.Service = services.NewBasicService(r.starting, r.running, nil) + return r, nil +} + +func (rep *Reporter) starting(ctx context.Context) error { + if rep.leader { + return rep.initLeader(ctx) + } + // follower only wait for the cluster seed to be set. + seed, err := rep.fetchSeed(ctx, nil) + if err == nil { + rep.cluster = seed + } + return err } func (rep *Reporter) initLeader(ctx context.Context) error { @@ -97,6 +125,8 @@ func (rep *Reporter) initLeader(ctx context.Context) error { } } +// fetchSeed fetches the cluster seed from the object store and try until it succeeds. +// continueFn allow you to decide if we should continue retrying. Nil means always retry func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) bool) (*ClusterSeed, error) { var ( backoff = backoff.New(ctx, backoff.Config{ @@ -160,3 +190,63 @@ func (rep *Reporter) writeSeedFile(ctx context.Context, seed ClusterSeed) error } return rep.objectClient.PutObject(ctx, ClusterSeedFileName, bytes.NewReader(data)) } + +func (rep *Reporter) running(ctx context.Context) error { + if rep.cluster == nil { + return errors.New("cluster seed is missing") + } + // check every minute if we should report. + ticker := time.NewTicker(reportCheckInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + now := time.Now() + // find when to send the next report. + next := nextReport(reportInterval, rep.cluster.CreatedAt, now) + level.Warn(rep.logger).Log("msg", "reporting", "next", next, "now", now, "last", rep.lastReport) + if rep.lastReport.IsZero() && now.Before(next) { + // if we never reported assumed it was the last interval. 
+ rep.lastReport = next.Add(-reportInterval) + } + if next.Equal(now) || now.Sub(rep.lastReport) >= reportInterval { + if err := rep.reportUsage(ctx); err != nil { + level.Warn(rep.logger).Log("msg", "failed to report usage", "err", err) + continue + } + rep.lastReport = next + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (rep *Reporter) reportUsage(ctx context.Context) error { + backoff := backoff.New(ctx, backoff.Config{ + MinBackoff: time.Second, + MaxBackoff: 30 * time.Second, + MaxRetries: 5, + }) + var errs multierror.MultiError + for backoff.Ongoing() { + if err := sendStats(ctx, Stats{ + ClusterID: rep.cluster.UID, + PrometheusVersion: rep.cluster.PrometheusVersion, + }); err != nil { + level.Error(rep.logger).Log("msg", "failed to send stats", "retries", backoff.NumRetries(), "err", err) + errs.Add(err) + backoff.Wait() + continue + } + return nil + } + return errs.Err() +} + +// nextReport compute the next report time based on the interval. +// The interval is based off the creation of the cluster seed to avoid all cluster reporting at the same time. +func nextReport(interval time.Duration, createdAt, now time.Time) time.Time { + // createdAt * (x * interval ) >= now + return createdAt.Add(time.Duration(math.Ceil(float64(now.Sub(createdAt))/float64(interval))) * interval) +} diff --git a/pkg/usagestats/reporter_test.go b/pkg/usagestats/reporter_test.go index d259a3bc48c9..a3b916d40a09 100644 --- a/pkg/usagestats/reporter_test.go +++ b/pkg/usagestats/reporter_test.go @@ -4,13 +4,15 @@ import ( "context" "os" "testing" + "time" "github.com/go-kit/log" "github.com/grafana/dskit/kv" - "github.com/grafana/loki/pkg/storage/chunk/local" - "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage/chunk/local" + "github.com/grafana/loki/pkg/storage/chunk/storage" ) func Test_LeaderElection(t *testing.T) { @@ -25,7 +27,7 @@ func Test_LeaderElection(t *testing.T) { go func() { r, err := NewReporter(kv.Config{ Store: "inmemory", - }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) + }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry(), true) require.NoError(t, err) require.NoError(t, r.initLeader(context.Background())) result <- r.cluster @@ -49,3 +51,65 @@ func Test_LeaderElection(t *testing.T) { require.NoError(t, err) require.Equal(t, data.(*ClusterSeed).UID, first) } + +func Test_ReportLoop(t *testing.T) { + // stub intervals + reportCheckInterval = 500 * time.Millisecond + reportInterval = time.Second + + objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ + FSConfig: local.FSConfig{ + Directory: t.TempDir(), + }, + }, storage.NewClientMetrics()) + require.NoError(t, err) + + r, err := NewReporter(kv.Config{ + Store: "inmemory", + }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry(), true) + require.NoError(t, err) + + require.NoError(t, r.initLeader(context.Background())) + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + <-time.After(10 * time.Second) + cancel() + }() + require.Equal(t, context.Canceled, r.running(ctx)) +} + +func Test_NextReport(t *testing.T) { + fixtures := map[string]struct { + interval time.Duration + createdAt time.Time + now time.Time + + next time.Time + }{ + "createdAt aligned with interval and now": { + interval: 1 * time.Hour, + createdAt: time.Unix(0, 
time.Hour.Nanoseconds()), + now: time.Unix(0, 2*time.Hour.Nanoseconds()), + next: time.Unix(0, 2*time.Hour.Nanoseconds()), + }, + "createdAt aligned with interval": { + interval: 1 * time.Hour, + createdAt: time.Unix(0, time.Hour.Nanoseconds()), + now: time.Unix(0, 2*time.Hour.Nanoseconds()+1), + next: time.Unix(0, 3*time.Hour.Nanoseconds()), + }, + "createdAt not aligned": { + interval: 1 * time.Hour, + createdAt: time.Unix(0, time.Hour.Nanoseconds()+18*time.Minute.Nanoseconds()+20*time.Millisecond.Nanoseconds()), + now: time.Unix(0, 2*time.Hour.Nanoseconds()+1), + next: time.Unix(0, 2*time.Hour.Nanoseconds()+18*time.Minute.Nanoseconds()+20*time.Millisecond.Nanoseconds()), + }, + } + for name, f := range fixtures { + t.Run(name, func(t *testing.T) { + next := nextReport(f.interval, f.createdAt, f.now) + require.Equal(t, f.next, next) + }) + } +} diff --git a/pkg/usagestats/stats.go b/pkg/usagestats/stats.go new file mode 100644 index 000000000000..e9c166c0f2ca --- /dev/null +++ b/pkg/usagestats/stats.go @@ -0,0 +1,53 @@ +package usagestats + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "time" + + jsoniter "github.com/json-iterator/go" + prom "github.com/prometheus/prometheus/web/api/v1" +) + +var ( + httpClient = http.Client{Timeout: 5 * time.Second} + usageStatsURL = "https://stats.grafana.org/loki-usage-report" +) + +type Stats struct { + ClusterID string `json:"clusterID"` + Target string `json:"target"` + prom.PrometheusVersion `json:"version"` + Os string `json:"os"` + Arch string `json:"arch"` + Edition string `json:"edition"` + Metrics map[string]interface{} `json:"metrics"` +} + +func sendStats(ctx context.Context, stats Stats) error { + out, err := jsoniter.MarshalIndent(stats, "", " ") + if err != nil { + return err + } + req, err := http.NewRequest(http.MethodPost, usageStatsURL, bytes.NewBuffer(out)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := httpClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + data, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + return fmt.Errorf("failed to send usage stats: %s body: %s", resp.Status, string(data)) + } + return nil +} From 38b661d58bcb4a28ab0fe5c3faae45f5b10442e2 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 4 Feb 2022 21:53:43 +0100 Subject: [PATCH 05/16] Hooking into loki services, hit a bug --- pkg/loki/loki.go | 23 ++++++----- pkg/loki/modules.go | 30 +++++++++++++++ pkg/usagestats/reporter.go | 68 ++++++++++++++++++++++----------- pkg/usagestats/reporter_test.go | 54 ++++++++++++++++++++------ pkg/usagestats/seed.go | 2 + pkg/usagestats/stats.go | 6 ++- 6 files changed, 139 insertions(+), 44 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index bb42d3194c16..9446572f3c1f 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -45,6 +45,7 @@ import ( chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" "github.com/grafana/loki/pkg/tracing" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/fakeauth" util_log "github.com/grafana/loki/pkg/util/log" @@ -79,6 +80,7 @@ type Config struct { Tracing tracing.Config `yaml:"tracing"` CompactorConfig compactor.Config `yaml:"compactor,omitempty"` QueryScheduler scheduler.Config `yaml:"query_scheduler"` + UsageReport usagestats.Config `yaml:"usage_report"` } // 
RegisterFlags registers flag. @@ -115,6 +117,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Tracing.RegisterFlags(f) c.CompactorConfig.RegisterFlags(f) c.QueryScheduler.RegisterFlags(f) + c.UsageReport.RegisterFlags(f) } func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) { @@ -245,6 +248,7 @@ type Loki struct { compactor *compactor.Compactor QueryFrontEndTripperware basetripper.Tripperware queryScheduler *scheduler.Scheduler + usageReport *usagestats.Reporter clientMetrics chunk_storage.ClientMetrics @@ -481,6 +485,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(Compactor, t.initCompactor) mm.RegisterModule(IndexGateway, t.initIndexGateway) mm.RegisterModule(QueryScheduler, t.initQueryScheduler) + mm.RegisterModule(UsageReport, t.initUsageReport) mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) @@ -492,17 +497,17 @@ func (t *Loki) setupModuleManager() error { Overrides: {RuntimeConfig}, OverridesExporter: {Overrides, Server}, TenantConfigs: {RuntimeConfig}, - Distributor: {Ring, Server, Overrides, TenantConfigs}, + Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport}, Store: {Overrides}, - Ingester: {Store, Server, MemberlistKV, TenantConfigs}, - Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs}, + Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport}, + Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport}, QueryFrontendTripperware: {Server, Overrides, TenantConfigs}, - QueryFrontend: {QueryFrontendTripperware}, - QueryScheduler: {Server, Overrides, MemberlistKV}, - Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs}, - TableManager: {Server}, - Compactor: {Server, Overrides, MemberlistKV}, - IndexGateway: {Server}, + QueryFrontend: {QueryFrontendTripperware, UsageReport}, + QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport}, + Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport}, + TableManager: {Server, UsageReport}, + Compactor: {Server, Overrides, MemberlistKV, UsageReport}, + IndexGateway: {Server, UsageReport}, IngesterQuerier: {Ring}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 6f142dac550c..87671f5ae234 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/dskit/runtimeconfig" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/common/version" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/weaveworks/common/middleware" @@ -49,6 +50,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" serverutil "github.com/grafana/loki/pkg/util/server" @@ -82,6 +84,7 @@ const ( All string = "all" Read string = "read" Write string = "write" + UsageReport string = "usage-report" ) func (t *Loki) initServer() (services.Service, error) { @@ -749,6 +752,33 @@ func (t *Loki) initQueryScheduler() (services.Service, error) { return s, nil } +func (t *Loki) 
initUsageReport() (services.Service, error) { + if t.Cfg.UsageReport.Disabled { + return nil, nil + } + t.Cfg.UsageReport.Leader = false + if t.isModuleActive(Ingester) { + t.Cfg.UsageReport.Leader = true + } + t.Cfg.UsageReport.Edition = "oss" + t.Cfg.UsageReport.Target = t.Cfg.Target.String() + period, err := t.Cfg.SchemaConfig.SchemaForTime(model.Now()) + if err != nil { + return nil, err + } + + objectClient, err := chunk_storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig.Config, t.clientMetrics) + if err != nil { + return nil, err + } + ur, err := usagestats.NewReporter(t.Cfg.UsageReport, t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore, objectClient, util_log.Logger, prometheus.DefaultRegisterer) + if err != nil { + return nil, err + } + t.usageReport = ur + return ur, nil +} + func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) { if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 { return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`") diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go index f5fcdd61d236..62c2e2cf6d7e 100644 --- a/pkg/usagestats/reporter.go +++ b/pkg/usagestats/reporter.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "errors" + "flag" "io" "math" + "runtime" "time" "github.com/go-kit/log" @@ -35,6 +37,18 @@ var ( reportInterval = 2 * time.Hour ) +type Config struct { + Disabled bool `yaml:"disabled"` + Leader bool `yaml:"-"` + Edition string `yaml:"-"` + Target string `yaml:"-"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.Disabled, "usage-report.disabled", false, "Allow to disable usage reporting.") +} + type Reporter struct { kvClient kv.Client logger log.Logger @@ -43,12 +57,15 @@ type Reporter struct { services.Service - leader bool + conf Config cluster *ClusterSeed lastReport time.Time } -func NewReporter(kvConfig kv.Config, objectClient chunk.ObjectClient, logger log.Logger, reg prometheus.Registerer, leader bool) (*Reporter, error) { +func NewReporter(config Config, kvConfig kv.Config, objectClient chunk.ObjectClient, logger log.Logger, reg prometheus.Registerer) (*Reporter, error) { + if config.Disabled { + return nil, nil + } kvClient, err := kv.NewClient(kvConfig, JSONCodec, kv.RegistererWithKVName(reg, "usagestats"), logger) if err != nil { return nil, err @@ -57,7 +74,7 @@ func NewReporter(kvConfig kv.Config, objectClient chunk.ObjectClient, logger log kvClient: kvClient, logger: logger, objectClient: objectClient, - leader: leader, + conf: config, reg: reg, } r.Service = services.NewBasicService(r.starting, r.running, nil) @@ -65,7 +82,7 @@ func NewReporter(kvConfig kv.Config, objectClient chunk.ObjectClient, logger log } func (rep *Reporter) starting(ctx context.Context) error { - if rep.leader { + if rep.conf.Leader { return rep.initLeader(ctx) } // follower only wait for the cluster seed to be set. @@ -99,7 +116,7 @@ func (rep *Reporter) initLeader(ctx context.Context) error { } return seed, true, nil }); err != nil { - level.Error(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) + level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) continue } // Fetch the remote cluster seed. 
@@ -112,7 +129,7 @@ func (rep *Reporter) initLeader(ctx context.Context) error { if rep.objectClient.IsObjectNotFoundErr(err) { // we are the leader and we need to save the file. if err := rep.writeSeedFile(ctx, seed); err != nil { - level.Error(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) + level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) continue } rep.cluster = &seed @@ -142,7 +159,7 @@ func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) b if !rep.objectClient.IsObjectNotFoundErr(err) { readingErr++ } - level.Error(rep.logger).Log("msg", "failed to read cluster seed file", "err", err) + level.Debug(rep.logger).Log("msg", "failed to read cluster seed file", "err", err) if readingErr > attemptNumber { if err := rep.objectClient.DeleteObject(ctx, ClusterSeedFileName); err != nil { level.Error(rep.logger).Log("msg", "failed to delete corrupted cluster seed file, deleting it", "err", err) @@ -198,31 +215,34 @@ func (rep *Reporter) running(ctx context.Context) error { // check every minute if we should report. ticker := time.NewTicker(reportCheckInterval) defer ticker.Stop() + + // find when to send the next report. + next := nextReport(reportInterval, rep.cluster.CreatedAt, time.Now()) + if rep.lastReport.IsZero() { + // if we never reported assumed it was the last interval. + rep.lastReport = next.Add(-reportInterval) + } for { select { case <-ticker.C: now := time.Now() - // find when to send the next report. - next := nextReport(reportInterval, rep.cluster.CreatedAt, now) - level.Warn(rep.logger).Log("msg", "reporting", "next", next, "now", now, "last", rep.lastReport) - if rep.lastReport.IsZero() && now.Before(next) { - // if we never reported assumed it was the last interval. 
- rep.lastReport = next.Add(-reportInterval) + if !next.Equal(now) && now.Sub(rep.lastReport) < reportInterval { + continue } - if next.Equal(now) || now.Sub(rep.lastReport) >= reportInterval { - if err := rep.reportUsage(ctx); err != nil { - level.Warn(rep.logger).Log("msg", "failed to report usage", "err", err) - continue - } - rep.lastReport = next + level.Debug(rep.logger).Log("msg", "reporting cluster stats", "date", time.Now()) + if err := rep.reportUsage(ctx, next); err != nil { + level.Info(rep.logger).Log("msg", "failed to report usage", "err", err) + continue } + rep.lastReport = next + next = next.Add(reportInterval) case <-ctx.Done(): return ctx.Err() } } } -func (rep *Reporter) reportUsage(ctx context.Context) error { +func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error { backoff := backoff.New(ctx, backoff.Config{ MinBackoff: time.Second, MaxBackoff: 30 * time.Second, @@ -232,9 +252,13 @@ func (rep *Reporter) reportUsage(ctx context.Context) error { for backoff.Ongoing() { if err := sendStats(ctx, Stats{ ClusterID: rep.cluster.UID, - PrometheusVersion: rep.cluster.PrometheusVersion, + PrometheusVersion: build.GetVersion(), + CreatedAt: rep.cluster.CreatedAt, + Interval: interval, + Os: runtime.GOOS, + Arch: runtime.GOARCH, }); err != nil { - level.Error(rep.logger).Log("msg", "failed to send stats", "retries", backoff.NumRetries(), "err", err) + level.Info(rep.logger).Log("msg", "failed to send stats", "retries", backoff.NumRetries(), "err", err) errs.Add(err) backoff.Wait() continue diff --git a/pkg/usagestats/reporter_test.go b/pkg/usagestats/reporter_test.go index a3b916d40a09..3e98e1b9fd81 100644 --- a/pkg/usagestats/reporter_test.go +++ b/pkg/usagestats/reporter_test.go @@ -2,12 +2,15 @@ package usagestats import ( "context" + "net/http" + "net/http/httptest" "os" "testing" "time" "github.com/go-kit/log" "github.com/grafana/dskit/kv" + jsoniter "github.com/json-iterator/go" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -15,21 +18,33 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/storage" ) +var metrics = storage.NewClientMetrics() + func Test_LeaderElection(t *testing.T) { result := make(chan *ClusterSeed, 10) objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ FSConfig: local.FSConfig{ Directory: t.TempDir(), }, - }, storage.NewClientMetrics()) + }, metrics) require.NoError(t, err) - for i := 0; i < 10; i++ { + for i := 0; i < 3; i++ { go func() { - r, err := NewReporter(kv.Config{ + r, err := NewReporter(Config{Leader: true}, kv.Config{ Store: "inmemory", - }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry(), true) + }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) require.NoError(t, err) - require.NoError(t, r.initLeader(context.Background())) + require.NoError(t, r.starting(context.Background())) + result <- r.cluster + }() + } + for i := 0; i < 7; i++ { + go func() { + r, err := NewReporter(Config{Leader: false}, kv.Config{ + Store: "inmemory", + }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) + require.NoError(t, err) + require.NoError(t, r.starting(context.Background())) result <- r.cluster }() } @@ -53,30 +68,47 @@ func Test_LeaderElection(t *testing.T) { } func Test_ReportLoop(t *testing.T) { - // stub intervals - reportCheckInterval = 500 * time.Millisecond + // stub + reportCheckInterval = 100 * time.Millisecond reportInterval = time.Second + 
totalReport := 0 + clusterIDs := []string{} + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + var received Stats + totalReport++ + require.NoError(t, jsoniter.NewDecoder(r.Body).Decode(&received)) + clusterIDs = append(clusterIDs, received.ClusterID) + rw.WriteHeader(http.StatusOK) + })) + usageStatsURL = server.URL + objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ FSConfig: local.FSConfig{ Directory: t.TempDir(), }, - }, storage.NewClientMetrics()) + }, metrics) require.NoError(t, err) - r, err := NewReporter(kv.Config{ + r, err := NewReporter(Config{Leader: true}, kv.Config{ Store: "inmemory", - }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry(), true) + }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) require.NoError(t, err) require.NoError(t, r.initLeader(context.Background())) ctx, cancel := context.WithCancel(context.Background()) go func() { - <-time.After(10 * time.Second) + <-time.After(6 * time.Second) cancel() }() require.Equal(t, context.Canceled, r.running(ctx)) + require.GreaterOrEqual(t, totalReport, 5) + first := clusterIDs[0] + for _, uid := range clusterIDs { + require.Equal(t, first, uid) + } + require.Equal(t, first, r.cluster.UID) } func Test_NextReport(t *testing.T) { diff --git a/pkg/usagestats/seed.go b/pkg/usagestats/seed.go index 5e07fc5446d0..cbf4108c440a 100644 --- a/pkg/usagestats/seed.go +++ b/pkg/usagestats/seed.go @@ -17,6 +17,8 @@ var JSONCodec = jsonCodec{} type jsonCodec struct{} +// todo we need to use the default codec for the rest of the code +// currently crashing because the in-memory kvstore use a singleton. func (jsonCodec) Decode(data []byte) (interface{}, error) { var seed ClusterSeed if err := jsoniter.ConfigFastest.Unmarshal(data, &seed); err != nil { diff --git a/pkg/usagestats/stats.go b/pkg/usagestats/stats.go index e9c166c0f2ca..5a6e8f4b94b9 100644 --- a/pkg/usagestats/stats.go +++ b/pkg/usagestats/stats.go @@ -18,8 +18,10 @@ var ( ) type Stats struct { - ClusterID string `json:"clusterID"` - Target string `json:"target"` + ClusterID string `json:"clusterID"` + CreatedAt time.Time `json:"createdAt"` + Interval time.Time `json:"interval"` + Target string `json:"target"` prom.PrometheusVersion `json:"version"` Os string `json:"os"` Arch string `json:"arch"` From cd75528c2c67ddf302dbb5cada47e156d6d5dfa0 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 9 Feb 2022 00:55:53 +0100 Subject: [PATCH 06/16] Add stats variable. 
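This commit replaces the hand-built Stats payload with metrics published through the standard library's expvar package: helper constructors register values under a shared key prefix, and buildMetrics later walks expvar.Do to collect them back into the report. A minimal sketch of that underlying pattern using only the standard library (the prefix and metric names below are illustrative):

package main

import (
	"expvar"
	"fmt"
	"strings"
)

// Values are published under a common prefix so that unrelated expvar
// entries (e.g. the default "cmdline" and "memstats") can be filtered out
// at collection time.
const statsPrefix = "github.com/grafana/loki/"

func main() {
	// Publishing, as the NewString/NewInt helpers in this commit do.
	expvar.NewString(statsPrefix + "compression").Set("lz4")
	expvar.NewInt(statsPrefix + "lines_written").Add(200)

	// Collection, as buildMetrics does: walk every published variable,
	// keep only the prefixed ones, and unwrap them to plain Go values.
	metrics := map[string]interface{}{}
	expvar.Do(func(kv expvar.KeyValue) {
		if !strings.HasPrefix(kv.Key, statsPrefix) {
			return
		}
		name := strings.TrimPrefix(kv.Key, statsPrefix)
		switch v := kv.Value.(type) {
		case *expvar.Int:
			metrics[name] = v.Value()
		case *expvar.String:
			metrics[name] = v.Value()
		}
	})
	fmt.Println(metrics) // map[compression:lz4 lines_written:200]
}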
--- pkg/loki/modules.go | 4 +- pkg/usagestats/reporter.go | 16 +- pkg/usagestats/reporter_test.go | 2 +- pkg/usagestats/stats.go | 277 +++++++++++++++++++++++++++++++- pkg/usagestats/stats_test.go | 97 +++++++++++ 5 files changed, 377 insertions(+), 19 deletions(-) create mode 100644 pkg/usagestats/stats_test.go diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 87671f5ae234..685de4b69e48 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -760,8 +760,8 @@ func (t *Loki) initUsageReport() (services.Service, error) { if t.isModuleActive(Ingester) { t.Cfg.UsageReport.Leader = true } - t.Cfg.UsageReport.Edition = "oss" - t.Cfg.UsageReport.Target = t.Cfg.Target.String() + usagestats.Edition("oss") + usagestats.Target(t.Cfg.Target.String()) period, err := t.Cfg.SchemaConfig.SchemaForTime(model.Now()) if err != nil { return nil, err diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go index 62c2e2cf6d7e..b01dbac5ea8c 100644 --- a/pkg/usagestats/reporter.go +++ b/pkg/usagestats/reporter.go @@ -7,7 +7,6 @@ import ( "flag" "io" "math" - "runtime" "time" "github.com/go-kit/log" @@ -38,10 +37,8 @@ var ( ) type Config struct { - Disabled bool `yaml:"disabled"` - Leader bool `yaml:"-"` - Edition string `yaml:"-"` - Target string `yaml:"-"` + Disabled bool `yaml:"disabled"` + Leader bool `yaml:"-"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -250,14 +247,7 @@ func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error }) var errs multierror.MultiError for backoff.Ongoing() { - if err := sendStats(ctx, Stats{ - ClusterID: rep.cluster.UID, - PrometheusVersion: build.GetVersion(), - CreatedAt: rep.cluster.CreatedAt, - Interval: interval, - Os: runtime.GOOS, - Arch: runtime.GOARCH, - }); err != nil { + if err := sendReport(ctx, rep.cluster, interval); err != nil { level.Info(rep.logger).Log("msg", "failed to send stats", "retries", backoff.NumRetries(), "err", err) errs.Add(err) backoff.Wait() diff --git a/pkg/usagestats/reporter_test.go b/pkg/usagestats/reporter_test.go index 3e98e1b9fd81..c0bb532d2797 100644 --- a/pkg/usagestats/reporter_test.go +++ b/pkg/usagestats/reporter_test.go @@ -75,7 +75,7 @@ func Test_ReportLoop(t *testing.T) { totalReport := 0 clusterIDs := []string{} server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - var received Stats + var received Report totalReport++ require.NoError(t, jsoniter.NewDecoder(r.Body).Decode(&received)) clusterIDs = append(clusterIDs, received.ClusterID) diff --git a/pkg/usagestats/stats.go b/pkg/usagestats/stats.go index 5a6e8f4b94b9..62cede44a172 100644 --- a/pkg/usagestats/stats.go +++ b/pkg/usagestats/stats.go @@ -3,21 +3,34 @@ package usagestats import ( "bytes" "context" + "encoding/json" + "expvar" "fmt" "io" + "math" "net/http" + "runtime" + "strings" + "sync" "time" + "github.com/grafana/loki/pkg/util/build" + + "github.com/cespare/xxhash/v2" jsoniter "github.com/json-iterator/go" prom "github.com/prometheus/prometheus/web/api/v1" + "go.uber.org/atomic" ) var ( httpClient = http.Client{Timeout: 5 * time.Second} usageStatsURL = "https://stats.grafana.org/loki-usage-report" + statsPrefix = "github.com/grafana/loki/" + targetKey = "target" + editionKey = "edition" ) -type Stats struct { +type Report struct { ClusterID string `json:"clusterID"` CreatedAt time.Time `json:"createdAt"` Interval time.Time `json:"interval"` @@ -29,8 +42,8 @@ type Stats struct { Metrics map[string]interface{} `json:"metrics"` } -func 
sendStats(ctx context.Context, stats Stats) error { - out, err := jsoniter.MarshalIndent(stats, "", " ") +func sendReport(ctx context.Context, seed *ClusterSeed, interval time.Time) error { + out, err := jsoniter.MarshalIndent(buildReport(seed, interval), "", " ") if err != nil { return err } @@ -53,3 +66,261 @@ func sendStats(ctx context.Context, stats Stats) error { } return nil } + +func buildReport(seed *ClusterSeed, interval time.Time) Report { + var ( + targetName string + editionName string + ) + if target := expvar.Get(statsPrefix + targetKey); target != nil { + if target, ok := target.(*expvar.String); ok { + targetName = target.Value() + } + } + if edition := expvar.Get(statsPrefix + editionKey); edition != nil { + if edition, ok := edition.(*expvar.String); ok { + editionName = edition.Value() + } + } + + return Report{ + ClusterID: seed.UID, + PrometheusVersion: build.GetVersion(), + CreatedAt: seed.CreatedAt, + Interval: interval, + Os: runtime.GOOS, + Arch: runtime.GOARCH, + Target: targetName, + Edition: editionName, + Metrics: buildMetrics(), + } +} + +func buildMetrics() map[string]interface{} { + result := map[string]interface{}{ + "memstats": memstats(), + "num_cpu": runtime.NumCPU(), + "num_goroutine": runtime.NumGoroutine(), + } + expvar.Do(func(kv expvar.KeyValue) { + if !strings.HasPrefix(kv.Key, statsPrefix) || kv.Key == statsPrefix+targetKey || kv.Key == statsPrefix+editionKey { + return + } + var value interface{} + switch v := kv.Value.(type) { + case *expvar.Int: + value = v.Value() + case *expvar.Float: + value = v.Value() + case *expvar.String: + value = v.Value() + case *Statistics: + value = v.Value() + case *Counter: + v.updateRate() + value = v.Value() + v.reset() + case *WordCounter: + value = v.Value() + default: + value = v.String() + } + result[strings.TrimPrefix(kv.Key, statsPrefix)] = value + }) + return result +} + +func memstats() interface{} { + stats := new(runtime.MemStats) + runtime.ReadMemStats(stats) + return map[string]interface{}{ + "alloc": stats.Alloc, + "total_alloc": stats.TotalAlloc, + "sys": stats.Sys, + "mallocs": stats.Mallocs, + "frees": stats.Frees, + "heap_alloc": stats.HeapAlloc, + "heap_sys": stats.HeapSys, + "heap_idle": stats.HeapIdle, + "heap_inuse": stats.HeapInuse, + "heap_released": stats.HeapReleased, + "heap_objects": stats.HeapObjects, + "stack_inuse": stats.StackInuse, + "stack_sys": stats.StackSys, + "other_sys": stats.OtherSys, + "pause_total_ns": stats.PauseTotalNs, + "num_gc": stats.NumGC, + "gc_cpu_fraction": stats.GCCPUFraction, + } +} + +func NewFloat(name string) *expvar.Float { + return expvar.NewFloat(statsPrefix + name) +} + +func NewInt(name string) *expvar.Int { + return expvar.NewInt(statsPrefix + name) +} + +func NewString(name string) *expvar.String { + return expvar.NewString(statsPrefix + name) +} + +func Target(target string) { + NewString(targetKey).Set(target) +} + +func Edition(edition string) { + NewString(editionKey).Set(edition) +} + +type Statistics struct { + min *atomic.Float64 + max *atomic.Float64 + count *atomic.Int64 + + avg *atomic.Float64 + + // require for stddev and stdvar + mean *atomic.Float64 + value *atomic.Float64 +} + +func NewStatistics(name string) *Statistics { + s := &Statistics{ + min: atomic.NewFloat64(math.Inf(0)), + max: atomic.NewFloat64(math.Inf(-1)), + count: atomic.NewInt64(0), + avg: atomic.NewFloat64(0), + mean: atomic.NewFloat64(0), + value: atomic.NewFloat64(0), + } + expvar.Publish(statsPrefix+name, s) + return s +} + +func (s *Statistics) String() string { + 
b, _ := json.Marshal(s.Value()) + return string(b) +} + +func (s *Statistics) Value() map[string]interface{} { + stdvar := s.value.Load() / float64(s.count.Load()) + return map[string]interface{}{ + "min": s.min.Load(), + "max": s.max.Load(), + "avg": s.avg.Load(), + "count": s.count.Load(), + "stddev": math.Sqrt(stdvar), + "stdvar": stdvar, + } +} + +func (s *Statistics) Record(v float64) { + for { + min := s.min.Load() + if min <= v { + break + } + if s.min.CAS(min, v) { + break + } + } + for { + max := s.max.Load() + if max >= v { + break + } + if s.max.CAS(max, v) { + break + } + } + for { + avg := s.avg.Load() + count := s.count.Load() + mean := s.mean.Load() + value := s.value.Load() + + delta := v - mean + newCount := count + 1 + newMean := mean + (delta / float64(newCount)) + newValue := value + (delta * (v - newMean)) + newAvg := avg + ((v - avg) / float64(newCount)) + if s.avg.CAS(avg, newAvg) && s.count.CAS(count, newCount) && s.mean.CAS(mean, newMean) && s.value.CAS(value, newValue) { + break + } + } +} + +type Counter struct { + total *atomic.Int64 + rate *atomic.Float64 + + resetTime time.Time +} + +func NewCounter(name string) *Counter { + c := &Counter{ + total: atomic.NewInt64(0), + rate: atomic.NewFloat64(0), + resetTime: time.Now(), + } + expvar.Publish(statsPrefix+name, c) + return c +} + +func (c *Counter) updateRate() { + total := c.total.Load() + c.rate.Store(float64(total) / time.Since(c.resetTime).Seconds()) +} + +func (c *Counter) reset() { + c.total.Store(0) + c.rate.Store(0) + c.resetTime = time.Now() +} + +func (c *Counter) Inc(i int64) { + c.total.Add(i) +} + +func (c *Counter) String() string { + b, _ := json.Marshal(c.Value()) + return string(b) +} + +func (c *Counter) Value() map[string]interface{} { + return map[string]interface{}{ + "total": c.total.Load(), + "rate": c.rate.Load(), + } +} + +type WordCounter struct { + words sync.Map + count *atomic.Int64 +} + +func NewWordCounter(name string) *WordCounter { + c := &WordCounter{ + count: atomic.NewInt64(0), + words: sync.Map{}, + } + expvar.Publish(statsPrefix+name, c) + return c +} + +func (w *WordCounter) Add(word string) { + if _, loaded := w.words.LoadOrStore(xxhash.Sum64String(word), struct{}{}); !loaded { + w.count.Add(1) + } +} + +func (w *WordCounter) String() string { + b, _ := json.Marshal(w.Value()) + return string(b) +} + +func (w *WordCounter) Value() int64 { + return w.count.Load() +} diff --git a/pkg/usagestats/stats_test.go b/pkg/usagestats/stats_test.go new file mode 100644 index 000000000000..de5f5005b495 --- /dev/null +++ b/pkg/usagestats/stats_test.go @@ -0,0 +1,97 @@ +package usagestats + +import ( + "runtime" + "testing" + "time" + + "github.com/grafana/loki/pkg/util/build" + + "github.com/google/uuid" + jsoniter "github.com/json-iterator/go" + "github.com/stretchr/testify/require" +) + +func Test_BuildReport(t *testing.T) { + now := time.Now() + seed := &ClusterSeed{ + UID: uuid.New().String(), + CreatedAt: now, + } + + Edition("OSS") + Target("compactor") + NewString("compression").Set("lz4") + NewInt("compression_ratio").Set(100) + NewFloat("size_mb").Set(100.1) + NewCounter("lines_written").Inc(200) + s := NewStatistics("query_throughput") + s.Record(300) + s.Record(5) + w := NewWordCounter("active_tenants") + w.Add("foo") + w.Add("bar") + w.Add("foo") + + r := buildReport(seed, now.Add(time.Hour)) + require.Equal(t, r.Arch, runtime.GOARCH) + require.Equal(t, r.Os, runtime.GOOS) + require.Equal(t, r.PrometheusVersion, build.GetVersion()) + require.Equal(t, r.Edition, "OSS") + 
require.Equal(t, r.Target, "compactor") + require.Equal(t, r.Metrics["num_cpu"], runtime.NumCPU()) + require.Equal(t, r.Metrics["num_goroutine"], runtime.NumGoroutine()) + require.Equal(t, r.Metrics["compression"], "lz4") + require.Equal(t, r.Metrics["compression_ratio"], int64(100)) + require.Equal(t, r.Metrics["size_mb"], 100.1) + require.Equal(t, r.Metrics["lines_written"].(map[string]interface{})["total"], int64(200)) + require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["min"], float64(5)) + require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["max"], float64(300)) + require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["count"], int64(2)) + require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["avg"], float64(300+5)/2) + require.Equal(t, r.Metrics["active_tenants"], int64(2)) + + out, _ := jsoniter.MarshalIndent(r, "", " ") + t.Log(string(out)) +} + +func TestCounter(t *testing.T) { + c := NewCounter("test_counter") + c.Inc(100) + c.Inc(200) + c.Inc(300) + time.Sleep(1 * time.Second) + c.updateRate() + v := c.Value() + require.Equal(t, int64(600), v["total"]) + require.GreaterOrEqual(t, v["rate"], float64(590)) + c.reset() + require.Equal(t, int64(0), c.Value()["total"]) + require.Equal(t, float64(0), c.Value()["rate"]) +} + +func TestStatistic(t *testing.T) { + s := NewStatistics("test_stats") + s.Record(100) + s.Record(200) + s.Record(300) + v := s.Value() + require.Equal(t, float64(100), v["min"]) + require.Equal(t, float64(300), v["max"]) + require.Equal(t, int64(3), v["count"]) + require.Equal(t, float64(100+200+300)/3, v["avg"]) + require.Equal(t, float64(81.64965809277261), v["stddev"]) + require.Equal(t, float64(6666.666666666667), v["stdvar"]) +} + +func TestWordCounter(t *testing.T) { + w := NewWordCounter("test_words_count") + for i := 0; i < 100; i++ { + go func() { + w.Add("foo") + w.Add("bar") + w.Add("foo") + }() + } + require.Equal(t, int64(2), w.Value()) +} From a0540565a0aaf6299ea4a88472882f61c0d19691 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 9 Feb 2022 09:25:47 +0100 Subject: [PATCH 07/16] re-vendor dskit and improve to never fail service --- go.mod | 4 +- go.sum | 4 +- pkg/usagestats/reporter.go | 41 +++++------ pkg/usagestats/reporter_test.go | 6 +- .../grafana/dskit/backoff/backoff.go | 6 +- .../grafana/dskit/concurrency/runner.go | 55 ++++++++------ .../grafana/dskit/crypto/tls/tls.go | 10 +-- .../grafana/dskit/grpcclient/grpcclient.go | 14 ++-- vendor/github.com/grafana/dskit/kv/client.go | 19 +++-- .../grafana/dskit/kv/consul/client.go | 18 +++-- .../github.com/grafana/dskit/kv/etcd/etcd.go | 6 +- .../dskit/kv/memberlist/memberlist_client.go | 34 ++++----- .../dskit/kv/memberlist/tcp_transport.go | 6 +- vendor/github.com/grafana/dskit/kv/multi.go | 8 +- .../grafana/dskit/ring/basic_lifecycler.go | 18 +++++ vendor/github.com/grafana/dskit/ring/http.go | 73 ++++++++++++------- .../grafana/dskit/ring/lifecycler.go | 45 +++++++++--- .../grafana/dskit/ring/replication_set.go | 3 +- vendor/github.com/grafana/dskit/ring/ring.go | 46 +++++++++--- .../grafana/dskit/runtimeconfig/manager.go | 2 +- vendor/modules.txt | 2 +- 21 files changed, 264 insertions(+), 156 deletions(-) diff --git a/go.mod b/go.mod index ed8a51e18ba8..f91e27c85f40 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/google/go-cmp v0.5.6 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 - github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 + github.com/grafana/dskit 
v0.0.0-20220209070952-ea22a8f662d0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/consul/api v1.12.0 @@ -103,6 +103,7 @@ require ( require ( github.com/google/renameio/v2 v2.0.0 + github.com/google/uuid v1.2.0 github.com/mattn/go-ieproxy v0.0.1 github.com/xdg-go/scram v1.0.2 gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0 @@ -179,7 +180,6 @@ require ( github.com/google/go-querystring v1.0.0 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect - github.com/google/uuid v1.2.0 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect github.com/googleapis/gnostic v0.4.1 // indirect github.com/gophercloud/gophercloud v0.24.0 // indirect diff --git a/go.sum b/go.sum index 48b6c7e3bf12..7d30e4a30549 100644 --- a/go.sum +++ b/go.sum @@ -1013,8 +1013,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= -github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 h1:IXo/V2+KKLYLD724qh3uRaZgAy3BV3HdtXuSs7lb3jU= -github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE= +github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0 h1:R0Pw7VjouhYSS7bsMdxEidcJbCq1KUBCzPgsh7805NM= +github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0/go.mod h1:Q9WmQ9cVkrHx6g4KSl6TN+N3vEOkDZd9RtyXCHd5OPQ= github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8 h1:aEOagXOTqtN9gd4jiDuP/5a81HdoJBqkVfn8WaxbsK4= github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8/go.mod h1:QAvS2C7TtQRhhv9Uf/sxD+BUhpkrPFm5jK/9MzUiDCY= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go index b01dbac5ea8c..96d70256ff73 100644 --- a/pkg/usagestats/reporter.go +++ b/pkg/usagestats/reporter.go @@ -3,7 +3,6 @@ package usagestats import ( "bytes" "context" - "errors" "flag" "io" "math" @@ -33,7 +32,7 @@ const ( var ( reportCheckInterval = time.Minute - reportInterval = 2 * time.Hour + reportInterval = 1 * time.Hour ) type Config struct { @@ -74,23 +73,11 @@ func NewReporter(config Config, kvConfig kv.Config, objectClient chunk.ObjectCli conf: config, reg: reg, } - r.Service = services.NewBasicService(r.starting, r.running, nil) + r.Service = services.NewBasicService(nil, r.running, nil) return r, nil } -func (rep *Reporter) starting(ctx context.Context) error { - if rep.conf.Leader { - return rep.initLeader(ctx) - } - // follower only wait for the cluster seed to be set. 
- seed, err := rep.fetchSeed(ctx, nil) - if err == nil { - rep.cluster = seed - } - return err -} - -func (rep *Reporter) initLeader(ctx context.Context) error { +func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed { // Try to become leader via the kv client for backoff := backoff.New(ctx, backoff.Config{ MinBackoff: time.Second, @@ -129,16 +116,25 @@ func (rep *Reporter) initLeader(ctx context.Context) error { level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) continue } - rep.cluster = &seed - return nil + return &seed } continue } - rep.cluster = remoteSeed - return nil + return remoteSeed } } +func (rep *Reporter) init(ctx context.Context) { + if rep.conf.Leader { + rep.cluster = rep.initLeader(ctx) + return + } + // follower only wait for the cluster seed to be set. + // it will try forever to fetch the cluster seed. + seed, _ := rep.fetchSeed(ctx, nil) + rep.cluster = seed +} + // fetchSeed fetches the cluster seed from the object store and try until it succeeds. // continueFn allow you to decide if we should continue retrying. Nil means always retry func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) bool) (*ClusterSeed, error) { @@ -206,9 +202,8 @@ func (rep *Reporter) writeSeedFile(ctx context.Context, seed ClusterSeed) error } func (rep *Reporter) running(ctx context.Context) error { - if rep.cluster == nil { - return errors.New("cluster seed is missing") - } + rep.init(ctx) + // check every minute if we should report. ticker := time.NewTicker(reportCheckInterval) defer ticker.Stop() diff --git a/pkg/usagestats/reporter_test.go b/pkg/usagestats/reporter_test.go index c0bb532d2797..c3dd63c2af41 100644 --- a/pkg/usagestats/reporter_test.go +++ b/pkg/usagestats/reporter_test.go @@ -34,7 +34,7 @@ func Test_LeaderElection(t *testing.T) { Store: "inmemory", }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) require.NoError(t, err) - require.NoError(t, r.starting(context.Background())) + r.init(context.Background()) result <- r.cluster }() } @@ -44,7 +44,7 @@ func Test_LeaderElection(t *testing.T) { Store: "inmemory", }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) require.NoError(t, err) - require.NoError(t, r.starting(context.Background())) + r.init(context.Background()) result <- r.cluster }() } @@ -95,7 +95,7 @@ func Test_ReportLoop(t *testing.T) { }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) require.NoError(t, err) - require.NoError(t, r.initLeader(context.Background())) + r.initLeader(context.Background()) ctx, cancel := context.WithCancel(context.Background()) go func() { diff --git a/vendor/github.com/grafana/dskit/backoff/backoff.go b/vendor/github.com/grafana/dskit/backoff/backoff.go index 2146f3b928e3..c5d454715974 100644 --- a/vendor/github.com/grafana/dskit/backoff/backoff.go +++ b/vendor/github.com/grafana/dskit/backoff/backoff.go @@ -10,9 +10,9 @@ import ( // Config configures a Backoff type Config struct { - MinBackoff time.Duration `yaml:"min_period"` // start backoff at this level - MaxBackoff time.Duration `yaml:"max_period"` // increase exponentially to this level - MaxRetries int `yaml:"max_retries"` // give up after this many; zero means infinite retries + MinBackoff time.Duration `yaml:"min_period" category:"advanced"` // start backoff at this level + MaxBackoff time.Duration `yaml:"max_period" category:"advanced"` // increase exponentially to this level + MaxRetries int `yaml:"max_retries" 
category:"advanced"` // give up after this many; zero means infinite retries } // RegisterFlagsWithPrefix for Config. diff --git a/vendor/github.com/grafana/dskit/concurrency/runner.go b/vendor/github.com/grafana/dskit/concurrency/runner.go index a6740f3ac9c4..023be10d7a0a 100644 --- a/vendor/github.com/grafana/dskit/concurrency/runner.go +++ b/vendor/github.com/grafana/dskit/concurrency/runner.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" "github.com/grafana/dskit/internal/math" @@ -62,45 +63,53 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun // ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. // The execution breaks on first error encountered. +// +// Deprecated: use ForEachJob instead. func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error { - if len(jobs) == 0 { - return nil + return ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error { + return jobFunc(ctx, jobs[idx]) + }) +} + +// CreateJobsFromStrings is an utility to create jobs from an slice of strings. +// +// Deprecated: will be removed as it's not needed when using ForEachJob. +func CreateJobsFromStrings(values []string) []interface{} { + jobs := make([]interface{}, len(values)) + for i := 0; i < len(values); i++ { + jobs[i] = values[i] } + return jobs +} - // Push all jobs to a channel. - ch := make(chan interface{}, len(jobs)) - for _, job := range jobs { - ch <- job +// ForEachJob runs the provided jobFunc for each job index in [0, jobs) up to concurrency concurrent workers. +// The execution breaks on first error encountered. +func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error { + if jobs == 0 { + return nil } - close(ch) + + // Initialise indexes with -1 so first Inc() returns index 0. + indexes := atomic.NewInt64(-1) // Start workers to process jobs. g, ctx := errgroup.WithContext(ctx) - for ix := 0; ix < math.Min(concurrency, len(jobs)); ix++ { + for ix := 0; ix < math.Min(concurrency, jobs); ix++ { g.Go(func() error { - for job := range ch { - if err := ctx.Err(); err != nil { - return err + for ctx.Err() == nil { + idx := int(indexes.Inc()) + if idx >= jobs { + return nil } - if err := jobFunc(ctx, job); err != nil { + if err := jobFunc(ctx, idx); err != nil { return err } } - - return nil + return ctx.Err() }) } // Wait until done (or context has canceled). return g.Wait() } - -// CreateJobsFromStrings is an utility to create jobs from an slice of strings. -func CreateJobsFromStrings(values []string) []interface{} { - jobs := make([]interface{}, len(values)) - for i := 0; i < len(values); i++ { - jobs[i] = values[i] - } - return jobs -} diff --git a/vendor/github.com/grafana/dskit/crypto/tls/tls.go b/vendor/github.com/grafana/dskit/crypto/tls/tls.go index a6fa46f07326..1588edc8939a 100644 --- a/vendor/github.com/grafana/dskit/crypto/tls/tls.go +++ b/vendor/github.com/grafana/dskit/crypto/tls/tls.go @@ -13,11 +13,11 @@ import ( // ClientConfig is the config for client TLS. 
type ClientConfig struct { - CertPath string `yaml:"tls_cert_path"` - KeyPath string `yaml:"tls_key_path"` - CAPath string `yaml:"tls_ca_path"` - ServerName string `yaml:"tls_server_name"` - InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify"` + CertPath string `yaml:"tls_cert_path" category:"advanced"` + KeyPath string `yaml:"tls_key_path" category:"advanced"` + CAPath string `yaml:"tls_ca_path" category:"advanced"` + ServerName string `yaml:"tls_server_name" category:"advanced"` + InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify" category:"advanced"` } var ( diff --git a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go index 094337f5d2c1..e7d93b64ec52 100644 --- a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go +++ b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go @@ -18,16 +18,16 @@ import ( // Config for a gRPC client. type Config struct { - MaxRecvMsgSize int `yaml:"max_recv_msg_size"` - MaxSendMsgSize int `yaml:"max_send_msg_size"` - GRPCCompression string `yaml:"grpc_compression"` - RateLimit float64 `yaml:"rate_limit"` - RateLimitBurst int `yaml:"rate_limit_burst"` + MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"` + MaxSendMsgSize int `yaml:"max_send_msg_size" category:"advanced"` + GRPCCompression string `yaml:"grpc_compression" category:"advanced"` + RateLimit float64 `yaml:"rate_limit" category:"advanced"` + RateLimitBurst int `yaml:"rate_limit_burst" category:"advanced"` - BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"` + BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits" category:"advanced"` BackoffConfig backoff.Config `yaml:"backoff_config"` - TLSEnabled bool `yaml:"tls_enabled"` + TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` TLS tls.ClientConfig `yaml:",inline"` } diff --git a/vendor/github.com/grafana/dskit/kv/client.go b/vendor/github.com/grafana/dskit/kv/client.go index b73620dfd6ae..42bf5595461c 100644 --- a/vendor/github.com/grafana/dskit/kv/client.go +++ b/vendor/github.com/grafana/dskit/kv/client.go @@ -33,8 +33,10 @@ func (r *role) Labels() prometheus.Labels { // The NewInMemoryKVClient returned by NewClient() is a singleton, so // that distributors and ingesters started in the same process can // find themselves. -var inmemoryStoreInit sync.Once -var inmemoryStore Client +var ( + inmemoryStoreInit sync.Once + inmemoryStore *consul.Client +) // StoreConfig is a configuration used for building single store client, either // Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep @@ -53,7 +55,7 @@ type StoreConfig struct { // where store can be consul or inmemory. type Config struct { Store string `yaml:"store"` - Prefix string `yaml:"prefix"` + Prefix string `yaml:"prefix" category:"advanced"` StoreConfig `yaml:",inline"` Mock Client `yaml:"-"` @@ -76,8 +78,14 @@ func (cfg *Config) RegisterFlagsWithPrefix(flagsPrefix, defaultPrefix string, f if flagsPrefix == "" { flagsPrefix = "ring." } + + // Allow clients to override default store by setting it before calling this method. + if cfg.Store == "" { + cfg.Store = "consul" + } + f.StringVar(&cfg.Prefix, flagsPrefix+"prefix", defaultPrefix, "The prefix for the keys in the store. Should end with a /.") - f.StringVar(&cfg.Store, flagsPrefix+"store", "consul", "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi.") + f.StringVar(&cfg.Store, flagsPrefix+"store", cfg.Store, "Backend storage to use for the ring. 
Supported values are: consul, etcd, inmemory, memberlist, multi.") } // Client is a high-level client for key-value stores (such as Etcd and @@ -140,7 +148,8 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co inmemoryStoreInit.Do(func() { inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg) }) - client = inmemoryStore + // however we swap the codec so that we can encode different type of values. + client = inmemoryStore.WithCodec(codec) case "memberlist": kv, err := cfg.MemberlistKV() diff --git a/vendor/github.com/grafana/dskit/kv/consul/client.go b/vendor/github.com/grafana/dskit/kv/consul/client.go index 69219cf7488f..63114c547b69 100644 --- a/vendor/github.com/grafana/dskit/kv/consul/client.go +++ b/vendor/github.com/grafana/dskit/kv/consul/client.go @@ -40,11 +40,11 @@ var ( // Config to create a ConsulClient type Config struct { Host string `yaml:"host"` - ACLToken string `yaml:"acl_token"` - HTTPClientTimeout time.Duration `yaml:"http_client_timeout"` - ConsistentReads bool `yaml:"consistent_reads"` - WatchKeyRateLimit float64 `yaml:"watch_rate_limit"` // Zero disables rate limit - WatchKeyBurstSize int `yaml:"watch_burst_size"` // Burst when doing rate-limit, defaults to 1 + ACLToken string `yaml:"acl_token" category:"advanced"` + HTTPClientTimeout time.Duration `yaml:"http_client_timeout" category:"advanced"` + ConsistentReads bool `yaml:"consistent_reads" category:"advanced"` + WatchKeyRateLimit float64 `yaml:"watch_rate_limit" category:"advanced"` // Zero disables rate limit + WatchKeyBurstSize int `yaml:"watch_burst_size" category:"advanced"` // Burst when doing rate-limit, defaults to 1 // Used in tests only. MaxCasRetries int `yaml:"-"` @@ -234,7 +234,6 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b } kvp, meta, err := c.kv.Get(key, queryOptions.WithContext(ctx)) - // Don't backoff if value is not found (kvp == nil). In that case, Consul still returns index value, // and next call to Get will block as expected. We handle missing value below. if err != nil { @@ -397,3 +396,10 @@ func (c *Client) createRateLimiter() *rate.Limiter { } return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst) } + +// WithCodec Clones and changes the codec of the consul client. +func (c *Client) WithCodec(codec codec.Codec) *Client { + new := *c + new.codec = codec + return &new +} diff --git a/vendor/github.com/grafana/dskit/kv/etcd/etcd.go b/vendor/github.com/grafana/dskit/kv/etcd/etcd.go index fa6944d4f520..0661fc5daa1d 100644 --- a/vendor/github.com/grafana/dskit/kv/etcd/etcd.go +++ b/vendor/github.com/grafana/dskit/kv/etcd/etcd.go @@ -22,9 +22,9 @@ import ( // Config for a new etcd.Client. 
type Config struct { Endpoints []string `yaml:"endpoints"` - DialTimeout time.Duration `yaml:"dial_timeout"` - MaxRetries int `yaml:"max_retries"` - EnableTLS bool `yaml:"tls_enabled"` + DialTimeout time.Duration `yaml:"dial_timeout" category:"advanced"` + MaxRetries int `yaml:"max_retries" category:"advanced"` + EnableTLS bool `yaml:"tls_enabled" category:"advanced"` TLS dstls.ClientConfig `yaml:",inline"` UserName string `yaml:"username"` diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index d7ad176d0e5c..30f0992d3521 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -124,16 +124,16 @@ func (c *Client) awaitKVRunningOrStopping(ctx context.Context) error { // KVConfig is a config for memberlist.KV type KVConfig struct { // Memberlist options. - NodeName string `yaml:"node_name"` - RandomizeNodeName bool `yaml:"randomize_node_name"` - StreamTimeout time.Duration `yaml:"stream_timeout"` - RetransmitMult int `yaml:"retransmit_factor"` - PushPullInterval time.Duration `yaml:"pull_push_interval"` - GossipInterval time.Duration `yaml:"gossip_interval"` - GossipNodes int `yaml:"gossip_nodes"` - GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time"` - DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time"` - EnableCompression bool `yaml:"compression_enabled"` + NodeName string `yaml:"node_name" category:"advanced"` + RandomizeNodeName bool `yaml:"randomize_node_name" category:"advanced"` + StreamTimeout time.Duration `yaml:"stream_timeout" category:"advanced"` + RetransmitMult int `yaml:"retransmit_factor" category:"advanced"` + PushPullInterval time.Duration `yaml:"pull_push_interval" category:"advanced"` + GossipInterval time.Duration `yaml:"gossip_interval" category:"advanced"` + GossipNodes int `yaml:"gossip_nodes" category:"advanced"` + GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"` + DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"` + EnableCompression bool `yaml:"compression_enabled" category:"advanced"` // ip:port to advertise other cluster members. Used for NAT traversal AdvertiseAddr string `yaml:"advertise_addr"` @@ -141,20 +141,20 @@ type KVConfig struct { // List of members to join JoinMembers flagext.StringSlice `yaml:"join_members"` - MinJoinBackoff time.Duration `yaml:"min_join_backoff"` - MaxJoinBackoff time.Duration `yaml:"max_join_backoff"` - MaxJoinRetries int `yaml:"max_join_retries"` + MinJoinBackoff time.Duration `yaml:"min_join_backoff" category:"advanced"` + MaxJoinBackoff time.Duration `yaml:"max_join_backoff" category:"advanced"` + MaxJoinRetries int `yaml:"max_join_retries" category:"advanced"` AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"` - RejoinInterval time.Duration `yaml:"rejoin_interval"` + RejoinInterval time.Duration `yaml:"rejoin_interval" category:"advanced"` // Remove LEFT ingesters from ring after this timeout. - LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"` + LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. - LeaveTimeout time.Duration `yaml:"leave_timeout"` + LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` // How much space to use to keep received and sent messages in memory (for troubleshooting). 
- MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes"` + MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"` TCPTransport TCPTransportConfig `yaml:",inline"` diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go index 4265a3b22322..eb5451878359 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go @@ -45,10 +45,10 @@ type TCPTransportConfig struct { // Timeout used when making connections to other nodes to send packet. // Zero = no timeout - PacketDialTimeout time.Duration `yaml:"packet_dial_timeout"` + PacketDialTimeout time.Duration `yaml:"packet_dial_timeout" category:"advanced"` // Timeout for writing packet data. Zero = no timeout. - PacketWriteTimeout time.Duration `yaml:"packet_write_timeout"` + PacketWriteTimeout time.Duration `yaml:"packet_write_timeout" category:"advanced"` // Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on TransportDebug bool `yaml:"-"` @@ -57,7 +57,7 @@ type TCPTransportConfig struct { MetricsRegisterer prometheus.Registerer `yaml:"-"` MetricsNamespace string `yaml:"-"` - TLSEnabled bool `yaml:"tls_enabled"` + TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` TLS dstls.ClientConfig `yaml:",inline"` } diff --git a/vendor/github.com/grafana/dskit/kv/multi.go b/vendor/github.com/grafana/dskit/kv/multi.go index 8a3382e9859f..9a9c24bb834a 100644 --- a/vendor/github.com/grafana/dskit/kv/multi.go +++ b/vendor/github.com/grafana/dskit/kv/multi.go @@ -16,11 +16,11 @@ import ( // MultiConfig is a configuration for MultiClient. type MultiConfig struct { - Primary string `yaml:"primary"` - Secondary string `yaml:"secondary"` + Primary string `yaml:"primary" category:"advanced"` + Secondary string `yaml:"secondary" category:"advanced"` - MirrorEnabled bool `yaml:"mirror_enabled"` - MirrorTimeout time.Duration `yaml:"mirror_timeout"` + MirrorEnabled bool `yaml:"mirror_enabled" category:"advanced"` + MirrorTimeout time.Duration `yaml:"mirror_timeout" category:"advanced"` // ConfigProvider returns channel with MultiRuntimeConfig updates. 
ConfigProvider func() <-chan MultiRuntimeConfig `yaml:"-"` diff --git a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go index 726a85430d32..32775c98291c 100644 --- a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go @@ -3,6 +3,7 @@ package ring import ( "context" "fmt" + "net/http" "sort" "sync" "time" @@ -491,3 +492,20 @@ func (l *BasicLifecycler) run(fn func() error) error { return <-errCh } } + +func (l *BasicLifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return l.store.CAS(ctx, l.ringKey, f) +} + +func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) { + obj, err := l.store.Get(ctx, l.ringKey) + if err != nil { + return nil, err + } + + return GetOrCreateRingDesc(obj), nil +} + +func (l *BasicLifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(l, l.cfg.HeartbeatPeriod).handle(w, req) +} diff --git a/vendor/github.com/grafana/dskit/ring/http.go b/vendor/github.com/grafana/dskit/ring/http.go index f23f08b81241..1d6c10e801df 100644 --- a/vendor/github.com/grafana/dskit/ring/http.go +++ b/vendor/github.com/grafana/dskit/ring/http.go @@ -10,8 +10,6 @@ import ( "sort" "strings" "time" - - "github.com/go-kit/log/level" ) const pageContent = ` @@ -90,19 +88,6 @@ func init() { pageTemplate = template.Must(t.Parse(pageContent)) } -func (r *Ring) forget(ctx context.Context, id string) error { - unregister := func(in interface{}) (out interface{}, retry bool, err error) { - if in == nil { - return nil, false, fmt.Errorf("found empty ring when trying to unregister") - } - - ringDesc := in.(*Desc) - ringDesc.RemoveIngester(id) - return ringDesc, true, nil - } - return r.KVClient.CAS(ctx, r.key, unregister) -} - type ingesterDesc struct { ID string `json:"id"` State string `json:"state"` @@ -121,11 +106,33 @@ type httpResponse struct { ShowTokens bool `json:"-"` } -func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { +type ringAccess interface { + casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error + getRing(context.Context) (*Desc, error) +} + +type ringPageHandler struct { + r ringAccess + heartbeatPeriod time.Duration +} + +func newRingPageHandler(r ringAccess, heartbeatPeriod time.Duration) *ringPageHandler { + return &ringPageHandler{ + r: r, + heartbeatPeriod: heartbeatPeriod, + } +} + +func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { if req.Method == http.MethodPost { ingesterID := req.FormValue("forget") - if err := r.forget(req.Context(), ingesterID); err != nil { - level.Error(r.logger).Log("msg", "error forgetting instance", "err", err) + if err := h.forget(req.Context(), ingesterID); err != nil { + http.Error( + w, + fmt.Errorf("error forgetting instance '%s': %w", ingesterID, err).Error(), + http.StatusInternalServerError, + ) + return } // Implement PRG pattern to prevent double-POST and work with CSRF middleware. 
@@ -140,23 +147,26 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - r.mtx.RLock() - defer r.mtx.RUnlock() + ringDesc, err := h.r.getRing(req.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _, ownedTokens := ringDesc.countTokens() ingesterIDs := []string{} - for id := range r.ringDesc.Ingesters { + for id := range ringDesc.Ingesters { ingesterIDs = append(ingesterIDs, id) } sort.Strings(ingesterIDs) now := time.Now() var ingesters []ingesterDesc - _, owned := r.countTokens() for _, id := range ingesterIDs { - ing := r.ringDesc.Ingesters[id] + ing := ringDesc.Ingesters[id] heartbeatTimestamp := time.Unix(ing.Timestamp, 0) state := ing.State.String() - if !r.IsHealthy(&ing, Reporting, now) { + if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) { state = unhealthy } @@ -175,7 +185,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { Tokens: ing.Tokens, Zone: ing.Zone, NumTokens: len(ing.Tokens), - Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100, + Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100, }) } @@ -203,6 +213,19 @@ func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Templ } } +func (h *ringPageHandler) forget(ctx context.Context, id string) error { + unregister := func(in interface{}) (out interface{}, retry bool, err error) { + if in == nil { + return nil, false, fmt.Errorf("found empty ring when trying to unregister") + } + + ringDesc := in.(*Desc) + ringDesc.RemoveIngester(id) + return ringDesc, true, nil + } + return h.r.casRing(ctx, unregister) +} + // WriteJSONResponse writes some JSON as a HTTP response. func writeJSONResponse(w http.ResponseWriter, v httpResponse) { w.Header().Set("Content-Type", "application/json") diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index be103e1fbad7..92ad34608f46 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "net/http" "os" "sort" "sync" @@ -26,17 +27,20 @@ type LifecyclerConfig struct { RingConfig Config `yaml:"ring"` // Config for the ingester lifecycle control - NumTokens int `yaml:"num_tokens"` - HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` - ObservePeriod time.Duration `yaml:"observe_period"` - JoinAfter time.Duration `yaml:"join_after"` - MinReadyDuration time.Duration `yaml:"min_ready_duration"` - InfNames []string `yaml:"interface_names"` - FinalSleep time.Duration `yaml:"final_sleep"` + NumTokens int `yaml:"num_tokens" category:"advanced"` + HeartbeatPeriod time.Duration `yaml:"heartbeat_period" category:"advanced"` + ObservePeriod time.Duration `yaml:"observe_period" category:"advanced"` + JoinAfter time.Duration `yaml:"join_after" category:"advanced"` + MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"` + InfNames []string `yaml:"interface_names"` + + // FinalSleep's default value can be overridden by + // setting it before calling RegisterFlags or RegisterFlagsWithPrefix. 
+ FinalSleep time.Duration `yaml:"final_sleep" category:"advanced"` TokensFilePath string `yaml:"tokens_file_path"` Zone string `yaml:"availability_zone"` - UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"` - ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health"` + UnregisterOnShutdown bool `yaml:"unregister_on_shutdown" category:"advanced"` + ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health" category:"advanced"` // For testing, you can override the address and ID of this ingester Addr string `yaml:"address" doc:"hidden"` @@ -47,12 +51,14 @@ type LifecyclerConfig struct { ListenPort int `yaml:"-"` } -// RegisterFlags adds the flags required to config this to the given FlagSet +// RegisterFlags adds the flags required to config this to the given FlagSet. +// The default values of some flags can be changed; see docs of LifecyclerConfig. func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", f) } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet. +// The default values of some flags can be changed; see docs of LifecyclerConfig. func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.RingConfig.RegisterFlagsWithPrefix(prefix, f) @@ -67,7 +73,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.") f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. Useful when using gossiping ring.") f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 15*time.Second, "Minimum duration to wait after the internal readiness checks have passed but before succeeding the readiness endpoint. This is used to slowdown deployment controllers (eg. Kubernetes) after an instance is ready and before they proceed with a rolling update, to give the rest of the cluster instances enough time to receive ring updates.") - f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.") + f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", cfg.FinalSleep, "Duration to sleep for before exiting, to ensure metrics are scraped.") f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") hostname, err := os.Hostname() @@ -849,6 +855,23 @@ func (i *Lifecycler) processShutdown(ctx context.Context) { time.Sleep(i.cfg.FinalSleep) } +func (i *Lifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return i.KVStore.CAS(ctx, i.RingKey, f) +} + +func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) { + obj, err := i.KVStore.Get(ctx, i.RingKey) + if err != nil { + return nil, err + } + + return GetOrCreateRingDesc(obj), nil +} + +func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(i, i.cfg.HeartbeatPeriod).handle(w, req) +} + // unregister removes our entry from consul. 
func (i *Lifecycler) unregister(ctx context.Context) error { level.Debug(i.logger).Log("msg", "unregistering instance from ring", "ring", i.RingName) diff --git a/vendor/github.com/grafana/dskit/ring/replication_set.go b/vendor/github.com/grafana/dskit/ring/replication_set.go index 461429d6fa87..b73227136d30 100644 --- a/vendor/github.com/grafana/dskit/ring/replication_set.go +++ b/vendor/github.com/grafana/dskit/ring/replication_set.go @@ -20,8 +20,9 @@ type ReplicationSet struct { MaxUnavailableZones int } -// Do function f in parallel for all replicas in the set, erroring is we exceed +// Do function f in parallel for all replicas in the set, erroring if we exceed // MaxErrors and returning early otherwise. +// Return a slice of all results from f, or nil if an error occurred. func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) { type instanceResult struct { res interface{} diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 6aaf165bf970..5553c6b72121 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -8,11 +8,13 @@ import ( "fmt" "math" "math/rand" + "net/http" "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -513,27 +515,32 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro } // countTokens returns the number of tokens and tokens within the range for each instance. -// The ring read lock must be already taken when calling this function. -func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { - owned := map[string]uint32{} - numTokens := map[string]uint32{} - for i, token := range r.ringTokens { +func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { + var ( + owned = map[string]uint32{} + numTokens = map[string]uint32{} + + ringTokens = r.GetTokens() + ringInstanceByToken = r.getTokensInfo() + ) + + for i, token := range ringTokens { var diff uint32 // Compute how many tokens are within the range. - if i+1 == len(r.ringTokens) { - diff = (math.MaxUint32 - token) + r.ringTokens[0] + if i+1 == len(ringTokens) { + diff = (math.MaxUint32 - token) + ringTokens[0] } else { - diff = r.ringTokens[i+1] - token + diff = ringTokens[i+1] - token } - info := r.ringInstanceByToken[token] + info := ringInstanceByToken[token] numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1 owned[info.InstanceID] = owned[info.InstanceID] + diff } // Set to 0 the number of owned tokens by instances which don't have tokens yet. 
- for id := range r.ringDesc.Ingesters { + for id := range r.Ingesters { if _, ok := owned[id]; !ok { owned[id] = 0 numTokens[id] = 0 @@ -582,7 +589,7 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { prevOwners := r.reportedOwners r.reportedOwners = make(map[string]struct{}) - numTokens, ownedRange := r.countTokens() + numTokens, ownedRange := r.ringDesc.countTokens() for id, totalOwned := range ownedRange { r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) @@ -840,6 +847,23 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) { } } +func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return r.KVClient.CAS(ctx, r.key, f) +} + +func (r *Ring) getRing(ctx context.Context) (*Desc, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + ringDesc := proto.Clone(r.ringDesc).(*Desc) + + return ringDesc, nil +} + +func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(r, r.cfg.HeartbeatTimeout).handle(w, req) +} + // Operation describes which instances can be included in the replica set, based on their state. // // Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states. diff --git a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go index e5da50bc7a32..a7f29ab8cd88 100644 --- a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go +++ b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go @@ -26,7 +26,7 @@ type Loader func(r io.Reader) (interface{}, error) // Config holds the config for an Manager instance. // It holds config related to loading per-tenant config. 
type Config struct { - ReloadPeriod time.Duration `yaml:"period"` + ReloadPeriod time.Duration `yaml:"period" category:"advanced"` // LoadPath contains the path to the runtime config file, requires an // non-empty value LoadPath string `yaml:"file"` diff --git a/vendor/modules.txt b/vendor/modules.txt index 64eea73ee107..27a33fb12261 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -574,7 +574,7 @@ github.com/gorilla/mux # github.com/gorilla/websocket v1.4.2 ## explicit; go 1.12 github.com/gorilla/websocket -# github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 +# github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0 ## explicit; go 1.16 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency From ae1e564cfd7d314b7f7306a07a22f8bede7debb5 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 10 Feb 2022 08:57:51 +0100 Subject: [PATCH 08/16] Intrument Loki with the package --- pkg/distributor/distributor.go | 7 ++- pkg/ingester/flush.go | 17 +++++- pkg/ingester/ingester.go | 27 ++++++--- pkg/ingester/instance.go | 5 ++ pkg/ingester/stream.go | 5 ++ pkg/loghttp/push/push.go | 6 ++ pkg/logql/metrics.go | 18 ++++++ pkg/storage/store.go | 8 +++ .../stores/shipper/compactor/compactor.go | 11 ++++ pkg/usagestats/reporter.go | 4 ++ pkg/usagestats/stats.go | 58 ++++++++++++++----- 11 files changed, 141 insertions(+), 25 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2b65da90bf4e..563377324bbe 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -28,6 +28,7 @@ import ( "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/validation" @@ -37,7 +38,10 @@ const ( ringKey = "distributor" ) -var maxLabelCacheSize = 100000 +var ( + maxLabelCacheSize = 100000 + rfStats = usagestats.NewInt("distributor_replication_factor") +) // Config for a Distributor. type Config struct { @@ -168,6 +172,7 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in }), } d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor())) + rfStats.Set(int64(ingestersRing.ReplicationFactor())) servs = append(servs, d.pool) d.subservices, err = services.NewManager(servs...) 
diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index b1b8dab657fc..f0e7f00c31c5 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" loki_util "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" @@ -90,6 +91,12 @@ var ( // 1h -> 8hr Buckets: prometheus.LinearBuckets(1, 1, 8), }) + flushedChunksStats = usagestats.NewCounter("ingester_flushed_chunks") + flushedChunksBytesStats = usagestats.NewStatistics("ingester_flushed_chunks_bytes") + flushedChunksLinesStats = usagestats.NewStatistics("ingester_flushed_chunks_lines") + flushedChunksAgeStats = usagestats.NewStatistics("ingester_flushed_chunks_age_seconds") + flushedChunksLifespanStats = usagestats.NewStatistics("ingester_flushed_chunks_lifespan_seconds") + flushedChunksUtilizationStats = usagestats.NewStatistics("ingester_flushed_chunks_utilization") ) const ( @@ -382,6 +389,7 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP if err := i.store.Put(ctx, wireChunks); err != nil { return err } + flushedChunksStats.Inc(int64(len(wireChunks))) // Record statistics only when actual put request did not return error. sizePerTenant := chunkSizePerTenant.WithLabelValues(userID) @@ -408,7 +416,8 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize) } - chunkUtilization.Observe(wc.Data.Utilization()) + utilization := wc.Data.Utilization() + chunkUtilization.Observe(utilization) chunkEntries.Observe(float64(numEntries)) chunkSize.Observe(compressedSize) sizePerTenant.Add(compressedSize) @@ -416,6 +425,12 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP firstTime, lastTime := cs[i].chunk.Bounds() chunkAge.Observe(time.Since(firstTime).Seconds()) chunkLifespan.Observe(lastTime.Sub(firstTime).Hours()) + + flushedChunksBytesStats.Record(compressedSize) + flushedChunksLinesStats.Record(float64(numEntries)) + flushedChunksUtilizationStats.Record(utilization) + flushedChunksAgeStats.Record(time.Since(firstTime).Seconds()) + flushedChunksLifespanStats.Record(lastTime.Sub(firstTime).Hours()) } return nil diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index def6e16f7421..ea58a40fc61a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/stores/shipper" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" errUtil "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" @@ -45,12 +46,18 @@ const ( // ErrReadOnly is returned when the ingester is shutting down and a push was // attempted. 
-var ErrReadOnly = errors.New("Ingester is shutting down") - -var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cortex_ingester_flush_queue_length", - Help: "The total number of series pending in the flush queue.", -}) +var ( + ErrReadOnly = errors.New("Ingester is shutting down") + + flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_ingester_flush_queue_length", + Help: "The total number of series pending in the flush queue.", + }) + compressionStats = usagestats.NewString("ingester_compression") + targetSizeStats = usagestats.NewInt("ingester_target_size_bytes") + walStats = usagestats.NewString("ingester_wal") + activeTenantsStats = usagestats.NewInt("ingester_active_tenants") +) // Config for an ingester. type Config struct { @@ -212,7 +219,12 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } - + compressionStats.Set(cfg.ChunkEncoding) + targetSizeStats.Set(int64(cfg.TargetChunkSize)) + walStats.Set("disabled") + if cfg.WAL.Enabled { + walStats.Set("enabled") + } metrics := newIngesterMetrics(registerer) i := &Ingester{ @@ -546,6 +558,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) *instance { if !ok { inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter) i.instances[instanceID] = inst + activeTenantsStats.Set(int64(len(i.instances))) } return inst } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 6bf66c347217..bf60bc935114 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -26,6 +26,7 @@ import ( "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/math" @@ -53,6 +54,8 @@ var ( Name: "ingester_streams_removed_total", Help: "The total number of streams removed per tenant.", }, []string{"tenant"}) + + streamsCountStats = usagestats.NewInt("ingester_streams_count") ) type instance struct { @@ -248,6 +251,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord memoryStreams.WithLabelValues(i.instanceID).Inc() i.streamsCreatedTotal.Inc() i.addTailersToNewStream(s) + streamsCountStats.Add(1) if i.configs.LogStreamCreation(i.instanceID) { level.Debug(util_log.Logger).Log( @@ -288,6 +292,7 @@ func (i *instance) removeStream(s *stream) { i.index.Delete(s.labels, s.fp) i.streamsRemovedTotal.Inc() memoryStreams.WithLabelValues(i.instanceID).Dec() + streamsCountStats.Add(-1) } } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 87f1d13bfdb2..4ddef5f04b7c 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util/flagext" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/validation" @@ -47,6 +48,8 @@ var ( Buckets: prometheus.ExponentialBuckets(5, 2, 6), }) + + chunkCreatedStats = usagestats.NewCounter("ingester_chunk_created") ) var ErrEntriesExist = errors.New("duplicate push - entries already exist") @@ -203,6 +206,7 @@ func (s *stream) Push( chunk: s.NewChunk(), }) 
chunksCreatedTotal.Inc() + chunkCreatedStats.Inc(1) } var storedEntries []logproto.Entry @@ -379,6 +383,7 @@ func (s *stream) cutChunk(ctx context.Context) *chunkDesc { samplesPerChunk.Observe(float64(chunk.chunk.Size())) blocksPerChunk.Observe(float64(chunk.chunk.BlockCount())) chunksCreatedTotal.Inc() + chunkCreatedStats.Inc(1) s.chunks = append(s.chunks, chunkDesc{ chunk: s.NewChunk(), diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 486042ecedea..9ee8881f5f2e 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" loki_util "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/unmarshal" @@ -39,6 +40,9 @@ var ( Name: "distributor_lines_received_total", Help: "The total number of lines received per tenant", }, []string{"tenant"}) + + bytesReceivedStats = usagestats.NewCounter("distributor_bytes_received") + linesReceivedStats = usagestats.NewCounter("distributor_lines_received") ) const applicationJSON = "application/json" @@ -130,6 +134,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete totalEntries++ entriesSize += int64(len(e.Line)) bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(int64(len(e.Line)))) + bytesReceivedStats.Inc(int64(len(e.Line))) if e.Timestamp.After(mostRecentEntry) { mostRecentEntry = e.Timestamp } @@ -140,6 +145,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete if totalEntries != 0 && userID != "" { linesIngested.WithLabelValues(userID).Add(float64(totalEntries)) } + linesReceivedStats.Inc(totalEntries) level.Debug(logger).Log( "msg", "push request parsed", diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 56b82b5091a8..9359b56da515 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -64,6 +65,11 @@ var ( Name: "logql_querystats_ingester_sent_lines_total", Help: "Total count of lines sent from ingesters while executing LogQL queries.", }) + + bytePerSecondMetricUsage = usagestats.NewStatistics("query_metric_bytes_per_second") + bytePerSecondLogUsage = usagestats.NewStatistics("query_log_bytes_per_second") + linePerSecondMetricUsage = usagestats.NewStatistics("query_metric_lines_per_second") + linePerSecondLogUsage = usagestats.NewStatistics("query_log_lines_per_second") ) func RecordMetrics(ctx context.Context, p Params, status string, stats logql_stats.Result, result promql_parser.Value) { @@ -125,6 +131,18 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats logql_sta chunkDownloadedTotal.WithLabelValues(status, queryType, rt). 
Add(float64(stats.TotalChunksDownloaded())) ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent)) + + recordUsageStats(queryType, stats) +} + +func recordUsageStats(queryType string, stats logql_stats.Result) { + if queryType == QueryTypeMetric { + bytePerSecondMetricUsage.Record(float64(stats.Summary.BytesProcessedPerSecond)) + linePerSecondMetricUsage.Record(float64(stats.Summary.LinesProcessedPerSecond)) + } else { + bytePerSecondLogUsage.Record(float64(stats.Summary.BytesProcessedPerSecond)) + linePerSecondLogUsage.Record(float64(stats.Summary.LinesProcessedPerSecond)) + } } func QueryType(query string) (string, error) { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 8a65211fa882..f02792037908 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/storage/stores/shipper" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" ) @@ -31,6 +32,9 @@ var ( errCurrentBoltdbShipperNon24Hours = errors.New("boltdb-shipper works best with 24h periodic index config. Either add a new config with future date set to 24h to retain the existing index or change the existing config to use 24h period") errUpcomingBoltdbShipperNon24Hours = errors.New("boltdb-shipper with future date must always have periodic config for index set to 24h") errZeroLengthConfig = errors.New("must specify at least one schema configuration") + indexTypeStats = usagestats.NewString("store_index_type") + objectTypeStats = usagestats.NewString("store_object_type") + schemaStats = usagestats.NewString("store_schema") ) // Config is the loki storage configuration @@ -125,6 +129,10 @@ type store struct { // NewStore creates a new Loki Store using configuration supplied. 
func NewStore(cfg Config, schemaCfg SchemaConfig, chunkStore chunk.Store, registerer prometheus.Registerer) (Store, error) { + index := ActivePeriodConfig(schemaCfg.Configs) + indexTypeStats.Set(schemaCfg.Configs[index].IndexType) + objectTypeStats.Set(schemaCfg.Configs[index].ObjectType) + schemaStats.Set(schemaCfg.Configs[index].Schema) return &store{ Store: chunkStore, cfg: cfg, diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 5f9abefced04..dc2a5f39a47d 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -26,6 +26,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -52,6 +53,11 @@ const ( ringNumTokens = 1 ) +var ( + retentionEnabledStats = usagestats.NewString("compactor_retention_enabled") + defaultRetentionStats = usagestats.NewString("compactor_default_retention") +) + type Config struct { WorkingDirectory string `yaml:"working_directory"` SharedStoreType string `yaml:"shared_store"` @@ -119,6 +125,11 @@ type Compactor struct { } func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, clientMetrics storage.ClientMetrics, r prometheus.Registerer) (*Compactor, error) { + retentionEnabledStats.Set("false") + if cfg.RetentionEnabled { + retentionEnabledStats.Set("true") + } + defaultRetentionStats.Set(limits.DefaultLimits().RetentionPeriod.String()) if cfg.SharedStoreType == "" { return nil, errors.New("compactor shared_store_type must be specified") } diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go index 96d70256ff73..de3675deba4b 100644 --- a/pkg/usagestats/reporter.go +++ b/pkg/usagestats/reporter.go @@ -169,6 +169,7 @@ func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) b return nil, backoff.Err() } +// readSeedFile reads the cluster seed file from the object store. func (rep *Reporter) readSeedFile(ctx context.Context) (*ClusterSeed, error) { reader, _, err := rep.objectClient.GetObject(ctx, ClusterSeedFileName) if err != nil { @@ -193,6 +194,7 @@ func (rep *Reporter) readSeedFile(ctx context.Context) (*ClusterSeed, error) { return seed.(*ClusterSeed), nil } +// writeSeedFile writes the cluster seed to the object store. func (rep *Reporter) writeSeedFile(ctx context.Context, seed ClusterSeed) error { data, err := JSONCodec.Encode(seed) if err != nil { @@ -201,6 +203,7 @@ func (rep *Reporter) writeSeedFile(ctx context.Context, seed ClusterSeed) error return rep.objectClient.PutObject(ctx, ClusterSeedFileName, bytes.NewReader(data)) } +// running inits the reporter seed and start sending report for every interval func (rep *Reporter) running(ctx context.Context) error { rep.init(ctx) @@ -234,6 +237,7 @@ func (rep *Reporter) running(ctx context.Context) error { } } +// reportUsage reports the usage to grafana.com. 
func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error { backoff := backoff.New(ctx, backoff.Config{ MinBackoff: time.Second, diff --git a/pkg/usagestats/stats.go b/pkg/usagestats/stats.go index 62cede44a172..89218f6f6f50 100644 --- a/pkg/usagestats/stats.go +++ b/pkg/usagestats/stats.go @@ -30,6 +30,7 @@ var ( editionKey = "edition" ) +// Report is the JSON object sent to the stats server type Report struct { ClusterID string `json:"clusterID"` CreatedAt time.Time `json:"createdAt"` @@ -42,8 +43,10 @@ type Report struct { Metrics map[string]interface{} `json:"metrics"` } +// sendReport sends the report to the stats server func sendReport(ctx context.Context, seed *ClusterSeed, interval time.Time) error { - out, err := jsoniter.MarshalIndent(buildReport(seed, interval), "", " ") + report := buildReport(seed, interval) + out, err := jsoniter.MarshalIndent(report, "", " ") if err != nil { return err } @@ -67,6 +70,7 @@ func sendReport(ctx context.Context, seed *ClusterSeed, interval time.Time) erro return nil } +// buildReport builds the report to be sent to the stats server func buildReport(seed *ClusterSeed, interval time.Time) Report { var ( targetName string @@ -96,6 +100,7 @@ func buildReport(seed *ClusterSeed, interval time.Time) Report { } } +// buildMetrics builds the metrics part of the report to be sent to the stats server func buildMetrics() map[string]interface{} { result := map[string]interface{}{ "memstats": memstats(), @@ -137,39 +142,36 @@ func memstats() interface{} { "alloc": stats.Alloc, "total_alloc": stats.TotalAlloc, "sys": stats.Sys, - "mallocs": stats.Mallocs, - "frees": stats.Frees, "heap_alloc": stats.HeapAlloc, - "heap_sys": stats.HeapSys, - "heap_idle": stats.HeapIdle, "heap_inuse": stats.HeapInuse, - "heap_released": stats.HeapReleased, - "heap_objects": stats.HeapObjects, "stack_inuse": stats.StackInuse, - "stack_sys": stats.StackSys, - "other_sys": stats.OtherSys, "pause_total_ns": stats.PauseTotalNs, "num_gc": stats.NumGC, "gc_cpu_fraction": stats.GCCPUFraction, } } +// NewFloat returns a new Float stats object. func NewFloat(name string) *expvar.Float { return expvar.NewFloat(statsPrefix + name) } +// NewInt returns a new Int stats object. func NewInt(name string) *expvar.Int { return expvar.NewInt(statsPrefix + name) } +// NewString returns a new String stats object. func NewString(name string) *expvar.String { return expvar.NewString(statsPrefix + name) } +// Target sets the target name. func Target(target string) { NewString(targetKey).Set(target) } +// Edition sets the edition name. func Edition(edition string) { NewString(editionKey).Set(edition) } @@ -186,6 +188,15 @@ type Statistics struct { value *atomic.Float64 } +// NewStatistics returns a new Statistics object. +// Statistics object is thread-safe and compute statistics on the fly based on sample recorded. 
+// Available statistics are: +// - min +// - max +// - avg +// - count +// - stddev +// - stdvar func NewStatistics(name string) *Statistics { s := &Statistics{ min: atomic.NewFloat64(math.Inf(0)), @@ -206,14 +217,26 @@ func (s *Statistics) String() string { func (s *Statistics) Value() map[string]interface{} { stdvar := s.value.Load() / float64(s.count.Load()) - return map[string]interface{}{ - "min": s.min.Load(), - "max": s.max.Load(), - "avg": s.avg.Load(), - "count": s.count.Load(), - "stddev": math.Sqrt(stdvar), - "stdvar": stdvar, + stddev := math.Sqrt(stdvar) + min := s.min.Load() + max := s.max.Load() + result := map[string]interface{}{ + "avg": s.avg.Load(), + "count": s.count.Load(), + } + if !math.IsInf(min, 0) { + result["min"] = min + } + if !math.IsInf(max, 0) { + result["max"] = s.max.Load() } + if !math.IsNaN(stddev) { + result["stddev"] = stddev + } + if !math.IsNaN(stdvar) { + result["stdvar"] = stdvar + } + return result } func (s *Statistics) Record(v float64) { @@ -259,6 +282,7 @@ type Counter struct { resetTime time.Time } +// NewCounter returns a new Counter stats object. func NewCounter(name string) *Counter { c := &Counter{ total: atomic.NewInt64(0), @@ -301,6 +325,8 @@ type WordCounter struct { count *atomic.Int64 } +// NewWordCounter returns a new WordCounter stats object. +// WordCounter object is thread-safe and count the amount of word recorded. func NewWordCounter(name string) *WordCounter { c := &WordCounter{ count: atomic.NewInt64(0), From 9eab96b494ab5c8081fb7533f1d9e8711558c18f Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 10 Feb 2022 09:34:05 +0100 Subject: [PATCH 09/16] Add changelog entry Signed-off-by: Cyril Tovena --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 109115168ae7..773f54a99c91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [5361](https://github.com/grafana/loki/pull/5361) **ctovena**: Add usage report to grafana.com. * [5289](https://github.com/grafana/loki/pull/5289) **ctovena**: Fix deduplication bug in queries when mutating labels. * [5302](https://github.com/grafana/loki/pull/5302) **MasslessParticle** Update azure blobstore client to use new sdk. * [5243](https://github.com/grafana/loki/pull/5290) **ssncferreira**: Update Promtail to support duration string formats. 
From 6fe98eac812aebe1cd4903af3dda72159a128d4a Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 10 Feb 2022 10:20:17 +0100 Subject: [PATCH 10/16] Fixes compactor test Signed-off-by: Cyril Tovena --- pkg/storage/stores/shipper/compactor/compactor.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index d139f3ffcbb2..bbfeb044bf37 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -129,7 +129,9 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st if cfg.RetentionEnabled { retentionEnabledStats.Set("true") } - defaultRetentionStats.Set(limits.DefaultLimits().RetentionPeriod.String()) + if limits != nil { + defaultRetentionStats.Set(limits.DefaultLimits().RetentionPeriod.String()) + } if cfg.SharedStoreType == "" { return nil, errors.New("compactor shared_store_type must be specified") } From ff3a134ca53da91d49bf066e2d4dbd6041f46453 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 10 Feb 2022 10:32:32 +0100 Subject: [PATCH 11/16] Add configuration documentation Signed-off-by: Cyril Tovena --- docs/sources/configuration/_index.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index c12e60bb4c2c..1db281a9a284 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -164,6 +164,9 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set # If a more specific configuration is given in other sections, # the related configuration within this section will be ignored. [common: ] + +# Configuration for usage report +[usage_report: ] ``` ## server @@ -2496,6 +2499,16 @@ This way, one doesn't have to replicate configuration in multiple places. [ring: ] ``` +## usage_report + +This block allow to configure usage report of Loki to grafana.com + +```yaml +# Whether or not usage report should be disabled. 
+# CLI flag: -usage-report.disabled +[disabled: : default = false] +``` + ### storage The common `storage` block defines a common storage to be reused by different From 1deadc0e8fed4de9f777e1ac37eab2bc61870bde Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 10 Feb 2022 10:36:07 +0100 Subject: [PATCH 12/16] Update pkg/usagestats/reporter.go Co-authored-by: Danny Kopping --- pkg/usagestats/reporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go index de3675deba4b..591a21772fe2 100644 --- a/pkg/usagestats/reporter.go +++ b/pkg/usagestats/reporter.go @@ -42,7 +42,7 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - f.BoolVar(&cfg.Disabled, "usage-report.disabled", false, "Allow to disable usage reporting.") + f.BoolVar(&cfg.Disabled, "usage-report.disabled", false, "Disable anonymous usage reporting.") } type Reporter struct { From e1fc2f4667232b5b7359faa560d7f9545553fc0c Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 10 Feb 2022 11:22:28 +0100 Subject: [PATCH 13/16] Add boundary check Signed-off-by: Cyril Tovena --- pkg/storage/store.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index f02792037908..1f5ec9fac759 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -129,10 +129,14 @@ type store struct { // NewStore creates a new Loki Store using configuration supplied. func NewStore(cfg Config, schemaCfg SchemaConfig, chunkStore chunk.Store, registerer prometheus.Registerer) (Store, error) { - index := ActivePeriodConfig(schemaCfg.Configs) - indexTypeStats.Set(schemaCfg.Configs[index].IndexType) - objectTypeStats.Set(schemaCfg.Configs[index].ObjectType) - schemaStats.Set(schemaCfg.Configs[index].Schema) + if len(schemaCfg.Configs) != 0 { + if index := ActivePeriodConfig(schemaCfg.Configs); index != -1 && index < len(schemaCfg.Configs) { + indexTypeStats.Set(schemaCfg.Configs[index].IndexType) + objectTypeStats.Set(schemaCfg.Configs[index].ObjectType) + schemaStats.Set(schemaCfg.Configs[index].Schema) + } + } + return &store{ Store: chunkStore, cfg: cfg, From 4eef924694ef5bddd69580b2f022edc5e21d9867 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 10 Feb 2022 11:29:35 +0100 Subject: [PATCH 14/16] Add log for success report. 
Signed-off-by: Cyril Tovena
---
 pkg/usagestats/reporter.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go
index 591a21772fe2..0d0508b05856 100644
--- a/pkg/usagestats/reporter.go
+++ b/pkg/usagestats/reporter.go
@@ -252,6 +252,7 @@ func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error
 			backoff.Wait()
 			continue
 		}
+		level.Debug(rep.logger).Log("msg", "usage report send with success")
 		return nil
 	}
 	return errs.Err()

From 80d799bc86e85fe0d2bbeeaee7069ec698f6cb97 Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Thu, 10 Feb 2022 12:17:54 +0100
Subject: [PATCH 15/16] lint

Signed-off-by: Cyril Tovena
---
 .../pkg/promtail/targets/cloudflare/target.go | 10 +--
 .../targets/cloudflare/target_test.go         | 26 ++++----
 pkg/ruler/base/ruler.go                       |  8 +--
 pkg/storage/stores/shipper/compactor/table.go | 11 +---
 .../stores/shipper/downloads/index_set.go     | 10 +--
 pkg/storage/stores/shipper/downloads/table.go |  4 +-
 pkg/util/server/error.go                      |  2 +-
 pkg/util/server/error_test.go                 | 62 ++++++++++++-------
 8 files changed, 71 insertions(+), 62 deletions(-)

diff --git a/clients/pkg/promtail/targets/cloudflare/target.go b/clients/pkg/promtail/targets/cloudflare/target.go
index 806eefabfdc1..ad097af45dff 100644
--- a/clients/pkg/promtail/targets/cloudflare/target.go
+++ b/clients/pkg/promtail/targets/cloudflare/target.go
@@ -109,11 +109,11 @@ func (t *Target) start() {
 			end = maxEnd
 		}
 		start := end.Add(-time.Duration(t.config.PullRange))
-
+		requests := splitRequests(start, end, t.config.Workers)
 		// Use background context for workers as we don't want to cancel half way through.
 		// In case of errors we stop the target, each worker has it's own retry logic.
-		if err := concurrency.ForEach(context.Background(), splitRequests(start, end, t.config.Workers), t.config.Workers, func(ctx context.Context, job interface{}) error {
-			request := job.(pullRequest)
+		if err := concurrency.ForEachJob(context.Background(), len(requests), t.config.Workers, func(ctx context.Context, idx int) error {
+			request := requests[idx]
 			return t.pull(ctx, request.start, request.end)
 		}); err != nil {
 			level.Error(t.logger).Log("msg", "failed to pull logs", "err", err, "start", start, "end", end)
@@ -229,9 +229,9 @@ type pullRequest struct {
 	end   time.Time
 }
 
-func splitRequests(start, end time.Time, workers int) []interface{} {
+func splitRequests(start, end time.Time, workers int) []pullRequest {
 	perWorker := end.Sub(start) / time.Duration(workers)
-	var requests []interface{}
+	var requests []pullRequest
 	for i := 0; i < workers; i++ {
 		r := pullRequest{
 			start: start.Add(time.Duration(i) * perWorker),
diff --git a/clients/pkg/promtail/targets/cloudflare/target_test.go b/clients/pkg/promtail/targets/cloudflare/target_test.go
index 770b1be714e1..a06c101a99cc 100644
--- a/clients/pkg/promtail/targets/cloudflare/target_test.go
+++ b/clients/pkg/promtail/targets/cloudflare/target_test.go
@@ -251,26 +251,26 @@ func Test_splitRequests(t *testing.T) {
 	tests := []struct {
 		start time.Time
 		end   time.Time
-		want  []interface{}
+		want  []pullRequest
 	}{
 		// perfectly divisible
 		{
 			time.Unix(0, 0),
 			time.Unix(0, int64(time.Minute)),
-			[]interface{}{
-				pullRequest{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
-				pullRequest{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
-				pullRequest{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute))},
+			[]pullRequest{
+				{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
+				{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
+				{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute))},
 			},
 		},
 		// not divisible
 		{
 			time.Unix(0, 0),
 			time.Unix(0, int64(time.Minute+1)),
-			[]interface{}{
-				pullRequest{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
-				pullRequest{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
-				pullRequest{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute+1))},
+			[]pullRequest{
+				{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
+				{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
+				{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute+1))},
 			},
 		},
 	}
@@ -279,11 +279,11 @@ func Test_splitRequests(t *testing.T) {
 			got := splitRequests(tt.start, tt.end, 3)
 			if !assert.Equal(t, tt.want, got) {
 				for i := range got {
-					if !assert.Equal(t, tt.want[i].(pullRequest).start, got[i].(pullRequest).start) {
-						t.Logf("expected i:%d start: %d , got: %d", i, tt.want[i].(pullRequest).start.UnixNano(), got[i].(pullRequest).start.UnixNano())
+					if !assert.Equal(t, tt.want[i].start, got[i].start) {
+						t.Logf("expected i:%d start: %d , got: %d", i, tt.want[i].start.UnixNano(), got[i].start.UnixNano())
 					}
-					if !assert.Equal(t, tt.want[i].(pullRequest).end, got[i].(pullRequest).end) {
-						t.Logf("expected i:%d end: %d , got: %d", i, tt.want[i].(pullRequest).end.UnixNano(), got[i].(pullRequest).end.UnixNano())
+					if !assert.Equal(t, tt.want[i].end, got[i].end) {
+						t.Logf("expected i:%d end: %d , got: %d", i, tt.want[i].end.UnixNano(), got[i].end.UnixNano())
 					}
 				}
 			}
diff --git a/pkg/ruler/base/ruler.go b/pkg/ruler/base/ruler.go
index 46fca61517d4..cf8761d6f45e 100644
--- a/pkg/ruler/base/ruler.go
+++ b/pkg/ruler/base/ruler.go
@@ -427,7 +427,7 @@ func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 	if r.cfg.EnableSharding {
 		r.ring.ServeHTTP(w, req)
 	} else {
-		var unshardedPage = `
+		unshardedPage := `
@@ -769,9 +769,9 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta
 	// Concurrently fetch rules from all rulers. Since rules are not replicated,
 	// we need all requests to succeed.
-	jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses())
-	err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
-		addr := job.(string)
+	addresses := rulers.GetAddresses()
+	err = concurrency.ForEachJob(ctx, len(addresses), len(addresses), func(ctx context.Context, idx int) error {
+		addr := addresses[idx]
 
 		rulerClient, err := r.clientsPool.GetClientFor(addr)
 		if err != nil {
diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go
index b9ab02611c6d..c1c5434725ac 100644
--- a/pkg/storage/stores/shipper/compactor/table.go
+++ b/pkg/storage/stores/shipper/compactor/table.go
@@ -264,18 +264,13 @@ func (t *table) compactFiles(files []storage.IndexFile) error {
 		return err
 	}
 
-	jobs := make([]interface{}, len(files))
-	for i := 0; i < len(files); i++ {
-		jobs[i] = i
-	}
-
-	return concurrency.ForEach(t.ctx, jobs, readDBsConcurrency, func(ctx context.Context, job interface{}) error {
-		workNum := job.(int)
+	return concurrency.ForEachJob(t.ctx, len(files), readDBsConcurrency, func(ctx context.Context, idx int) error {
+		workNum := idx
 		// skip seed file
 		if workNum == t.seedSourceFileIdx {
 			return nil
 		}
-		fileName := files[workNum].Name
+		fileName := files[idx].Name
 		downloadAt := filepath.Join(t.workingDirectory, fileName)
 
 		err = shipper_util.DownloadFileFromStorage(downloadAt, shipper_util.IsCompressedFile(fileName),
diff --git a/pkg/storage/stores/shipper/downloads/index_set.go b/pkg/storage/stores/shipper/downloads/index_set.go
index 9672f2eb48fa..3c9d0f8c32b3 100644
--- a/pkg/storage/stores/shipper/downloads/index_set.go
+++ b/pkg/storage/stores/shipper/downloads/index_set.go
@@ -390,13 +390,8 @@ func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.Ind
 	downloadedFiles := make([]string, 0, len(files))
 	downloadedFilesMtx := sync.Mutex{}
 
-	jobs := make([]interface{}, len(files))
-	for i := 0; i < len(files); i++ {
-		jobs[i] = i
-	}
-
-	err := concurrency.ForEach(ctx, jobs, maxDownloadConcurrency, func(ctx context.Context, job interface{}) error {
-		fileName := files[job.(int)].Name
+	err := concurrency.ForEachJob(ctx, len(files), maxDownloadConcurrency, func(ctx context.Context, idx int) error {
+		fileName := files[idx].Name
 		err := t.downloadFileFromStorage(ctx, fileName, t.cacheLocation)
 		if err != nil {
 			if t.baseIndexSet.IsFileNotFoundErr(err) {
@@ -412,7 +407,6 @@ func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.Ind
 		return nil
 	})
-
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go
index ec334eb63aa4..ca427a633c9b 100644
--- a/pkg/storage/stores/shipper/downloads/table.go
+++ b/pkg/storage/stores/shipper/downloads/table.go
@@ -308,8 +308,8 @@ func (t *Table) EnsureQueryReadiness(ctx context.Context) error {
 
 // downloadUserIndexes downloads user specific index files concurrently.
 func (t *Table) downloadUserIndexes(ctx context.Context, userIDs []string) error {
-	return concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), maxDownloadConcurrency, func(ctx context.Context, userID interface{}) error {
-		indexSet, err := t.getOrCreateIndexSet(userID.(string))
+	return concurrency.ForEachJob(ctx, len(userIDs), maxDownloadConcurrency, func(ctx context.Context, idx int) error {
+		indexSet, err := t.getOrCreateIndexSet(userIDs[idx])
 		if err != nil {
 			return err
 		}
diff --git a/pkg/util/server/error.go b/pkg/util/server/error.go
index d2598e0d27c2..81b59a5e4eeb 100644
--- a/pkg/util/server/error.go
+++ b/pkg/util/server/error.go
@@ -41,7 +41,7 @@ func NotFoundHandler(w http.ResponseWriter, r *http.Request) {
 func JSONError(w http.ResponseWriter, code int, message string, args ...interface{}) {
 	w.Header().Set("Content-Type", "application/json; charset=utf-8")
 	w.WriteHeader(code)
-	json.NewEncoder(w).Encode(ErrorResponseBody{
+	_ = json.NewEncoder(w).Encode(ErrorResponseBody{
 		Code:    code,
 		Status:  "error",
 		Message: fmt.Sprintf(message, args...),
diff --git a/pkg/util/server/error_test.go b/pkg/util/server/error_test.go
index 211ae1380374..8e9e48cc022b 100644
--- a/pkg/util/server/error_test.go
+++ b/pkg/util/server/error_test.go
@@ -32,58 +32,78 @@ func Test_writeError(t *testing.T) {
 	}{
 		{"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest},
 		{"cancelled multi", util.MultiError{context.Canceled, context.Canceled}, ErrClientCanceled, StatusClientClosedRequest},
-		{"rpc cancelled",
+		{
+			"rpc cancelled",
 			status.New(codes.Canceled, context.Canceled.Error()).Err(),
 			"rpc error: code = Canceled desc = context canceled",
-			http.StatusInternalServerError},
+			http.StatusInternalServerError,
+		},
-		{"rpc cancelled multi",
+		{
+			"rpc cancelled multi",
 			util.MultiError{status.New(codes.Canceled, context.Canceled.Error()).Err(), status.New(codes.Canceled, context.Canceled.Error()).Err()},
 			"2 errors: rpc error: code = Canceled desc = context canceled; rpc error: code = Canceled desc = context canceled",
-			http.StatusInternalServerError},
+			http.StatusInternalServerError,
+		},
-		{"mixed context and rpc cancelled",
+		{
+			"mixed context and rpc cancelled",
 			util.MultiError{context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()},
 			"2 errors: context canceled; rpc error: code = Canceled desc = context canceled",
-			http.StatusInternalServerError},
+			http.StatusInternalServerError,
+		},
-		{"mixed context, rpc cancelled and another",
+		{
+			"mixed context, rpc cancelled and another",
 			util.MultiError{errors.New("standard error"), context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()},
 			"3 errors: standard error; context canceled; rpc error: code = Canceled desc = context canceled",
-			http.StatusInternalServerError},
+			http.StatusInternalServerError,
+		},
 		{"cancelled storage", promql.ErrStorage{Err: context.Canceled}, ErrClientCanceled, StatusClientClosedRequest},
 		{"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest},
 		{"deadline", context.DeadlineExceeded, ErrDeadlineExceeded, http.StatusGatewayTimeout},
 		{"deadline multi", util.MultiError{context.DeadlineExceeded, context.DeadlineExceeded}, ErrDeadlineExceeded, http.StatusGatewayTimeout},
 		{"rpc deadline", status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), ErrDeadlineExceeded, http.StatusGatewayTimeout},
-		{"rpc deadline multi",
-			util.MultiError{status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(),
-				status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()},
+		{
+			"rpc deadline multi",
+			util.MultiError{
+				status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(),
+				status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(),
+			},
 			ErrDeadlineExceeded,
-			http.StatusGatewayTimeout},
+			http.StatusGatewayTimeout,
+		},
-		{"mixed context and rpc deadline",
+		{
+			"mixed context and rpc deadline",
 			util.MultiError{context.DeadlineExceeded, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()},
 			ErrDeadlineExceeded,
-			http.StatusGatewayTimeout},
+			http.StatusGatewayTimeout,
+		},
-		{"mixed context, rpc deadline and another",
+		{
+			"mixed context, rpc deadline and another",
 			util.MultiError{errors.New("standard error"), context.DeadlineExceeded, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()},
 			"3 errors: standard error; context deadline exceeded; rpc error: code = DeadlineExceeded desc = context deadline exceeded",
-			http.StatusInternalServerError},
+			http.StatusInternalServerError,
+		},
 		{"parse error", logqlmodel.ParseError{}, "parse error : ", http.StatusBadRequest},
 		{"httpgrpc", httpgrpc.Errorf(http.StatusBadRequest, errors.New("foo").Error()), "foo", http.StatusBadRequest},
 		{"internal", errors.New("foo"), "foo", http.StatusInternalServerError},
 		{"query error", chunk.ErrQueryMustContainMetricName, chunk.ErrQueryMustContainMetricName.Error(), http.StatusBadRequest},
-		{"wrapped query error",
+		{
+			"wrapped query error",
 			fmt.Errorf("wrapped: %w", chunk.ErrQueryMustContainMetricName),
 			"wrapped: " + chunk.ErrQueryMustContainMetricName.Error(),
-			http.StatusBadRequest},
+			http.StatusBadRequest,
+		},
-		{"multi mixed",
+		{
+			"multi mixed",
 			util.MultiError{context.Canceled, context.DeadlineExceeded},
 			"2 errors: context canceled; context deadline exceeded",
-			http.StatusInternalServerError},
+			http.StatusInternalServerError,
+		},
 	} {
 		t.Run(tt.name, func(t *testing.T) {
 			rec := httptest.NewRecorder()
 			WriteError(tt.err, rec)
 			res := &ErrorResponseBody{}
-			json.NewDecoder(rec.Result().Body).Decode(res)
+			_ = json.NewDecoder(rec.Result().Body).Decode(res)
 			require.Equal(t, tt.expectedStatus, res.Code)
 			require.Equal(t, tt.expectedStatus, rec.Result().StatusCode)
 			require.Equal(t, tt.expectedMsg, res.Message)

From 52a8a56324ce75a527223fb33550a63af63b5064 Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Thu, 10 Feb 2022 13:28:49 +0100
Subject: [PATCH 16/16] Update pkg/usagestats/reporter.go

Co-authored-by: Danny Kopping
---
 pkg/usagestats/reporter.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go
index 0d0508b05856..af57c163f863 100644
--- a/pkg/usagestats/reporter.go
+++ b/pkg/usagestats/reporter.go
@@ -247,12 +247,12 @@ func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error
 	var errs multierror.MultiError
 	for backoff.Ongoing() {
 		if err := sendReport(ctx, rep.cluster, interval); err != nil {
-			level.Info(rep.logger).Log("msg", "failed to send stats", "retries", backoff.NumRetries(), "err", err)
+			level.Info(rep.logger).Log("msg", "failed to send usage report", "retries", backoff.NumRetries(), "err", err)
 			errs.Add(err)
 			backoff.Wait()
 			continue
 		}
-		level.Debug(rep.logger).Log("msg", "usage report send with success")
+		level.Debug(rep.logger).Log("msg", "usage report sent with success")
 		return nil
 	}
 	return errs.Err()
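
A quick illustration of the `concurrency.ForEachJob` pattern the lint commit above moves to: the older `concurrency.ForEach` forced callers to build a `[]interface{}` of jobs and type-assert inside the callback, while `ForEachJob` takes a job count and hands the callback an index into the caller's own typed slice. Below is a minimal sketch outside of Loki, assuming only dskit's `concurrency` package as used in those diffs; the `files` slice and `download` helper are made up for the example.

```go
package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

// download is a hypothetical stand-in for per-item work such as fetching
// one index file or querying one ruler; it only prints its input.
func download(ctx context.Context, name string) error {
	fmt.Println("downloading", name)
	return nil
}

func main() {
	files := []string{"index-1", "index-2", "index-3"}

	// ForEachJob runs len(files) jobs with at most 2 in flight at once.
	// The callback receives an index, so the slice stays strongly typed
	// and no job.(string) style assertion is needed.
	err := concurrency.ForEachJob(context.Background(), len(files), 2, func(ctx context.Context, idx int) error {
		return download(ctx, files[idx])
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

The same shift is what lets `splitRequests` in the cloudflare target return a typed `[]pullRequest` and lets its test table drop the `(pullRequest)` assertions.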
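
Patches 14 and 16 only touch log lines, but they sit inside `reportUsage`'s retry loop, which pairs a capped backoff with an error accumulator: each failed send is logged and recorded, and the combined error is returned only once the backoff gives up. The following is a simplified, self-contained sketch of that shape, assuming dskit's `backoff` and `multierror` packages; `sendReport` here is a hypothetical stand-in for the real sender.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/multierror"
)

// sendReport is a hypothetical stand-in for the real usage-report sender;
// it always fails so the retry path is exercised.
func sendReport(ctx context.Context) error {
	return errors.New("upstream unavailable")
}

// reportWithRetry retries sendReport with capped exponential backoff and,
// if every attempt fails, returns all intermediate errors combined.
func reportWithRetry(ctx context.Context) error {
	b := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Second,
		MaxRetries: 3,
	})
	var errs multierror.MultiError
	for b.Ongoing() {
		if err := sendReport(ctx); err != nil {
			fmt.Println("failed to send usage report, retries:", b.NumRetries(), "err:", err)
			errs.Add(err)
			b.Wait()
			continue
		}
		fmt.Println("usage report sent successfully")
		return nil
	}
	return errs.Err()
}

func main() {
	if err := reportWithRetry(context.Background()); err != nil {
		fmt.Println("giving up:", err)
	}
}
```

Returning the accumulated error rather than only the last one keeps every intermediate failure visible to the caller, which is why the loop also logs the retry count on each attempt.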