From 6bcd9807d686ffd74d1b738d3a2d9d5749b50878 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Wed, 21 Apr 2021 13:59:03 +0200 Subject: [PATCH] replace `time.Duration` -> `model.Duration` for `Limits`. (#3632) Rationale 1. support user-friendly duration format (e.g: "1h30m45s") in JSON value. 2. In consistent with Cortex and tempo. --- pkg/distributor/distributor_test.go | 5 +- pkg/distributor/validator_test.go | 3 +- pkg/loki/modules.go | 2 +- pkg/querier/querier_test.go | 2 +- pkg/storage/store_test.go | 2 +- pkg/validation/limits.go | 81 +++++++++++++-------- pkg/validation/limits_test.go | 109 ++++++++++++++++++++++++++++ 7 files changed, 166 insertions(+), 38 deletions(-) create mode 100644 pkg/validation/limits_test.go diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index fe389f983a29..69e0272fb611 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -21,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/test" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" @@ -142,8 +143,8 @@ func Benchmark_Push(b *testing.B) { limits.EnforceMetricName = false limits.MaxLineSize = math.MaxInt32 limits.RejectOldSamples = true - limits.RejectOldSamplesMaxAge = 24 * time.Hour - limits.CreationGracePeriod = 24 * time.Hour + limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour) + limits.CreationGracePeriod = model.Duration(24 * time.Hour) ingester := &mockIngester{} d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck diff --git a/pkg/distributor/validator_test.go b/pkg/distributor/validator_test.go index 9d657adf2a33..21ba92235081 100644 --- a/pkg/distributor/validator_test.go +++ b/pkg/distributor/validator_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" "github.com/weaveworks/common/httpgrpc" @@ -39,7 +40,7 @@ func TestValidator_ValidateEntry(t *testing.T) { func(userID string) *validation.Limits { return &validation.Limits{ RejectOldSamples: true, - RejectOldSamplesMaxAge: 1 * time.Hour, + RejectOldSamplesMaxAge: model.Duration(1 * time.Hour), } }, logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 0bfe49822043..241d521e1a7f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -119,7 +119,7 @@ func (t *Loki) initRing() (_ services.Service, err error) { func (t *Loki) initRuntimeConfig() (services.Service, error) { if t.Cfg.RuntimeConfig.LoadPath == "" { t.Cfg.RuntimeConfig.LoadPath = t.Cfg.LimitsConfig.PerTenantOverrideConfig - t.Cfg.RuntimeConfig.ReloadPeriod = t.Cfg.LimitsConfig.PerTenantOverridePeriod + t.Cfg.RuntimeConfig.ReloadPeriod = time.Duration(t.Cfg.LimitsConfig.PerTenantOverridePeriod) } if t.Cfg.RuntimeConfig.LoadPath == "" { diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index c7e66d886f81..05d5d5073995 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -192,7 +192,7 @@ func TestQuerier_validateQueryRequest(t *testing.T) { defaultLimits := defaultLimitsTestConfig() defaultLimits.MaxStreamsMatchersPerQuery = 1 - defaultLimits.MaxQueryLength = 2 * time.Minute + defaultLimits.MaxQueryLength = model.Duration(2 * time.Minute) limits, err := validation.NewOverrides(defaultLimits, nil) require.NoError(t, err) diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index f26dde16f5e4..3187a38005dc 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -184,7 +184,7 @@ func printHeap(b *testing.B, show bool) { func getLocalStore() Store { limits, err := validation.NewOverrides(validation.Limits{ - MaxQueryLength: 6000 * time.Hour, + MaxQueryLength: model.Duration(6000 * time.Hour), }, nil) if err != nil { panic(err) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 2a89b2b34788..78454f58063c 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -5,6 +5,8 @@ import ( "time" "github.com/grafana/loki/pkg/util/flagext" + + "github.com/prometheus/common/model" ) const ( @@ -19,6 +21,8 @@ const ( // Limits describe all the limits for users; can be used to describe global default // limits via flags, or per-user limits via yaml config. +// NOTE: we use custom `model.Duration` instead of standard `time.Duration` because, +// to support user-friendly duration format (e.g: "1h30m45s") in JSON value. type Limits struct { // Distributor enforced limits. IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` @@ -28,8 +32,8 @@ type Limits struct { MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"` RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"` - RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"` - CreationGracePeriod time.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` + RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"` + CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"` MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"` @@ -38,28 +42,28 @@ type Limits struct { MaxGlobalStreamsPerUser int `yaml:"max_global_streams_per_user" json:"max_global_streams_per_user"` // Querier enforced limits. - MaxChunksPerQuery int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` - MaxQuerySeries int `yaml:"max_query_series" json:"max_query_series"` - MaxQueryLookback time.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` - MaxQueryLength time.Duration `yaml:"max_query_length" json:"max_query_length"` - MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` - CardinalityLimit int `yaml:"cardinality_limit" json:"cardinality_limit"` - MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query" json:"max_streams_matchers_per_query"` - MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"` - MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"` - MaxCacheFreshness time.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` + MaxChunksPerQuery int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` + MaxQuerySeries int `yaml:"max_query_series" json:"max_query_series"` + MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` + MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"` + MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` + CardinalityLimit int `yaml:"cardinality_limit" json:"cardinality_limit"` + MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query" json:"max_streams_matchers_per_query"` + MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"` + MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"` + MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` // Query frontend enforced limits. The default is actually parameterized by the queryrange config. - QuerySplitDuration time.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` + QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` // Ruler defaults and limits. - RulerEvaluationDelay time.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"` - RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"` - RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"` + RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"` + RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"` + RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"` // Config for overrides, convenient if it goes here. - PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"` - PerTenantOverridePeriod time.Duration `yaml:"per_tenant_override_period" json:"per_tenant_override_period"` + PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"` + PerTenantOverridePeriod model.Duration `yaml:"per_tenant_override_period" json:"per_tenant_override_period"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -72,8 +76,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", false, "Reject old samples.") - f.DurationVar(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", 14*24*time.Hour, "Maximum accepted sample age before rejecting.") - f.DurationVar(&l.CreationGracePeriod, "validation.create-grace-period", 10*time.Minute, "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.") + + _ = l.RejectOldSamplesMaxAge.Set("14d") + f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.") + _ = l.CreationGracePeriod.Set("10m") + f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.") f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") f.IntVar(&l.MaxEntriesLimitPerQuery, "validation.max-entries-limit", 5000, "Per-user entries limit per query") @@ -81,21 +88,31 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 0, "Maximum number of active streams per user, across the cluster. 0 to disable.") f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") - f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.") + + _ = l.MaxQueryLength.Set("0s") + f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit to length of chunk store queries, 0 to disable.") f.IntVar(&l.MaxQuerySeries, "querier.max-query-series", 500, "Limit the maximum of unique series returned by a metric query. When the limit is reached an error is returned.") - f.DurationVar(&l.MaxQueryLookback, "querier.max-query-lookback", 0, "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.") + + _ = l.MaxQueryLookback.Set("0s") + f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.") f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.") f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query") f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Limit the number of concurrent tail requests") - f.DurationVar(&l.MaxCacheFreshness, "frontend.max-cache-freshness", 1*time.Minute, "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.") - f.DurationVar(&l.RulerEvaluationDelay, "ruler.evaluation-delay-duration", 0, "Duration to delay the evaluation of rules to ensure the underlying metrics have been pushed to Cortex.") + _ = l.MaxCacheFreshness.Set("1m") + f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.") + + _ = l.RulerEvaluationDelay.Set("0s") + f.Var(&l.RulerEvaluationDelay, "ruler.evaluation-delay-duration", "Duration to delay the evaluation of rules to ensure the underlying metrics have been pushed to Cortex.") + f.IntVar(&l.RulerMaxRulesPerRuleGroup, "ruler.max-rules-per-rule-group", 0, "Maximum number of rules per rule group per-tenant. 0 to disable.") f.IntVar(&l.RulerMaxRuleGroupsPerTenant, "ruler.max-rule-groups-per-tenant", 0, "Maximum number of rule groups per-tenant. 0 to disable.") f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.") - f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with this to reload the overrides.") + + _ = l.PerTenantOverridePeriod.Set("10s") + f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.") } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -186,13 +203,13 @@ func (o *Overrides) RejectOldSamples(userID string) bool { // RejectOldSamplesMaxAge returns the age at which samples should be rejected. func (o *Overrides) RejectOldSamplesMaxAge(userID string) time.Duration { - return o.getOverridesForUser(userID).RejectOldSamplesMaxAge + return time.Duration(o.getOverridesForUser(userID).RejectOldSamplesMaxAge) } // CreationGracePeriod is misnamed, and actually returns how far into the future // we should accept samples. func (o *Overrides) CreationGracePeriod(userID string) time.Duration { - return o.getOverridesForUser(userID).CreationGracePeriod + return time.Duration(o.getOverridesForUser(userID).CreationGracePeriod) } // MaxLocalStreamsPerUser returns the maximum number of streams a user is allowed to store @@ -214,7 +231,7 @@ func (o *Overrides) MaxChunksPerQuery(userID string) int { // MaxQueryLength returns the limit of the length (in time) of a query. func (o *Overrides) MaxQueryLength(userID string) time.Duration { - return o.getOverridesForUser(userID).MaxQueryLength + return time.Duration(o.getOverridesForUser(userID).MaxQueryLength) } // MaxQueryLength returns the limit of the series of metric queries. @@ -245,7 +262,7 @@ func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int { // QuerySplitDuration returns the tenant specific splitby interval applied in the query frontend. func (o *Overrides) QuerySplitDuration(userID string) time.Duration { - return o.getOverridesForUser(userID).QuerySplitDuration + return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration) } // MaxConcurrentTailRequests returns the limit to number of concurrent tail requests. @@ -264,17 +281,17 @@ func (o *Overrides) MaxEntriesLimitPerQuery(userID string) int { } func (o *Overrides) MaxCacheFreshness(userID string) time.Duration { - return o.getOverridesForUser(userID).MaxCacheFreshness + return time.Duration(o.getOverridesForUser(userID).MaxCacheFreshness) } // MaxQueryLookback returns the max lookback period of queries. func (o *Overrides) MaxQueryLookback(userID string) time.Duration { - return o.getOverridesForUser(userID).MaxQueryLookback + return time.Duration(o.getOverridesForUser(userID).MaxQueryLookback) } // EvaluationDelay returns the rules evaluation delay for a given user. func (o *Overrides) EvaluationDelay(userID string) time.Duration { - return o.getOverridesForUser(userID).RulerEvaluationDelay + return time.Duration(o.getOverridesForUser(userID).RulerEvaluationDelay) } // RulerTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy. diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go new file mode 100644 index 000000000000..6a7a3101844c --- /dev/null +++ b/pkg/validation/limits_test.go @@ -0,0 +1,109 @@ +package validation + +import ( + "encoding/json" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestLimitsTagsYamlMatchJson(t *testing.T) { + limits := reflect.TypeOf(Limits{}) + n := limits.NumField() + var mismatch []string + + for i := 0; i < n; i++ { + field := limits.Field(i) + + // Note that we aren't requiring YAML and JSON tags to match, just that + // they either both exist or both don't exist. + hasYAMLTag := field.Tag.Get("yaml") != "" + hasJSONTag := field.Tag.Get("json") != "" + + if hasYAMLTag != hasJSONTag { + mismatch = append(mismatch, field.Name) + } + } + + assert.Empty(t, mismatch, "expected no mismatched JSON and YAML tags") +} + +func TestLimitsYamlMatchJson(t *testing.T) { + inputYAML := ` +ingestion_rate_strategy: "some-strategy" +ingestion_rate_mb: 34 +ingestion_burst_size_mb: 40 +max_label_name_length: 10 +max_label_value_length: 20 +max_label_names_per_series: 30 +reject_old_samples: true +reject_old_samples_max_age: 40s +creation_grace_period: 50s +enforce_metric_name: true +max_line_size: 60 +max_streams_per_user: 70 +max_global_streams_per_user: 80 +max_chunks_per_query: 90 +max_query_series: 100 +max_query_lookback: 110s +max_query_length: 120s +max_query_parallelism: 130 +cardinality_limit: 140 +max_streams_matchers_per_query: 150 +max_concurrent_tail_requests: 160 +max_entries_limit_per_query: 170 +max_cache_freshness_per_query: 180s +split_queries_by_interval: 190s +ruler_evaluation_delay_duration: 200s +ruler_max_rules_per_rule_group: 210 +ruler_max_rule_groups_per_tenant: 220 +per_tenant_override_config: "" +per_tenant_override_period: 230s +` + inputJSON := ` + { + "ingestion_rate_strategy": "some-strategy", + "ingestion_rate_mb": 34, + "ingestion_burst_size_mb": 40, + "max_label_name_length": 10, + "max_label_value_length": 20, + "max_label_names_per_series": 30, + "reject_old_samples": true, + "reject_old_samples_max_age": "40s", + "creation_grace_period": "50s", + "enforce_metric_name": true, + "max_line_size": 60, + "max_streams_per_user": 70, + "max_global_streams_per_user": 80, + "max_chunks_per_query": 90, + "max_query_series": 100, + "max_query_lookback": "110s", + "max_query_length": "120s", + "max_query_parallelism": 130, + "cardinality_limit": 140, + "max_streams_matchers_per_query": 150, + "max_concurrent_tail_requests": 160, + "max_entries_limit_per_query": 170, + "max_cache_freshness_per_query": "180s", + "split_queries_by_interval": "190s", + "ruler_evaluation_delay_duration": "200s", + "ruler_max_rules_per_rule_group": 210, + "ruler_max_rule_groups_per_tenant":220, + "per_tenant_override_config": "", + "per_tenant_override_period": "230s" + } +` + + limitsYAML := Limits{} + err := yaml.Unmarshal([]byte(inputYAML), &limitsYAML) + require.NoError(t, err, "expected to be able to unmarshal from YAML") + + limitsJSON := Limits{} + err = json.Unmarshal([]byte(inputJSON), &limitsJSON) + require.NoError(t, err, "expected to be able to unmarshal from JSON") + + assert.Equal(t, limitsYAML, limitsJSON) +}