
[ingester/stream]: Add a byte stream rate limit. #4191

Merged: 9 commits merged into grafana:main on Aug 20, 2021
Conversation

@cstyan (Contributor) commented Aug 18, 2021

Adds a limiter to the stream struct.

Signed-off-by: Callum Styan callumstyan@gmail.com

@owen-d (Member) left a comment:

The outline for this PR is looking good; I've left the first round of feedback.

@@ -140,6 +140,7 @@ type Ingester struct {
cfg Config
clientConfig client.Config
tenantConfigs *runtime.TenantConfigs
limits *validation.Overrides

@owen-d (Member):

The overrides are already embedded in the limiter field, which I think we should use for consistency.

}

func (s *localStrategy) Limit(userID string) float64 {
return float64(s.limits.MaxLocalStreamRateMB(userID))

@owen-d (Member):

I think this should use bytes, not MB

// Check if this should be rate limited.
now := time.Now()
if !s.limiter.AllowN(now, s.tenant, len(entries[i].Line)) {
validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(len(entries) - i))

@owen-d (Member):

Please use accumulator counters for the discarded samples, similar to how out_of_orders are handled on
lines 206-212. This is the hot path, and it's nice to call this once per batch rather than once per sample.

rateLimitedBytes += len(entries[j].Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes))
return 0, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamRateLimitErrorMsg, int(s.limiter.Limit(now, s.tenant)), len(entries)-i, rateLimitedBytes)

@owen-d (Member):

We shouldn't return early here for a few reasons:

  1. We'd never run the tailer code.
  2. We need to ultimately return the correct bytesAdded for partial batches.
  3. We want the metrics to be correct for the whole batch, not just one sample.

Instead, try to incorporate these new failures into failedEntriesWithError.
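The accumulate-then-report pattern the reviewer describes can be sketched as follows. This is an illustrative stand-in, not the PR's code: `entry`, `stubLimiter`, and `pushBatch` are all invented for the sketch, and a fixed byte budget stands in for the real token-bucket limiter.

```go
package main

import "fmt"

// entry mimics a log entry with a Line payload (illustrative stand-in).
type entry struct{ Line string }

// stubLimiter admits a fixed byte budget; it stands in for the
// per-stream token-bucket limiter used in the PR.
type stubLimiter struct{ budget int }

func (l *stubLimiter) AllowN(n int) bool {
	if n > l.budget {
		return false
	}
	l.budget -= n
	return true
}

// pushBatch accumulates rate-limited counts locally and reports them
// to the metrics counters once per batch, not once per entry.
func pushBatch(lim *stubLimiter, entries []entry) (stored, rateLimitedSamples, rateLimitedBytes int) {
	for _, e := range entries {
		if !lim.AllowN(len(e.Line)) {
			// Accumulate instead of calling the counter per sample;
			// the real code would record a failed entry here rather
			// than returning early.
			rateLimitedSamples++
			rateLimitedBytes += len(e.Line)
			continue
		}
		stored++
	}
	// A single metrics update for the whole batch would happen here,
	// e.g. DiscardedSamples.Add(float64(rateLimitedSamples)).
	return
}

func main() {
	lim := &stubLimiter{budget: 10}
	s, n, b := pushBatch(lim, []entry{{"hello"}, {"world"}, {"overflow"}})
	fmt.Println(s, n, b)
}
```

Because the loop continues instead of returning, the tailer code still runs and the returned counts reflect the whole batch, matching points 1-3 above.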

Comment on lines 112 to 119
_ = l.MaxLocalStreamRateBytes.Set("10000")
f.Var(&l.MaxLocalStreamRateBytes, "ingester.max-stream-rate-bytes", "Maximum bytes per second rate per active stream.")
_ = l.MaxLocalStreamBurstRateBytes.Set("20000")
f.Var(&l.MaxLocalStreamBurstRateBytes, "ingester.max-stream-burst-rate-bytes", "Maximum burst bytes per second rate per active stream.")

@owen-d (Member):

Please mimic how we set similar configs:
https://github.com/grafana/loki/blob/main/pkg/ingester/wal.go#L24-L50

Notably:

  1. Use a readable const default ${VARIABLE} with appropriate comment if applicable
  2. Default to a power-of-2 form.

I'd like us to default to the following

const defaultPerStreamRateLimit = 1<<20 // 1MB
const defaultPerStreamBurstLimit = 2 * defaultPerStreamRateLimit
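Registration under the suggested defaults might look like the sketch below. The flag names and `FlagSet` usage are illustrative assumptions, not the PR's final configuration; the point is that each flag gets a distinct name and a readable power-of-2 const default.

```go
package main

import (
	"flag"
	"fmt"
)

// Defaults in a readable power-of-2 form, as requested in the review.
const (
	defaultPerStreamRateLimit  = 1 << 20                       // 1MB/s
	defaultPerStreamBurstLimit = 2 * defaultPerStreamRateLimit // 2MB
)

func main() {
	fs := flag.NewFlagSet("limits", flag.PanicOnError)
	// Each flag must be registered under a distinct name; the reviewed
	// snippet reused one name twice, which the flag package rejects.
	rate := fs.Int("ingester.max-stream-rate-bytes", defaultPerStreamRateLimit,
		"Maximum bytes per second rate per active stream.")
	burst := fs.Int("ingester.max-stream-burst-rate-bytes", defaultPerStreamBurstLimit,
		"Maximum burst bytes per second rate per active stream.")
	_ = fs.Parse(nil)
	fmt.Println(*rate, *burst)
}
```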

Comment on lines 415 to 426
func (o *Overrides) MaxLocalStreamRateMB(userID string) int {
return o.getOverridesForUser(userID).MaxLocalStreamRateBytes.Val()
}

func (o *Overrides) MaxLocalStreamBurstRateMB(userID string) int {
return int(o.getOverridesForUser(userID).MaxLocalStreamBurstRateBytes)
}

@owen-d (Member):

Why are these calculated in different ways (int(x) vs x.Val())? Let's consistently use .Val().

// StreamRateLimit is a reason for discarding lines when the stream's own rate limit is hit
// rather than the overall ingestion rate limit.
StreamRateLimit = "per_stream_rate_limt"
StreamRateLimitErrorMsg = "Per stream rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, consider splitting a stream via additional labels or contact your Loki administrator to see if the limt can be increased"

@owen-d (Member):

Suggested change
StreamRateLimitErrorMsg = "Per stream rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, consider splitting a stream via additional labels or contact your Loki administrator to see if the limt can be increased"

This was unused

return &stream{
limiter: limiter.NewRateLimiter(newLocalStreamRateStrategy(limits), 10*time.Second),

@owen-d (Member):

I looked at the Cortex rate limiter utility and realized it has a tenancy concept built into it. This has some nice benefits (automatically rechecking dynamically loaded overrides) as well as some costs (mutex). I suspect this is low-cost enough that we don't have to worry about it (we'll have a map with one entry, gated by a RWMutex). However, if we end up seeing profiling issues here in the future, we may have to roll our own or contribute upstream.
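The trade-off described above (a per-tenant map behind a RWMutex, with limits read from a strategy) can be sketched roughly like this. `RateLimiterStrategy`, `fixedStrategy`, and `tenantLimiters` here are simplified stand-ins for the Cortex utility, not its actual API.

```go
package main

import (
	"fmt"
	"sync"
)

// RateLimiterStrategy returns the current limit for a tenant; consulting
// it at lookup time is what lets dynamically loaded overrides take effect.
type RateLimiterStrategy interface {
	Limit(tenantID string) float64
}

type fixedStrategy struct{ limit float64 }

func (s fixedStrategy) Limit(string) float64 { return s.limit }

// tenantLimiters caches one limit per tenant behind a RWMutex, mirroring
// the trade-off discussed: cheap for a single-entry map, but a potential
// contention point on the hot path.
type tenantLimiters struct {
	mtx      sync.RWMutex
	strategy RateLimiterStrategy
	limits   map[string]float64
}

func (t *tenantLimiters) limitFor(tenant string) float64 {
	// Fast path: read lock only.
	t.mtx.RLock()
	l, ok := t.limits[tenant]
	t.mtx.RUnlock()
	if ok {
		return l
	}
	// Slow path: populate the entry under the write lock.
	t.mtx.Lock()
	defer t.mtx.Unlock()
	l = t.strategy.Limit(tenant)
	t.limits[tenant] = l
	return l
}

func main() {
	tl := &tenantLimiters{strategy: fixedStrategy{limit: 1 << 20}, limits: map[string]float64{}}
	fmt.Println(tl.limitFor("fake"))
}
```

In a stream, the map would typically hold a single tenant entry, which is why the mutex cost was judged acceptable here.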

@cstyan (Contributor, Author):

Easy enough to roll our own in the near future; it would just use an actual rate limiter library under the hood. I can file an issue for this.

@@ -113,8 +117,9 @@ type entryWithError struct {
e error
}

func newStream(cfg *Config, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
func newStream(cfg *Config, limits *validation.Overrides, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {

@owen-d (Member):

Suggested change
func newStream(cfg *Config, limits *validation.Overrides, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
func newStream(cfg *Config, limits limiter.RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {

nit: Let's replace this with the strategy interface so we don't depend on the overrides itself here.

@@ -450,7 +450,7 @@ func Test_ChunkFilter(t *testing.T) {
overrides, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
instance := newInstance(
&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{})
&ingesterConfig, &validation.Overrides{}, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{})

@owen-d (Member):

The instance type already has access to the overrides via
https://github.com/grafana/loki/blob/main/pkg/ingester/instance.go#L83

Let's remove this from the newInstance function to reduce arguments and potential confusion from specifying the limits twice.

@@ -63,6 +63,7 @@ var (

type instance struct {
cfg *Config
limits *validation.Overrides
@cstyan cstyan marked this pull request as ready for review August 19, 2021 22:46
@cstyan cstyan requested a review from a team as a code owner August 19, 2021 22:46
@cstyan changed the title from "WIP: Add a byte stream rate limit." to "[ingester/stream]: Add a byte stream rate limit." on Aug 19, 2021
@owen-d owen-d merged commit 757eeac into grafana:main Aug 20, 2021
@@ -49,7 +50,8 @@ var (
)

var (
ErrEntriesExist    = errors.New("duplicate push - entries already exist")
ErrStreamRateLimit = errors.New("stream rate limit exceeded")

Collaborator:

Sorry I missed this before it was merged, but could you open a separate PR to move this error message to the validate.go file and also update it to include an action for the user to take (similar to other errors in that file)?

We should always try to provide an actionable error message for anything returned by the API.

@owen-d (Member) commented Aug 30, 2021

ref #1544
