Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ingester/stream]: Add a byte stream rate limit. #4191

Merged
merged 9 commits into from
Aug 20, 2021
9 changes: 5 additions & 4 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,11 +443,12 @@ var (

func Test_SeriesIterator(t *testing.T) {
var instances []*instance

limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: 1000,
IngestionRateMB: 1e4,
IngestionBurstSizeMB: 1e4,
MaxLocalStreamsPerUser: 1000,
IngestionRateMB: 1e4,
IngestionBurstSizeMB: 1e4,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo
if !ok {

sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(ls), fp)
stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
i.streamsByFP[fp] = stream
i.streams[stream.labelsString] = stream
i.streamsCreatedTotal.Inc()
Expand Down Expand Up @@ -243,7 +243,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
fp := i.getHashForLabels(labels)

sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(labels), fp)
stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
i.streams[pushReqStream.Labels] = stream
i.streamsByFP[fp] = stream

Expand Down
46 changes: 38 additions & 8 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ func defaultConfig() *Config {
var NilMetrics = newIngesterMetrics(nil)

func TestLabelsCollisions(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 10000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand All @@ -64,7 +69,12 @@ func TestLabelsCollisions(t *testing.T) {
}

func TestConcurrentPushes(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 100000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand Down Expand Up @@ -115,7 +125,12 @@ func TestConcurrentPushes(t *testing.T) {
}

func TestSyncPeriod(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 1000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand Down Expand Up @@ -157,7 +172,12 @@ func TestSyncPeriod(t *testing.T) {
}

func Test_SeriesQuery(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 1000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand All @@ -178,7 +198,7 @@ func Test_SeriesQuery(t *testing.T) {
for _, testStream := range testStreams {
stream, err := instance.getOrCreateStream(testStream, false, recordPool.GetRecord())
require.NoError(t, err)
chunk := newStream(cfg, "fake", 0, nil, true, NilMetrics).NewChunk()
chunk := newStream(cfg, newLocalStreamRateStrategy(limiter), "fake", 0, nil, true, NilMetrics).NewChunk()
for _, entry := range testStream.Entries {
err = chunk.Append(&entry)
require.NoError(t, err)
Expand Down Expand Up @@ -272,7 +292,12 @@ func makeRandomLabels() labels.Labels {
}

func Benchmark_PushInstance(b *testing.B) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 1000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand Down Expand Up @@ -312,7 +337,12 @@ func Benchmark_PushInstance(b *testing.B) {
}

func Benchmark_instance_addNewTailer(b *testing.B) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 100000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 100000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand All @@ -334,7 +364,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
lbs := makeRandomLabels()
b.Run("addTailersToNewStream", func(b *testing.B) {
for n := 0; n < b.N; n++ {
inst.addTailersToNewStream(newStream(nil, "fake", 0, lbs, true, NilMetrics))
inst.addTailersToNewStream(newStream(nil, newLocalStreamRateStrategy(limiter), "fake", 0, lbs, true, NilMetrics))
}
})
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"math"
"sync"

cortex_limiter "github.com/cortexproject/cortex/pkg/util/limiter"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/validation"
)

Expand Down Expand Up @@ -125,3 +128,28 @@ func (l *Limiter) minNonZero(first, second int) int {

return first
}

type localStrategy struct {
limiter *Limiter
}

func newLocalStreamRateStrategy(l *Limiter) cortex_limiter.RateLimiterStrategy {
return &localStrategy{
limiter: l,
}
}

func (s *localStrategy) Limit(userID string) float64 {
if s.limiter.disabled {
return float64(rate.Inf)
}
return float64(s.limiter.limits.MaxLocalStreamRateBytes(userID))
}

func (s *localStrategy) Burst(userID string) int {
if s.limiter.disabled {
// Burst is ignored when limit = rate.Inf
return 0
}
return s.limiter.limits.MaxLocalStreamBurstRateBytes(userID)
}
64 changes: 44 additions & 20 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
Expand Down Expand Up @@ -49,7 +50,8 @@ var (
)

var (
ErrEntriesExist = errors.New("duplicate push - entries already exist")
ErrEntriesExist = errors.New("duplicate push - entries already exist")
ErrStreamRateLimit = errors.New("stream rate limit exceeded")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I missed this before this was merged, but could you open a separate PR to move this error message to the validate.go file and also update it to include an action for the user to take (similar to other errors in that file).

We should always try to provide an actionable error message for anything returned by the API.

)

func init() {
Expand All @@ -64,8 +66,9 @@ type line struct {
}

type stream struct {
cfg *Config
tenant string
limiter *limiter.RateLimiter
cfg *Config
tenant string
// Newest chunk at chunks[n-1].
// Not thread-safe; assume accesses to this are locked by caller.
chunks []chunkDesc
Expand Down Expand Up @@ -113,8 +116,9 @@ type entryWithError struct {
e error
}

func newStream(cfg *Config, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
func newStream(cfg *Config, limits limiter.RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
return &stream{
limiter: limiter.NewRateLimiter(limits, 10*time.Second),
cfg: cfg,
fp: fp,
labels: labels,
Expand Down Expand Up @@ -196,11 +200,16 @@ func (s *stream) Push(
failedEntriesWithError := []entryWithError{}

var outOfOrderSamples, outOfOrderBytes int
var rateLimitedSamples, rateLimitedBytes int
defer func() {
if outOfOrderSamples > 0 {
validation.DiscardedSamples.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderSamples))
validation.DiscardedBytes.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderBytes))
}
if rateLimitedSamples > 0 {
validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples))
validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes))
}
}()

// Don't fail on the first append error - if samples are sent out of order,
Expand All @@ -222,6 +231,14 @@ func (s *stream) Push(
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, s.highestTs, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) {
chunk = s.cutChunk(ctx)
}
// Check if this this should be rate limited.
now := time.Now()
if !s.limiter.AllowN(now, s.tenant, len(entries[i].Line)) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], ErrStreamRateLimit})
rateLimitedSamples++
rateLimitedBytes += len(entries[i].Line)
continue
}

// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
if s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
Expand Down Expand Up @@ -292,27 +309,34 @@ func (s *stream) Push(

if len(failedEntriesWithError) > 0 {
lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && lastEntryWithErr.e != ErrStreamRateLimit {
return bytesAdded, lastEntryWithErr.e
}
var statusCode int
if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
// return bad http status request response with all failed entries
buf := bytes.Buffer{}
streamName := s.labelsString
statusCode = http.StatusBadRequest
}
if lastEntryWithErr.e == ErrStreamRateLimit {
statusCode = http.StatusTooManyRequests
}
// Return a http status 4xx request response with all failed entries.
buf := bytes.Buffer{}
streamName := s.labelsString

limitedFailedEntries := failedEntriesWithError
if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
limitedFailedEntries = limitedFailedEntries[:maxIgnore]
}
limitedFailedEntries := failedEntriesWithError
if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
limitedFailedEntries = limitedFailedEntries[:maxIgnore]
}

for _, entryWithError := range limitedFailedEntries {
fmt.Fprintf(&buf,
"entry with timestamp %s ignored, reason: '%s' for stream: %s,\n",
entryWithError.entry.Timestamp.String(), entryWithError.e.Error(), streamName)
}
for _, entryWithError := range limitedFailedEntries {
fmt.Fprintf(&buf,
"entry with timestamp %s ignored, reason: '%s' for stream: %s,\n",
entryWithError.entry.Timestamp.String(), entryWithError.e.Error(), streamName)
}

fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries))
fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries))

return bytesAdded, httpgrpc.Errorf(http.StatusBadRequest, buf.String())
}
return bytesAdded, lastEntryWithErr.e
return bytesAdded, httpgrpc.Errorf(statusCode, buf.String())
}

if len(s.chunks) != prevNumChunks {
Expand Down
Loading