[ingester/stream]: Add a byte stream rate limit. (#4191)
* Add a byte stream rate limit.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Track discarded lines due to stream rate limit + use ByteSize for the
config value.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address review comments

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Another round of review fixes.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix goimports issue.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix duplicate flag registration + stream rate limit test.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix stream rate/burst limit flags yaml naming.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix tests now that we have stream rate limit.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* fix some comments

Signed-off-by: Callum Styan <callumstyan@gmail.com>
cstyan authored Aug 20, 2021
1 parent 3ee8ed3 commit 757eeac
Showing 8 changed files with 200 additions and 40 deletions.
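For orientation before the diffs: the new limit is a per-stream token bucket measured in bytes. Each pushed line consumes tokens equal to its byte length, and lines that do not fit are discarded with a rate-limit error and counted as discarded samples/bytes. Below is a minimal standalone sketch of that behaviour using golang.org/x/time/rate, the package this change builds on through Cortex's RateLimiter; streamRateLimiter and allowLine are illustrative names, not identifiers from this commit.

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// streamRateLimiter sketches the per-stream byte bucket: rateBytes tokens
// refill per second, burstBytes is the bucket capacity.
type streamRateLimiter struct {
	lim *rate.Limiter
}

func newStreamRateLimiter(rateBytes float64, burstBytes int) *streamRateLimiter {
	return &streamRateLimiter{lim: rate.NewLimiter(rate.Limit(rateBytes), burstBytes)}
}

// allowLine consumes tokens equal to the line's byte length; a false return
// corresponds to the entry being discarded with a stream rate limit error.
func (s *streamRateLimiter) allowLine(line string) bool {
	return s.lim.AllowN(time.Now(), len(line))
}

func main() {
	// 10 bytes/s with a 20-byte burst: the first two 10-byte lines pass,
	// the third is rejected until the bucket refills.
	l := newStreamRateLimiter(10, 20)
	for i, line := range []string{"0123456789", "0123456789", "0123456789"} {
		fmt.Printf("line %d allowed: %v\n", i, l.allowLine(line))
	}
}
```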
9 changes: 5 additions & 4 deletions pkg/ingester/checkpoint_test.go
@@ -443,11 +443,12 @@ var (

func Test_SeriesIterator(t *testing.T) {
var instances []*instance

limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: 1000,
IngestionRateMB: 1e4,
IngestionBurstSizeMB: 1e4,
MaxLocalStreamsPerUser: 1000,
IngestionRateMB: 1e4,
IngestionBurstSizeMB: 1e4,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
4 changes: 2 additions & 2 deletions pkg/ingester/instance.go
Expand Up @@ -132,7 +132,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo
if !ok {

sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(ls), fp)
stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
i.streamsByFP[fp] = stream
i.streams[stream.labelsString] = stream
i.streamsCreatedTotal.Inc()
@@ -243,7 +243,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
fp := i.getHashForLabels(labels)

sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(labels), fp)
stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
i.streams[pushReqStream.Labels] = stream
i.streamsByFP[fp] = stream

46 changes: 38 additions & 8 deletions pkg/ingester/instance_test.go
@@ -37,7 +37,12 @@ func defaultConfig() *Config {
var NilMetrics = newIngesterMetrics(nil)

func TestLabelsCollisions(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 10000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

@@ -64,7 +69,12 @@ func TestLabelsCollisions(t *testing.T) {
}

func TestConcurrentPushes(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 100000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

@@ -115,7 +125,12 @@ func TestConcurrentPushes(t *testing.T) {
}

func TestSyncPeriod(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 1000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

@@ -157,7 +172,12 @@ func TestSyncPeriod(t *testing.T) {
}

func Test_SeriesQuery(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 1000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

@@ -178,7 +198,7 @@ func Test_SeriesQuery(t *testing.T) {
for _, testStream := range testStreams {
stream, err := instance.getOrCreateStream(testStream, false, recordPool.GetRecord())
require.NoError(t, err)
chunk := newStream(cfg, "fake", 0, nil, true, NilMetrics).NewChunk()
chunk := newStream(cfg, newLocalStreamRateStrategy(limiter), "fake", 0, nil, true, NilMetrics).NewChunk()
for _, entry := range testStream.Entries {
err = chunk.Append(&entry)
require.NoError(t, err)
@@ -272,7 +292,12 @@ func makeRandomLabels() labels.Labels {
}

func Benchmark_PushInstance(b *testing.B) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 1000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

@@ -312,7 +337,12 @@ func Benchmark_PushInstance(b *testing.B) {
}

func Benchmark_instance_addNewTailer(b *testing.B) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 100000}, nil)
l := validation.Limits{
MaxLocalStreamsPerUser: 100000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

@@ -334,7 +364,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
lbs := makeRandomLabels()
b.Run("addTailersToNewStream", func(b *testing.B) {
for n := 0; n < b.N; n++ {
inst.addTailersToNewStream(newStream(nil, "fake", 0, lbs, true, NilMetrics))
inst.addTailersToNewStream(newStream(nil, newLocalStreamRateStrategy(limiter), "fake", 0, lbs, true, NilMetrics))
}
})
}
28 changes: 28 additions & 0 deletions pkg/ingester/limiter.go
@@ -5,6 +5,9 @@ import (
"math"
"sync"

cortex_limiter "github.com/cortexproject/cortex/pkg/util/limiter"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/validation"
)

@@ -125,3 +128,28 @@ func (l *Limiter) minNonZero(first, second int) int {

return first
}

type localStrategy struct {
limiter *Limiter
}

func newLocalStreamRateStrategy(l *Limiter) cortex_limiter.RateLimiterStrategy {
return &localStrategy{
limiter: l,
}
}

func (s *localStrategy) Limit(userID string) float64 {
if s.limiter.disabled {
return float64(rate.Inf)
}
return float64(s.limiter.limits.MaxLocalStreamRateBytes(userID))
}

func (s *localStrategy) Burst(userID string) int {
if s.limiter.disabled {
// Burst is ignored when limit = rate.Inf
return 0
}
return s.limiter.limits.MaxLocalStreamBurstRateBytes(userID)
}
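A note on the disabled path in localStrategy above: returning rate.Inf as the limit makes the underlying golang.org/x/time/rate limiter admit every request, so the Burst value is irrelevant (hence the 0, as the comment says). A small sketch of that behaviour, assuming nothing beyond the x/time/rate package itself:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// With an infinite rate the bucket never empties: every AllowN call
	// succeeds regardless of size, so burst (second argument) is ignored.
	unlimited := rate.NewLimiter(rate.Inf, 0)
	fmt.Println(unlimited.AllowN(time.Now(), 1<<30)) // true

	// With a finite rate the burst caps the largest single request.
	limited := rate.NewLimiter(rate.Limit(1024), 4096)
	fmt.Println(limited.AllowN(time.Now(), 8192)) // false: exceeds burst
	fmt.Println(limited.AllowN(time.Now(), 2048)) // true: fits in the bucket
}
```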
64 changes: 44 additions & 20 deletions pkg/ingester/stream.go
@@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
@@ -49,7 +50,8 @@
)

var (
ErrEntriesExist = errors.New("duplicate push - entries already exist")
ErrEntriesExist = errors.New("duplicate push - entries already exist")
ErrStreamRateLimit = errors.New("stream rate limit exceeded")
)

func init() {
@@ -64,8 +66,9 @@ type line struct {
}

type stream struct {
cfg *Config
tenant string
limiter *limiter.RateLimiter
cfg *Config
tenant string
// Newest chunk at chunks[n-1].
// Not thread-safe; assume accesses to this are locked by caller.
chunks []chunkDesc
@@ -113,8 +116,9 @@ type entryWithError struct {
e error
}

func newStream(cfg *Config, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
func newStream(cfg *Config, limits limiter.RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
return &stream{
limiter: limiter.NewRateLimiter(limits, 10*time.Second),
cfg: cfg,
fp: fp,
labels: labels,
@@ -196,11 +200,16 @@ func (s *stream) Push(
failedEntriesWithError := []entryWithError{}

var outOfOrderSamples, outOfOrderBytes int
var rateLimitedSamples, rateLimitedBytes int
defer func() {
if outOfOrderSamples > 0 {
validation.DiscardedSamples.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderSamples))
validation.DiscardedBytes.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderBytes))
}
if rateLimitedSamples > 0 {
validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples))
validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes))
}
}()

// Don't fail on the first append error - if samples are sent out of order,
@@ -222,6 +231,14 @@ func (s *stream) Push(
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, s.highestTs, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) {
chunk = s.cutChunk(ctx)
}
// Check if this entry should be rate limited.
now := time.Now()
if !s.limiter.AllowN(now, s.tenant, len(entries[i].Line)) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], ErrStreamRateLimit})
rateLimitedSamples++
rateLimitedBytes += len(entries[i].Line)
continue
}

// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
if s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
@@ -292,27 +309,34 @@ func (s *stream) Push(

if len(failedEntriesWithError) > 0 {
lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && lastEntryWithErr.e != ErrStreamRateLimit {
return bytesAdded, lastEntryWithErr.e
}
var statusCode int
if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
// return bad http status request response with all failed entries
buf := bytes.Buffer{}
streamName := s.labelsString
statusCode = http.StatusBadRequest
}
if lastEntryWithErr.e == ErrStreamRateLimit {
statusCode = http.StatusTooManyRequests
}
// Return an HTTP 4xx response with all failed entries.
buf := bytes.Buffer{}
streamName := s.labelsString

limitedFailedEntries := failedEntriesWithError
if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
limitedFailedEntries = limitedFailedEntries[:maxIgnore]
}
limitedFailedEntries := failedEntriesWithError
if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
limitedFailedEntries = limitedFailedEntries[:maxIgnore]
}

for _, entryWithError := range limitedFailedEntries {
fmt.Fprintf(&buf,
"entry with timestamp %s ignored, reason: '%s' for stream: %s,\n",
entryWithError.entry.Timestamp.String(), entryWithError.e.Error(), streamName)
}
for _, entryWithError := range limitedFailedEntries {
fmt.Fprintf(&buf,
"entry with timestamp %s ignored, reason: '%s' for stream: %s,\n",
entryWithError.entry.Timestamp.String(), entryWithError.e.Error(), streamName)
}

fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries))
fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries))

return bytesAdded, httpgrpc.Errorf(http.StatusBadRequest, buf.String())
}
return bytesAdded, lastEntryWithErr.e
return bytesAdded, httpgrpc.Errorf(statusCode, buf.String())
}

if len(s.chunks) != prevNumChunks {
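The tail of Push above now distinguishes the two discard reasons when building the combined error response: out-of-order entries keep returning 400, rate-limited entries return 429, and any other error is passed back unchanged. A sketch of that mapping in isolation; the error values here are stand-ins for chunkenc.ErrOutOfOrder and ErrStreamRateLimit, not the real identifiers.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Stand-ins for chunkenc.ErrOutOfOrder and the ingester's ErrStreamRateLimit.
var (
	errOutOfOrder      = errors.New("entry out of order")
	errStreamRateLimit = errors.New("stream rate limit exceeded")
)

// statusFor mirrors the branch at the end of Push: only the two known
// discard reasons are turned into an HTTP status; anything else is
// returned to the caller as-is.
func statusFor(lastErr error) (status int, handled bool) {
	switch {
	case errors.Is(lastErr, errOutOfOrder):
		return http.StatusBadRequest, true // 400
	case errors.Is(lastErr, errStreamRateLimit):
		return http.StatusTooManyRequests, true // 429
	default:
		return 0, false
	}
}

func main() {
	for _, err := range []error{errOutOfOrder, errStreamRateLimit, errors.New("boom")} {
		status, handled := statusFor(err)
		fmt.Printf("%v -> status=%d handled=%v\n", err, status, handled)
	}
}
```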