[ingester/stream]: Add a byte stream rate limit. #4191
The outline for this PR is looking good; I've left the first round of feedback.
pkg/ingester/ingester.go
Outdated
@@ -140,6 +140,7 @@ type Ingester struct {
	cfg Config
	clientConfig client.Config
	tenantConfigs *runtime.TenantConfigs
	limits *validation.Overrides
The overrides are already embedded in the `limiter` field, which I think we should use for consistency.
pkg/ingester/limiter.go
Outdated
}

func (s *localStrategy) Limit(userID string) float64 {
	return float64(s.limits.MaxLocalStreamRateMB(userID))
I think this should use bytes, not MB
pkg/ingester/stream.go
Outdated
// Check if this should be rate limited.
now := time.Now()
if !s.limiter.AllowN(now, s.tenant, len(entries[i].Line)) {
	validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(len(entries) - i))
Please use accumulator counters for the discarded samples, similar to how `out_of_order`s are handled on lines 206-212. This is the hot path, and it's better to call this once per batch rather than once per sample.
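As a rough illustration of the accumulator pattern requested above, the sketch below tallies rejected lines and bytes in local variables and updates the counters once per batch. The `counter` type, the `discardOversized` function, and the length-based rejection rule are all hypothetical stand-ins, not the actual Loki metrics or push code.

```go
package main

import "fmt"

// counter is a hypothetical stand-in for a labelled metrics counter. It also
// records how many times Add is called so the effect of batching is visible.
type counter struct {
	total float64
	calls int
}

func (c *counter) Add(v float64) {
	c.total += v
	c.calls++
}

// discardOversized is a hypothetical hot-path loop. It accumulates rejected
// lines and bytes locally and touches each counter at most once per batch.
func discardOversized(lines []string, maxLen int, discardedLines, discardedBytes *counter) {
	var rejectedLines, rejectedBytes int
	for _, line := range lines {
		if len(line) > maxLen {
			rejectedLines++
			rejectedBytes += len(line)
		}
	}
	if rejectedLines > 0 {
		discardedLines.Add(float64(rejectedLines))
		discardedBytes.Add(float64(rejectedBytes))
	}
}

func main() {
	var discLines, discBytes counter
	discardOversized([]string{"ok", "too long", "also too long"}, 4, &discLines, &discBytes)
	fmt.Println(discLines.total, discBytes.total, discLines.calls)
}
```

With a real labelled counter, the saving comes from resolving the label values and updating the metric once per batch instead of once per rejected sample.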
pkg/ingester/stream.go
Outdated
	rateLimitesBytes += len(entries[j].Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitesBytes))
return 0, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamRateLimitErrorMsg, int(s.limiter.Limit(now, s.tenant)), len(entries)-i, rateLimitesBytes)
We shouldn't return early here for a few reasons:
- We'd never run the tailer code.
- We need to ultimately return the correct `bytesAdded` for partial batches.
- We want the metrics to be correct for the whole batch, not just one sample.
Instead, try to incorporate these new failures into `failedEntriesWithError`.
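The shape of that suggestion can be sketched as follows: rate-limited entries are appended to a failure list and the loop continues, so the byte count stays correct for partial batches and any code after the loop still runs. The `push` function, the `budget` parameter (standing in for what a limiter would allow), and `errRateLimited` are hypothetical simplifications, not the real stream push implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errRateLimited is a hypothetical error value used only in this sketch.
var errRateLimited = errors.New("stream rate limit exceeded")

// entryWithError pairs a rejected line with the reason it was rejected.
type entryWithError struct {
	line string
	err  error
}

// push is a simplified, hypothetical stand-in for the stream push loop.
// budget models the number of bytes the limiter would still allow.
func push(lines []string, budget int) (bytesAdded int, failed []entryWithError) {
	for _, line := range lines {
		if len(line) > budget {
			// Record the failure and keep going instead of returning early.
			failed = append(failed, entryWithError{line: line, err: errRateLimited})
			continue
		}
		budget -= len(line)
		bytesAdded += len(line)
	}
	// Work that must happen for every batch (tailers, metrics) would go here.
	return bytesAdded, failed
}

func main() {
	added, failed := push([]string{"aaaa", "bbbbbbbb", "cc"}, 6)
	fmt.Println(added, len(failed))
}
```

The caller can then build a single error from the collected failures while still reporting the bytes that were accepted.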
pkg/validation/limits.go
Outdated
_ = l.MaxLocalStreamRateBytes.Set("10000")
f.Var(&l.MaxLocalStreamRateBytes, "ingester.max-stream-rate-bytes", "Maximum bytes per second rate per active stream.")
_ = l.MaxLocalStreamBurstRateBytes.Set("20000")
f.Var(&l.MaxLocalStreamBurstRateBytes, "ingester.max-stream-rate-bytes", "Maximum burst bytes per second rate per active stream.")
Please mimic how we set similar configs:
https://github.com/grafana/loki/blob/main/pkg/ingester/wal.go#L24-L50
Notably:
- Use a readable `const default ${VARIABLE}` with an appropriate comment if applicable.
- Default to a power-of-2 form.
I'd like us to default to the following:
const defaultPerStreamRateLimit = 1<<20 // 1MB
const defaultPerStreamBurstLimit = 2 * defaultPerStreamRateLimit
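For illustration, here is one way the named defaults could be wired into flag registration. This is a minimal sketch using the standard library `flag` package with plain `int` fields; the `limits` struct, the `parseLimits` helper, and both flag names are assumptions made for the example rather than the flags or types the PR ends up using.

```go
package main

import (
	"flag"
	"fmt"
)

const (
	defaultPerStreamRateLimit  = 1 << 20 // 1MB
	defaultPerStreamBurstLimit = 2 * defaultPerStreamRateLimit
)

// limits is a hypothetical, trimmed-down config struct for this sketch.
type limits struct {
	perStreamRateLimit  int
	perStreamBurstLimit int
}

// registerFlags binds each field to its own flag name (both names are
// hypothetical) and uses the named constants as defaults.
func (l *limits) registerFlags(f *flag.FlagSet) {
	f.IntVar(&l.perStreamRateLimit, "ingester.per-stream-rate-limit", defaultPerStreamRateLimit, "Maximum byte rate per second per stream.")
	f.IntVar(&l.perStreamBurstLimit, "ingester.per-stream-rate-limit-burst", defaultPerStreamBurstLimit, "Maximum burst bytes per stream.")
}

// parseLimits is a small helper that parses args into a fresh limits value.
func parseLimits(args []string) (limits, error) {
	var l limits
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	l.registerFlags(fs)
	err := fs.Parse(args)
	return l, err
}

func main() {
	l, err := parseLimits(nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(l.perStreamRateLimit, l.perStreamBurstLimit)
}
```

Deriving the burst default from the rate default keeps the two values in step if the base rate is ever changed.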
pkg/validation/limits.go
Outdated
func (o *Overrides) MaxLocalStreamRateMB(userID string) int {
	return o.getOverridesForUser(userID).MaxLocalStreamRateBytes.Val()
}

func (o *Overrides) MaxLocalStreamBurstRateMB(userID string) int {
	return int(o.getOverridesForUser(userID).MaxLocalStreamBurstRateBytes)
}
Why are these calculated in different ways (`int(x)` vs `x.Val()`)? Let's consistently use `.Val()`.
Signed-off-by: Callum Styan <callumstyan@gmail.com>
config value. Signed-off-by: Callum Styan <callumstyan@gmail.com>
pkg/validation/validate.go
Outdated
// StreamRateLimit is a reason for discarding lines when the stream's own rate limit is hit
// rather than the overall ingestion rate limit.
StreamRateLimit = "per_stream_rate_limt"
StreamRateLimitErrorMsg = "Per stream rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, consider splitting a stream via additional labels or contact your Loki administrator to see if the limt can be increased"
-StreamRateLimitErrorMsg = "Per stream rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, consider splitting a stream via additional labels or contact your Loki administrator to see if the limt can be increased"
This was unused.
pkg/ingester/stream.go
Outdated
return &stream{
	limiter: limiter.NewRateLimiter(newLocalStreamRateStrategy(limits), 10*time.Second),
I looked at the Cortex rate limiter utility and realized it has a tenancy concept built into it. This has some nice benefits (automatically rechecking dynamically loaded `overrides`) as well as some costs (a mutex). I suspect the cost is low enough that we don't have to worry about it (we'll have a map with one entry, gated by a `RWMutex`). However, if this ends up showing in profiles in the future, we may have to roll our own or contribute upstream.
It's easy enough to roll our own in the near future; the utility just makes use of an actual rate limiter library under the hood. I can file an issue for this.
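To make the "roll our own" option concrete, here is a minimal sketch of a per-stream limiter with no tenancy map: a single token bucket measured in bytes. The `streamLimiter` type and its methods are hypothetical, and the hand-written bucket stands in for a real rate limiter library purely to keep the example dependency-free.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// streamLimiter is a hypothetical single-stream token bucket measured in bytes.
type streamLimiter struct {
	mu     sync.Mutex
	rate   float64 // bytes refilled per second
	burst  float64 // maximum bucket size in bytes
	tokens float64 // bytes currently available
	last   time.Time
}

// newStreamLimiter starts with a full bucket at time now.
func newStreamLimiter(rate, burst int, now time.Time) *streamLimiter {
	return &streamLimiter{rate: float64(rate), burst: float64(burst), tokens: float64(burst), last: now}
}

// AllowN reports whether n bytes may be ingested at time now and, if so,
// consumes them from the bucket.
func (l *streamLimiter) AllowN(now time.Time, n int) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if elapsed := now.Sub(l.last).Seconds(); elapsed > 0 {
		l.tokens += elapsed * l.rate
		if l.tokens > l.burst {
			l.tokens = l.burst
		}
		l.last = now
	}
	if float64(n) > l.tokens {
		return false
	}
	l.tokens -= float64(n)
	return true
}

// simulate runs three requests against a 10 B/s limiter with a 20 B burst.
func simulate() []bool {
	start := time.Unix(0, 0)
	l := newStreamLimiter(10, 20, start)
	return []bool{
		l.AllowN(start, 15),                  // fits within the initial burst
		l.AllowN(start, 10),                  // only 5 bytes remain
		l.AllowN(start.Add(time.Second), 10), // one second of refill
	}
}

func main() {
	fmt.Println(simulate())
}
```

Unlike the tenancy-aware utility discussed above, this sketch fixes the rate at construction time, so it would not pick up dynamically reloaded overrides without extra plumbing.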
pkg/ingester/stream.go
Outdated
@@ -113,8 +117,9 @@ type entryWithError struct {
 	e error
 }

-func newStream(cfg *Config, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
+func newStream(cfg *Config, limits *validation.Overrides, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
-func newStream(cfg *Config, limits *validation.Overrides, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
+func newStream(cfg *Config, limits limiter.RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
nit: Let's replace this with the strategy interface so we don't depend on the overrides themselves here.
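The benefit of taking an interface here can be shown with a small sketch. The `rateLimiterStrategy` interface below is declared locally and only modeled on the idea of a limit/burst strategy; its method set, the `staticStrategy` type, and the trimmed `newStream` signature are assumptions for illustration, not the real `limiter.RateLimiterStrategy` definition.

```go
package main

import "fmt"

// rateLimiterStrategy is a local, hypothetical interface for this sketch.
type rateLimiterStrategy interface {
	Limit(tenantID string) float64
	Burst(tenantID string) int
}

// staticStrategy returns fixed values, which is convenient in tests where
// building a full overrides object would be unnecessary.
type staticStrategy struct {
	rate  float64
	burst int
}

func (s staticStrategy) Limit(string) float64 { return s.rate }
func (s staticStrategy) Burst(string) int     { return s.burst }

// stream is a trimmed-down stand-in that depends only on the interface.
type stream struct {
	tenant string
	limits rateLimiterStrategy
}

func newStream(tenant string, limits rateLimiterStrategy) *stream {
	return &stream{tenant: tenant, limits: limits}
}

func main() {
	s := newStream("fake", staticStrategy{rate: 1 << 20, burst: 2 << 20})
	fmt.Println(s.limits.Limit(s.tenant), s.limits.Burst(s.tenant))
}
```

Because `newStream` sees only the interface, a test can pass a fixed strategy while production code passes one backed by the overrides.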
pkg/ingester/instance_test.go
Outdated
@@ -450,7 +450,7 @@ func Test_ChunkFilter(t *testing.T) {
 	overrides, err := validation.NewOverrides(defaultLimits, nil)
 	require.NoError(t, err)
 	instance := newInstance(
-		&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{})
+		&ingesterConfig, &validation.Overrides{}, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{})
The `instance` type already has access to the overrides via
https://github.com/grafana/loki/blob/main/pkg/ingester/instance.go#L83
Let's remove this from the `newInstance` function to reduce arguments and potential confusion from specifying the limits twice.
pkg/ingester/instance.go
Outdated
@@ -63,6 +63,7 @@ var (

type instance struct {
	cfg *Config
	limits *validation.Overrides
This should already be covered here:
https://github.com/grafana/loki/blob/main/pkg/ingester/instance.go#L83
Signed-off-by: Callum Styan <callumstyan@gmail.com>
@@ -49,7 +50,8 @@ var (
 )

 var (
-	ErrEntriesExist = errors.New("duplicate push - entries already exist")
+	ErrEntriesExist    = errors.New("duplicate push - entries already exist")
+	ErrStreamRateLimit = errors.New("stream rate limit exceeded")
Sorry I missed this before this was merged, but could you open a separate PR to move this error message to the validate.go file and update it to include an action for the user to take (similar to other errors in that file)?
We should always try to provide an actionable error message for anything returned by the API.
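As one possible shape for such a message, the sketch below pairs the failure with a suggested action, borrowing the wording of the earlier draft message in this PR. The constant name, the `streamRateLimitError` helper, and the exact format are hypothetical and not what validate.go contains.

```go
package main

import "fmt"

// streamRateLimitErrorMsg is a hypothetical format string that states what
// happened and what the user can do about it.
const streamRateLimitErrorMsg = "per-stream rate limit exceeded (limit: %d bytes/sec) for stream %q; consider splitting the stream via additional labels or contact your Loki administrator to see if the limit can be increased"

// streamRateLimitError builds the actionable error for a given stream.
func streamRateLimitError(limit int, stream string) error {
	return fmt.Errorf(streamRateLimitErrorMsg, limit, stream)
}

func main() {
	fmt.Println(streamRateLimitError(1024, "app-x"))
}
```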
ref #1544
Adds a limiter to the stream struct.
Signed-off-by: Callum Styan callumstyan@gmail.com