diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 0626e083ebbd..ac13f2face87 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -443,11 +443,12 @@ var ( func Test_SeriesIterator(t *testing.T) { var instances []*instance - limits, err := validation.NewOverrides(validation.Limits{ - MaxLocalStreamsPerUser: 1000, - IngestionRateMB: 1e4, - IngestionBurstSizeMB: 1e4, + MaxLocalStreamsPerUser: 1000, + IngestionRateMB: 1e4, + IngestionBurstSizeMB: 1e4, + MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes, + MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes, }, nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 35e2f7746e15..68fba68eb964 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -132,7 +132,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo if !ok { sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(ls), fp) - stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) + stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) i.streamsByFP[fp] = stream i.streams[stream.labelsString] = stream i.streamsCreatedTotal.Inc() @@ -243,7 +243,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(labels), fp) - stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) + stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) i.streams[pushReqStream.Labels] = stream i.streamsByFP[fp] = stream diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index e9591fd83280..ba044194124c 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -37,7 +37,12 @@ func defaultConfig() *Config { var NilMetrics = newIngesterMetrics(nil) func TestLabelsCollisions(t *testing.T) { - limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) + l := validation.Limits{ + MaxLocalStreamsPerUser: 10000, + MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes, + MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes, + } + limits, err := validation.NewOverrides(l, nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) @@ -64,7 +69,12 @@ func TestLabelsCollisions(t *testing.T) { } func TestConcurrentPushes(t *testing.T) { - limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) + l := validation.Limits{ + MaxLocalStreamsPerUser: 100000, + MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes, + MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes, + } + limits, err := validation.NewOverrides(l, nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) @@ -115,7 +125,12 @@ func TestConcurrentPushes(t *testing.T) { } func TestSyncPeriod(t *testing.T) { - limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) + l := validation.Limits{ + MaxLocalStreamsPerUser: 1000, + MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes, + MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes, + } + limits, err := validation.NewOverrides(l, nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) @@ -157,7 +172,12 @@ func TestSyncPeriod(t *testing.T) { } func Test_SeriesQuery(t *testing.T) { - limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) + l := validation.Limits{ + MaxLocalStreamsPerUser: 1000, + MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes, + MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes, + } + limits, err := validation.NewOverrides(l, nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) @@ -178,7 +198,7 @@ func Test_SeriesQuery(t *testing.T) { for _, testStream := range testStreams { stream, err := instance.getOrCreateStream(testStream, false, recordPool.GetRecord()) require.NoError(t, err) - chunk := newStream(cfg, "fake", 0, nil, true, NilMetrics).NewChunk() + chunk := newStream(cfg, newLocalStreamRateStrategy(limiter), "fake", 0, nil, true, NilMetrics).NewChunk() for _, entry := range testStream.Entries { err = chunk.Append(&entry) require.NoError(t, err) @@ -272,7 +292,12 @@ func makeRandomLabels() labels.Labels { } func Benchmark_PushInstance(b *testing.B) { - limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) + l := validation.Limits{ + MaxLocalStreamsPerUser: 1000, + MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes, + MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes, + } + limits, err := validation.NewOverrides(l, nil) require.NoError(b, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) @@ -312,7 +337,12 @@ func Benchmark_PushInstance(b *testing.B) { } func Benchmark_instance_addNewTailer(b *testing.B) { - limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 100000}, nil) + l := validation.Limits{ + MaxLocalStreamsPerUser: 100000, + MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes, + MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes, + } + limits, err := validation.NewOverrides(l, nil) require.NoError(b, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) @@ -334,7 +364,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { lbs := makeRandomLabels() b.Run("addTailersToNewStream", func(b *testing.B) { for n := 0; n < b.N; n++ { - inst.addTailersToNewStream(newStream(nil, "fake", 0, lbs, true, NilMetrics)) + inst.addTailersToNewStream(newStream(nil, newLocalStreamRateStrategy(limiter), "fake", 0, lbs, true, NilMetrics)) } }) } diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 0f222d667a29..436025ab9b09 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -5,6 +5,9 @@ import ( "math" "sync" + cortex_limiter "github.com/cortexproject/cortex/pkg/util/limiter" + "golang.org/x/time/rate" + "github.com/grafana/loki/pkg/validation" ) @@ -125,3 +128,28 @@ func (l *Limiter) minNonZero(first, second int) int { return first } + +type localStrategy struct { + limiter *Limiter +} + +func newLocalStreamRateStrategy(l *Limiter) cortex_limiter.RateLimiterStrategy { + return &localStrategy{ + limiter: l, + } +} + +func (s *localStrategy) Limit(userID string) float64 { + if s.limiter.disabled { + return float64(rate.Inf) + } + return float64(s.limiter.limits.MaxLocalStreamRateBytes(userID)) +} + +func (s *localStrategy) Burst(userID string) int { + if s.limiter.disabled { + // Burst is ignored when limit = rate.Inf + return 0 + } + return s.limiter.limits.MaxLocalStreamBurstRateBytes(userID) +} diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 6411d50a660d..44ca8f45192e 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" @@ -49,7 +50,8 @@ var ( ) var ( - ErrEntriesExist = errors.New("duplicate push - entries already exist") + ErrEntriesExist = errors.New("duplicate push - entries already exist") + ErrStreamRateLimit = errors.New("stream rate limit exceeded") ) func init() { @@ -64,8 +66,9 @@ type line struct { } type stream struct { - cfg *Config - tenant string + limiter *limiter.RateLimiter + cfg *Config + tenant string // Newest chunk at chunks[n-1]. // Not thread-safe; assume accesses to this are locked by caller. chunks []chunkDesc @@ -113,8 +116,9 @@ type entryWithError struct { e error } -func newStream(cfg *Config, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream { +func newStream(cfg *Config, limits limiter.RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream { return &stream{ + limiter: limiter.NewRateLimiter(limits, 10*time.Second), cfg: cfg, fp: fp, labels: labels, @@ -196,11 +200,16 @@ func (s *stream) Push( failedEntriesWithError := []entryWithError{} var outOfOrderSamples, outOfOrderBytes int + var rateLimitedSamples, rateLimitedBytes int defer func() { if outOfOrderSamples > 0 { validation.DiscardedSamples.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderSamples)) validation.DiscardedBytes.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderBytes)) } + if rateLimitedSamples > 0 { + validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples)) + validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes)) + } }() // Don't fail on the first append error - if samples are sent out of order, @@ -222,6 +231,14 @@ func (s *stream) Push( if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, s.highestTs, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) { chunk = s.cutChunk(ctx) } + // Check if this this should be rate limited. + now := time.Now() + if !s.limiter.AllowN(now, s.tenant, len(entries[i].Line)) { + failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], ErrStreamRateLimit}) + rateLimitedSamples++ + rateLimitedBytes += len(entries[i].Line) + continue + } // The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age. if s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) { @@ -292,27 +309,34 @@ func (s *stream) Push( if len(failedEntriesWithError) > 0 { lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1] + if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && lastEntryWithErr.e != ErrStreamRateLimit { + return bytesAdded, lastEntryWithErr.e + } + var statusCode int if lastEntryWithErr.e == chunkenc.ErrOutOfOrder { - // return bad http status request response with all failed entries - buf := bytes.Buffer{} - streamName := s.labelsString + statusCode = http.StatusBadRequest + } + if lastEntryWithErr.e == ErrStreamRateLimit { + statusCode = http.StatusTooManyRequests + } + // Return a http status 4xx request response with all failed entries. + buf := bytes.Buffer{} + streamName := s.labelsString - limitedFailedEntries := failedEntriesWithError - if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore { - limitedFailedEntries = limitedFailedEntries[:maxIgnore] - } + limitedFailedEntries := failedEntriesWithError + if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore { + limitedFailedEntries = limitedFailedEntries[:maxIgnore] + } - for _, entryWithError := range limitedFailedEntries { - fmt.Fprintf(&buf, - "entry with timestamp %s ignored, reason: '%s' for stream: %s,\n", - entryWithError.entry.Timestamp.String(), entryWithError.e.Error(), streamName) - } + for _, entryWithError := range limitedFailedEntries { + fmt.Fprintf(&buf, + "entry with timestamp %s ignored, reason: '%s' for stream: %s,\n", + entryWithError.entry.Timestamp.String(), entryWithError.e.Error(), streamName) + } - fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries)) + fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries)) - return bytesAdded, httpgrpc.Errorf(http.StatusBadRequest, buf.String()) - } - return bytesAdded, lastEntryWithErr.e + return bytesAdded, httpgrpc.Errorf(statusCode, buf.String()) } if len(s.chunks) != prevNumChunks { diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 439384a716a6..0d86b9b884f6 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/validation" ) var ( @@ -42,12 +43,17 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { {"unlimited", 0, numLogs}, } + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { cfg := defaultConfig() cfg.MaxReturnedErrors = tc.limit s := newStream( cfg, + newLocalStreamRateStrategy(limiter), "fake", model.Fingerprint(0), labels.Labels{ @@ -86,8 +92,13 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { } func TestPushDeduplication(t *testing.T) { + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + s := newStream( defaultConfig(), + newLocalStreamRateStrategy(limiter), "fake", model.Fingerprint(0), labels.Labels{ @@ -110,8 +121,13 @@ func TestPushDeduplication(t *testing.T) { } func TestPushRejectOldCounter(t *testing.T) { + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + s := newStream( defaultConfig(), + newLocalStreamRateStrategy(limiter), "fake", model.Fingerprint(0), labels.Labels{ @@ -122,7 +138,7 @@ func TestPushRejectOldCounter(t *testing.T) { ) // counter should be 2 now since the first line will be deduped - _, err := s.Push(context.Background(), []logproto.Entry{ + _, err = s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "newer, better test"}, @@ -199,8 +215,13 @@ func TestStreamIterator(t *testing.T) { func TestUnorderedPush(t *testing.T) { cfg := defaultIngesterTestConfig(t) cfg.MaxChunkAge = 10 * time.Second + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + s := newStream( &cfg, + newLocalStreamRateStrategy(limiter), "fake", model.Fingerprint(0), labels.Labels{ @@ -287,6 +308,36 @@ func TestUnorderedPush(t *testing.T) { require.Equal(t, false, sItr.Next()) } +func TestPushRateLimit(t *testing.T) { + l := validation.Limits{ + MaxLocalStreamRateBytes: 10, + MaxLocalStreamBurstRateBytes: 10, + } + limits, err := validation.NewOverrides(l, nil) + require.NoError(t, err) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + + s := newStream( + defaultConfig(), + newLocalStreamRateStrategy(limiter), + "fake", + model.Fingerprint(0), + labels.Labels{ + {Name: "foo", Value: "bar"}, + }, + true, + NilMetrics, + ) + + // Counter should be 2 now since the first line will be deduped. + _, err = s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "aaaaaaaaaa"}, + {Timestamp: time.Unix(1, 0), Line: "aaaaaaaaab"}, + }, recordPool.GetRecord(), 0) + require.Contains(t, err.Error(), ErrStreamRateLimit.Error()) + require.Contains(t, err.Error(), "total ignored: 1 out of 2") +} + func iterEq(t *testing.T, exp []logproto.Entry, got iter.EntryIterator) { var i int for got.Next() { @@ -304,7 +355,12 @@ func Benchmark_PushStream(b *testing.B) { labels.Label{Name: "job", Value: "loki-dev/ingester"}, labels.Label{Name: "container", Value: "ingester"}, } - s := newStream(&Config{}, "fake", model.Fingerprint(0), ls, true, NilMetrics) + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(b, err) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + + s := newStream(&Config{}, newLocalStreamRateStrategy(limiter), "fake", model.Fingerprint(0), ls, true, NilMetrics) t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}) require.NoError(b, err) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 2ae5e7a77fc6..0fdf09dd224f 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -3,6 +3,7 @@ package validation import ( "flag" "fmt" + "strconv" "time" "github.com/prometheus/common/model" @@ -20,6 +21,9 @@ const ( GlobalIngestionRateStrategy = "global" bytesInMB = 1048576 + + defaultPerStreamRateLimit = 1 << 20 // 1MB + defaultPerStreamBurstLimit = 2 * defaultPerStreamRateLimit ) // Limits describe all the limits for users; can be used to describe global default @@ -42,9 +46,11 @@ type Limits struct { MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"` // Ingester enforced limits. - MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"` - MaxGlobalStreamsPerUser int `yaml:"max_global_streams_per_user" json:"max_global_streams_per_user"` - UnorderedWrites bool `yaml:"unordered_writes" json:"unordered_writes"` + MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"` + MaxGlobalStreamsPerUser int `yaml:"max_global_streams_per_user" json:"max_global_streams_per_user"` + UnorderedWrites bool `yaml:"unordered_writes" json:"unordered_writes"` + MaxLocalStreamRateBytes flagext.ByteSize `yaml:"max_stream_rate_bytes" json:"max_stream_rate_bytes"` + MaxLocalStreamBurstRateBytes flagext.ByteSize `yaml:"max_stream_burst_rate_bytes" json:"max_stream_burst_rate_bytes"` // Querier enforced limits. MaxChunksPerQuery int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` @@ -107,6 +113,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 10e3, "Maximum number of active streams per user, per ingester. 0 to disable.") f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 0, "Maximum number of active streams per user, across the cluster. 0 to disable.") f.BoolVar(&l.UnorderedWrites, "ingester.unordered-writes", false, "(Experimental) Allow out of order writes.") + _ = l.MaxLocalStreamRateBytes.Set(strconv.Itoa(defaultPerStreamRateLimit)) + f.Var(&l.MaxLocalStreamRateBytes, "ingester.max-stream-rate-bytes", "Maximum bytes per second rate per active stream.") + _ = l.MaxLocalStreamBurstRateBytes.Set(strconv.Itoa(defaultPerStreamBurstLimit)) + f.Var(&l.MaxLocalStreamBurstRateBytes, "ingester.max-stream-burst-bytes", "Maximum burst bytes per second rate per active stream.") f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") @@ -406,6 +416,14 @@ func (o *Overrides) UnorderedWrites(userID string) bool { return o.getOverridesForUser(userID).UnorderedWrites } +func (o *Overrides) MaxLocalStreamRateBytes(userID string) int { + return o.getOverridesForUser(userID).MaxLocalStreamRateBytes.Val() +} + +func (o *Overrides) MaxLocalStreamBurstRateBytes(userID string) int { + return o.getOverridesForUser(userID).MaxLocalStreamBurstRateBytes.Val() +} + func (o *Overrides) ForEachTenantLimit(callback ForEachTenantLimitCallback) { o.tenantLimits.ForEachTenantLimit(callback) } diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index c6236db35913..c9d9b4a414a5 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -23,7 +23,10 @@ const ( // because the limit of active streams has been reached. StreamLimit = "stream_limit" StreamLimitErrorMsg = "Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased" - OutOfOrder = "out_of_order" + // StreamRateLimit is a reason for discarding lines when the streams own rate limit is hit + // rather than the overall ingestion rate limit. + StreamRateLimit = "per_stream_rate_limt" + OutOfOrder = "out_of_order" // GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age` GreaterThanMaxSampleAge = "greater_than_max_sample_age" GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v"