From 477a0e71dfc8b42770f6b36d2e684a232652051b Mon Sep 17 00:00:00 2001 From: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> Date: Fri, 29 Apr 2022 12:55:05 +0100 Subject: [PATCH] Add filter parameter to rebound so lines can be deleted by the compactor (#5879) * Add filter parameter to rebound Signed-off-by: Michel Hollands * Fix linting issues Signed-off-by: Michel Hollands * Add filter function to delete request Signed-off-by: Michel Hollands * Fix linting issues Signed-off-by: Michel Hollands * Enable api for filter and delete mode Signed-off-by: Michel Hollands * Add settings for retention Signed-off-by: Michel Hollands * Use labels to check and add test Signed-off-by: Michel Hollands * Simplify filter function Signed-off-by: Michel Hollands * Also enable filter mode Signed-off-by: Michel Hollands * Remove test settings in config file for docker Signed-off-by: Michel Hollands * Add extra (unused) param for ProcessString in filter Signed-off-by: Michel Hollands * Empty commit to trigger CI again Signed-off-by: Michel Hollands * Update changelog Signed-off-by: Michel Hollands * Fix flapping test Signed-off-by: Michel Hollands * Remove commented out unit tests and add some more Signed-off-by: Michel Hollands * Add extra test case for delete request without line filter Signed-off-by: Michel Hollands * Use chunk bounds Signed-off-by: Michel Hollands * check if the log selector has a filter if the whole chunk is selected Signed-off-by: Michel Hollands * fix lint issue: use correct go-kit import Signed-off-by: Michel Hollands --- CHANGELOG.md | 1 + pkg/chunkenc/dumb_chunk.go | 3 +- pkg/chunkenc/facade.go | 5 +- pkg/chunkenc/interface.go | 3 +- pkg/chunkenc/memchunk.go | 8 +- pkg/chunkenc/memchunk_test.go | 96 ++++++++++++- pkg/loki/modules.go | 15 +- pkg/storage/chunk/bigchunk.go | 4 +- pkg/storage/chunk/interface.go | 4 +- .../stores/shipper/compactor/compactor.go | 14 +- .../compactor/deletion/delete_request.go | 95 +++++++++++-- .../compactor/deletion/delete_request_test.go | 133 +++++++++++++++--- .../deletion/delete_requests_manager.go | 28 ++-- .../deletion/delete_requests_manager_test.go | 30 ++-- .../shipper/compactor/deletion/validation.go | 15 +- .../compactor/deletion/validation_test.go | 30 ++-- .../shipper/compactor/retention/expiration.go | 15 +- .../compactor/retention/expiration_test.go | 4 +- .../shipper/compactor/retention/retention.go | 30 ++-- .../compactor/retention/retention_test.go | 127 +++++++++++------ pkg/util/filter/filter_function.go | 3 + 21 files changed, 508 insertions(+), 155 deletions(-) create mode 100644 pkg/util/filter/filter_function.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d1565f4e8d5..b0bb8f1282e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main * [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters. +* [5879](https://github.com/grafana/loki/pull/5879) **MichelHollands**: Remove lines matching delete request expression when using "filter-and-delete" deletion mode. * [5899](https://github.com/grafana/loki/pull/5899) **simonswine**: Update go image to 1.17.9. * [5888](https://github.com/grafana/loki/pull/5888) **Papawy** Fix common config net interface name overwritten by ring common config * [5799](https://github.com/grafana/loki/pull/5799) **cyriltovena** Fix deduping issues when multiple entries with the same timestamp exist. diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 3a0e45837bfa..e0f4cf670ed4 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util/filter" ) const ( @@ -122,7 +123,7 @@ func (c *dumbChunk) Close() error { return nil } -func (c *dumbChunk) Rebound(start, end time.Time) (Chunk, error) { +func (c *dumbChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) { return nil, nil } diff --git a/pkg/chunkenc/facade.go b/pkg/chunkenc/facade.go index 36c6331aafe0..1db71c189cd6 100644 --- a/pkg/chunkenc/facade.go +++ b/pkg/chunkenc/facade.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/util/filter" ) // GzipLogChunk is a cortex encoding type for our chunks. @@ -86,8 +87,8 @@ func (f Facade) LokiChunk() Chunk { return f.c } -func (f Facade) Rebound(start, end model.Time) (chunk.Data, error) { - newChunk, err := f.c.Rebound(start.Time(), end.Time()) +func (f Facade) Rebound(start, end model.Time, filter filter.Func) (chunk.Data, error) { + newChunk, err := f.c.Rebound(start.Time(), end.Time(), filter) if err != nil { return nil, err } diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 74fc87c8ed5c..7ac494c79671 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util/filter" ) // Errors returned by the chunk interface. @@ -127,7 +128,7 @@ type Chunk interface { CompressedSize() int Close() error Encoding() Encoding - Rebound(start, end time.Time) (Chunk, error) + Rebound(start, end time.Time, filter filter.Func) (Chunk, error) } // Block is a chunk block. diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index e29e1f25df72..ffb4f179a0de 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/util/filter" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -713,7 +714,7 @@ func (c *MemChunk) reorder() error { // Otherwise, we need to rebuild the blocks from, to := c.Bounds() - newC, err := c.Rebound(from, to) + newC, err := c.Rebound(from, to, nil) if err != nil { return err } @@ -910,7 +911,7 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block { } // Rebound builds a smaller chunk with logs having timestamp from start and end(both inclusive) -func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) { +func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) { // add a millisecond to end time because the Chunk.Iterator considers end time to be non-inclusive. itr, err := c.Iterator(context.Background(), start, end.Add(time.Millisecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) if err != nil { @@ -931,6 +932,9 @@ func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) { for itr.Next() { entry := itr.Entry() + if filter != nil && filter(entry.Line) { + continue + } if err := newChunk.Append(&entry); err != nil { return nil, err } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index d94259cc2ea4..0b6d7e20d443 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -1188,7 +1188,7 @@ func TestMemChunk_Rebound(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo) + newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo, nil) if tc.err != nil { require.Equal(t, tc.err, err) return @@ -1231,3 +1231,97 @@ func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk { return chk } + +func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) { + chkFrom := time.Unix(1, 0) // headBlock.Append treats Unix time 0 as not set so we have to use a later time + chkFromPlus5 := chkFrom.Add(5 * time.Second) + chkThrough := chkFrom.Add(10 * time.Second) + chkThroughPlus1 := chkThrough.Add(1 * time.Second) + + filterFunc := func(in string) bool { + return strings.HasPrefix(in, "matching") + } + + for _, tc := range []struct { + name string + matchingSliceFrom, matchingSliceTo *time.Time + err error + nrMatching int + nrNotMatching int + }{ + { + name: "no matches", + nrMatching: 0, + nrNotMatching: 10, + }, + { + name: "some lines removed", + matchingSliceFrom: &chkFrom, + matchingSliceTo: &chkFromPlus5, + nrMatching: 5, + nrNotMatching: 5, + }, + { + name: "all lines match", + err: chunk.ErrSliceNoDataInRange, + matchingSliceFrom: &chkFrom, + matchingSliceTo: &chkThroughPlus1, + }, + } { + t.Run(tc.name, func(t *testing.T) { + originalChunk := buildFilterableTestMemChunk(t, chkFrom, chkThrough, tc.matchingSliceFrom, tc.matchingSliceTo) + newChunk, err := originalChunk.Rebound(chkFrom, chkThrough, filterFunc) + if tc.err != nil { + require.Equal(t, tc.err, err) + return + } + require.NoError(t, err) + + // iterate originalChunk from slice start to slice end + nanosecond. Adding a nanosecond here to be inclusive of sample at end time. + originalChunkItr, err := originalChunk.Iterator(context.Background(), chkFrom, chkThrough.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + require.NoError(t, err) + originalChunkSamples := 0 + for originalChunkItr.Next() { + originalChunkSamples++ + } + require.Equal(t, tc.nrMatching+tc.nrNotMatching, originalChunkSamples) + + // iterate newChunk for whole chunk interval which should include all the samples in the chunk and hence align it with expected values. + newChunkItr, err := newChunk.Iterator(context.Background(), chkFrom, chkThrough.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + require.NoError(t, err) + newChunkSamples := 0 + for newChunkItr.Next() { + newChunkSamples++ + } + require.Equal(t, tc.nrNotMatching, newChunkSamples) + }) + } +} + +func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time) *MemChunk { + chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, defaultBlockSize, 0) + t.Logf("from : %v", from.String()) + t.Logf("through: %v", through.String()) + for from.Before(through) { + // If a line is between matchingFrom and matchingTo add the prefix "matching" + if matchingFrom != nil && matchingTo != nil && + (from.Equal(*matchingFrom) || (from.After(*matchingFrom) && (from.Before(*matchingTo)))) { + t.Logf("%v matching line", from.String()) + err := chk.Append(&logproto.Entry{ + Line: fmt.Sprintf("matching %v", from.String()), + Timestamp: from, + }) + require.NoError(t, err) + } else { + t.Logf("%v non-match line", from.String()) + err := chk.Append(&logproto.Entry{ + Line: from.String(), + Timestamp: from, + }) + require.NoError(t, err) + } + from = from.Add(time.Second) + } + + return chk +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 389d97de954e..9d36d1e96a5b 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -795,12 +795,15 @@ func (t *Loki) initCompactor() (services.Service, error) { t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor) - if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() != deletion.Disabled { - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler))) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))) - - t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler))) + if t.Cfg.CompactorConfig.RetentionEnabled { + switch t.compactor.DeleteMode() { + case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete: + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler))) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))) + default: + break + } } return t.compactor, nil diff --git a/pkg/storage/chunk/bigchunk.go b/pkg/storage/chunk/bigchunk.go index 70b188faaad2..f131a23f7f80 100644 --- a/pkg/storage/chunk/bigchunk.go +++ b/pkg/storage/chunk/bigchunk.go @@ -8,6 +8,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/tsdb/chunkenc" + + "github.com/grafana/loki/pkg/util/filter" ) const samplesPerChunk = 120 @@ -85,7 +87,7 @@ func (b *bigchunk) addNextChunk(start model.Time) error { return nil } -func (b *bigchunk) Rebound(start, end model.Time) (Data, error) { +func (b *bigchunk) Rebound(start, end model.Time, filter filter.Func) (Data, error) { return nil, errors.New("not implemented") } diff --git a/pkg/storage/chunk/interface.go b/pkg/storage/chunk/interface.go index be5f83211076..159a8a788ec5 100644 --- a/pkg/storage/chunk/interface.go +++ b/pkg/storage/chunk/interface.go @@ -23,6 +23,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" errs "github.com/weaveworks/common/errors" + + "github.com/grafana/loki/pkg/util/filter" ) const ( @@ -47,7 +49,7 @@ type Data interface { // Rebound returns a smaller chunk that includes all samples between start and end (inclusive). // We do not want to change existing Slice implementations because // it is built specifically for query optimization and is a noop for some of the encodings. - Rebound(start, end model.Time) (Data, error) + Rebound(start, end model.Time, filter filter.Func) (Data, error) // Size returns the approximate length of the chunk in bytes. Size() int Utilization() float64 diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index fdffe742838f..a6e250bae717 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -229,7 +229,8 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem return err } - if c.deleteMode != deletion.Disabled { + switch c.deleteMode { + case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete: deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion") c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient) @@ -237,9 +238,14 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem return err } c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r) - c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r) + c.deleteRequestsManager = deletion.NewDeleteRequestsManager( + c.deleteRequestsStore, + c.cfg.DeleteRequestCancelPeriod, + r, + c.deleteMode, + ) c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager) - } else { + default: c.expirationChecker = newExpirationChecker( retention.NewExpirationChecker(limits), // This is a dummy deletion ExpirationChecker that never expires anything @@ -567,7 +573,7 @@ func newExpirationChecker(retentionExpiryChecker, deletionExpiryChecker retentio return &expirationChecker{retentionExpiryChecker, deletionExpiryChecker} } -func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []model.Interval) { +func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []retention.IntervalFilter) { if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(ref, now); expired { return expired, nonDeletedIntervals } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go index 27aa24ddd256..d6e26d472ef5 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go @@ -1,10 +1,14 @@ package deletion import ( + "github.com/go-kit/log/level" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/util/filter" + util_log "github.com/grafana/loki/pkg/util/log" ) type DeleteRequest struct { @@ -15,21 +19,60 @@ type DeleteRequest struct { Status DeleteRequestStatus `json:"status"` CreatedAt model.Time `json:"created_at"` - UserID string `json:"-"` - matchers []*labels.Matcher `json:"-"` + UserID string `json:"-"` + matchers []*labels.Matcher `json:"-"` + logSelectorExpr syntax.LogSelectorExpr `json:"-"` } func (d *DeleteRequest) SetQuery(logQL string) error { d.Query = logQL - matchers, err := parseDeletionQuery(logQL) + logSelectorExpr, err := parseDeletionQuery(logQL) if err != nil { return err } - d.matchers = matchers + d.logSelectorExpr = logSelectorExpr + d.matchers = logSelectorExpr.Matchers() return nil } -func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Interval) { +// FilterFunction returns a filter function that returns true if the given line matches +func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error) { + if d.logSelectorExpr == nil { + err := d.SetQuery(d.Query) + if err != nil { + return nil, err + } + } + p, err := d.logSelectorExpr.Pipeline() + if err != nil { + return nil, err + } + + if !allMatch(d.matchers, labels) { + return func(s string) bool { + return false + }, nil + } + + f := p.ForStream(labels).ProcessString + return func(s string) bool { + result, _, skip := f(0, s) + return len(result) != 0 || skip + }, nil +} + +func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool { + for _, m := range matchers { + if !m.Matches(labels.Get(m.Name)) { + return false + } + } + return true +} + +// IsDeleted checks if the given ChunkEntry will be deleted by this DeleteRequest. +// It also returns the intervals of the ChunkEntry that will remain before filtering. +func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []retention.IntervalFilter) { if d.UserID != unsafeGetString(entry.UserID) { return false, nil } @@ -48,23 +91,51 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Int return false, nil } + ff, err := d.FilterFunction(entry.Labels) + if err != nil { + // The query in the delete request is checked when added to the table. + // So this error should not occur. + level.Error(util_log.Logger).Log("msg", "unexpected error getting filter function", "err", err) + return false, nil + } + if d.StartTime <= entry.From && d.EndTime >= entry.Through { + // if the logSelectorExpr has a filter part return the chunk boundaries as intervals + if d.logSelectorExpr.HasFilter() { + return true, []retention.IntervalFilter{ + { + Interval: model.Interval{ + Start: entry.From, + End: entry.Through, + }, + Filter: ff, + }, + } + } + + // No filter in the logSelectorExpr so the whole chunk will be deleted return true, nil } - intervals := make([]model.Interval, 0, 2) + intervals := make([]retention.IntervalFilter, 0, 2) if d.StartTime > entry.From { - intervals = append(intervals, model.Interval{ - Start: entry.From, - End: d.StartTime - 1, + intervals = append(intervals, retention.IntervalFilter{ + Interval: model.Interval{ + Start: entry.From, + End: d.StartTime - 1, + }, + Filter: ff, }) } if d.EndTime < entry.Through { - intervals = append(intervals, model.Interval{ - Start: d.EndTime + 1, - End: entry.Through, + intervals = append(intervals, retention.IntervalFilter{ + Interval: model.Interval{ + Start: d.EndTime + 1, + End: entry.Through, + }, + Filter: ff, }) } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go index ec7232954d91..e609d1055540 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go @@ -17,6 +17,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { user1 := "user1" lbl := `{foo="bar", fizz="buzz"}` + lblWithFilter := `{foo="bar", fizz="buzz"} |= "filter"` chunkEntry := retention.ChunkEntry{ ChunkRef: retention.ChunkRef{ @@ -29,7 +30,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { type resp struct { isDeleted bool - nonDeletedIntervals []model.Interval + nonDeletedIntervals []retention.IntervalFilter } for _, tc := range []struct { @@ -50,6 +51,26 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { nonDeletedIntervals: nil, }, }, + { + name: "whole chunk deleted with filter present", + deleteRequest: DeleteRequest{ + UserID: user1, + StartTime: now.Add(-3 * time.Hour), + EndTime: now.Add(-time.Hour), + Query: lblWithFilter, + }, + expectedResp: resp{ + isDeleted: true, + nonDeletedIntervals: []retention.IntervalFilter{ + { + Interval: model.Interval{ + Start: now.Add(-3 * time.Hour), + End: now.Add(-time.Hour), + }, + }, + }, + }, + }, { name: "chunk deleted from beginning", deleteRequest: DeleteRequest{ @@ -60,10 +81,12 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { }, expectedResp: resp{ isDeleted: true, - nonDeletedIntervals: []model.Interval{ + nonDeletedIntervals: []retention.IntervalFilter{ { - Start: now.Add(-2*time.Hour) + 1, - End: now.Add(-time.Hour), + Interval: model.Interval{ + Start: now.Add(-2*time.Hour) + 1, + End: now.Add(-time.Hour), + }, }, }, }, @@ -78,10 +101,12 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { }, expectedResp: resp{ isDeleted: true, - nonDeletedIntervals: []model.Interval{ + nonDeletedIntervals: []retention.IntervalFilter{ { - Start: now.Add(-3 * time.Hour), - End: now.Add(-2*time.Hour) - 1, + Interval: model.Interval{ + Start: now.Add(-3 * time.Hour), + End: now.Add(-2*time.Hour) - 1, + }, }, }, }, @@ -96,10 +121,32 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { }, expectedResp: resp{ isDeleted: true, - nonDeletedIntervals: []model.Interval{ + nonDeletedIntervals: []retention.IntervalFilter{ { - Start: now.Add(-3 * time.Hour), - End: now.Add(-2*time.Hour) - 1, + Interval: model.Interval{ + Start: now.Add(-3 * time.Hour), + End: now.Add(-2*time.Hour) - 1, + }, + }, + }, + }, + }, + { + name: "chunk deleted from end with filter", + deleteRequest: DeleteRequest{ + UserID: user1, + StartTime: now.Add(-2 * time.Hour), + EndTime: now, + Query: lblWithFilter, + }, + expectedResp: resp{ + isDeleted: true, + nonDeletedIntervals: []retention.IntervalFilter{ + { + Interval: model.Interval{ + Start: now.Add(-3 * time.Hour), + End: now.Add(-2*time.Hour) - 1, + }, }, }, }, @@ -114,14 +161,18 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { }, expectedResp: resp{ isDeleted: true, - nonDeletedIntervals: []model.Interval{ + nonDeletedIntervals: []retention.IntervalFilter{ { - Start: now.Add(-3 * time.Hour), - End: now.Add(-(2*time.Hour + 30*time.Minute)) - 1, + Interval: model.Interval{ + Start: now.Add(-3 * time.Hour), + End: now.Add(-(2*time.Hour + 30*time.Minute)) - 1, + }, }, { - Start: now.Add(-(time.Hour + 30*time.Minute)) + 1, - End: now.Add(-time.Hour), + Interval: model.Interval{ + Start: now.Add(-(time.Hour + 30*time.Minute)) + 1, + End: now.Add(-time.Hour), + }, }, }, }, @@ -167,7 +218,16 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query)) isDeleted, nonDeletedIntervals := tc.deleteRequest.IsDeleted(chunkEntry) require.Equal(t, tc.expectedResp.isDeleted, isDeleted) - require.Equal(t, tc.expectedResp.nonDeletedIntervals, nonDeletedIntervals) + for idx := range tc.expectedResp.nonDeletedIntervals { + require.Equal(t, + tc.expectedResp.nonDeletedIntervals[idx].Interval.Start, + nonDeletedIntervals[idx].Interval.Start, + ) + require.Equal(t, + tc.expectedResp.nonDeletedIntervals[idx].Interval.End, + nonDeletedIntervals[idx].Interval.End, + ) + } }) } } @@ -180,3 +240,44 @@ func mustParseLabel(input string) labels.Labels { return lbls } + +func TestDeleteRequest_FilterFunction(t *testing.T) { + dr := DeleteRequest{ + Query: `{foo="bar"} |= "some"`, + } + + lblStr := `{foo="bar"}` + lbls := mustParseLabel(lblStr) + + f, err := dr.FilterFunction(lbls) + require.NoError(t, err) + + require.True(t, f(`some line`)) + require.False(t, f("")) + require.False(t, f("other line")) + + lblStr = `{foo2="buzz"}` + lbls = mustParseLabel(lblStr) + + f, err = dr.FilterFunction(lbls) + require.NoError(t, err) + + require.False(t, f("")) + require.False(t, f("other line")) + require.False(t, f("some line")) + + dr = DeleteRequest{ + Query: `{namespace="default"}`, + } + + lblStr = `{namespace="default"}` + lbls = mustParseLabel(lblStr) + + f, err = dr.FilterFunction(lbls) + require.NoError(t, err) + + require.True(t, f(`some line`)) + require.True(t, f("")) + require.True(t, f("other line")) + +} diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go index 76a73124b2db..057a020fdf73 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go @@ -24,21 +24,23 @@ type DeleteRequestsManager struct { deleteRequestCancelPeriod time.Duration deleteRequestsToProcess []DeleteRequest - chunkIntervalsToRetain []model.Interval + chunkIntervalsToRetain []retention.IntervalFilter // WARN: If by any chance we change deleteRequestsToProcessMtx to sync.RWMutex to be able to check multiple chunks at a time, // please take care of chunkIntervalsToRetain which should be unique per chunk. deleteRequestsToProcessMtx sync.Mutex metrics *deleteRequestsManagerMetrics wg sync.WaitGroup done chan struct{} + deletionMode Mode } -func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestsManager { +func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer, mode Mode) *DeleteRequestsManager { dm := &DeleteRequestsManager{ deleteRequestsStore: store, deleteRequestCancelPeriod: deleteRequestCancelPeriod, metrics: newDeleteRequestsManagerMetrics(registerer), done: make(chan struct{}), + deletionMode: mode, } go dm.loop() @@ -123,7 +125,7 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { return nil } -func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) (bool, []model.Interval) { +func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) (bool, []retention.IntervalFilter) { d.deleteRequestsToProcessMtx.Lock() defer d.deleteRequestsToProcessMtx.Unlock() @@ -132,20 +134,22 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) } d.chunkIntervalsToRetain = d.chunkIntervalsToRetain[:0] - d.chunkIntervalsToRetain = append(d.chunkIntervalsToRetain, model.Interval{ - Start: ref.From, - End: ref.Through, + d.chunkIntervalsToRetain = append(d.chunkIntervalsToRetain, retention.IntervalFilter{ + Interval: model.Interval{ + Start: ref.From, + End: ref.Through, + }, }) for _, deleteRequest := range d.deleteRequestsToProcess { - rebuiltIntervals := make([]model.Interval, 0, len(d.chunkIntervalsToRetain)) - for _, interval := range d.chunkIntervalsToRetain { + rebuiltIntervals := make([]retention.IntervalFilter, 0, len(d.chunkIntervalsToRetain)) + for _, ivf := range d.chunkIntervalsToRetain { entry := ref - entry.From = interval.Start - entry.Through = interval.End + entry.From = ivf.Interval.Start + entry.Through = ivf.Interval.End isDeleted, newIntervalsToRetain := deleteRequest.IsDeleted(entry) if !isDeleted { - rebuiltIntervals = append(rebuiltIntervals, interval) + rebuiltIntervals = append(rebuiltIntervals, ivf) } else { rebuiltIntervals = append(rebuiltIntervals, newIntervalsToRetain...) } @@ -158,7 +162,7 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) } } - if len(d.chunkIntervalsToRetain) == 1 && d.chunkIntervalsToRetain[0].Start == ref.From && d.chunkIntervalsToRetain[0].End == ref.Through { + if len(d.chunkIntervalsToRetain) == 1 && d.chunkIntervalsToRetain[0].Interval.Start == ref.From && d.chunkIntervalsToRetain[0].Interval.End == ref.Through { return false, nil } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go index 9e3f314580e2..6f5f95caf610 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go @@ -17,7 +17,7 @@ const testUserID = "test-user" func TestDeleteRequestsManager_Expired(t *testing.T) { type resp struct { isExpired bool - nonDeletedIntervals []model.Interval + nonDeletedIntervals []retention.IntervalFilter } now := model.Now() @@ -141,18 +141,24 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, expectedResp: resp{ isExpired: true, - nonDeletedIntervals: []model.Interval{ + nonDeletedIntervals: []retention.IntervalFilter{ { - Start: now.Add(-11*time.Hour) + 1, - End: now.Add(-10*time.Hour) - 1, + Interval: model.Interval{ + Start: now.Add(-11*time.Hour) + 1, + End: now.Add(-10*time.Hour) - 1, + }, }, { - Start: now.Add(-8*time.Hour) + 1, - End: now.Add(-6*time.Hour) - 1, + Interval: model.Interval{ + Start: now.Add(-8*time.Hour) + 1, + End: now.Add(-6*time.Hour) - 1, + }, }, { - Start: now.Add(-5*time.Hour) + 1, - End: now.Add(-2*time.Hour) - 1, + Interval: model.Interval{ + Start: now.Add(-5*time.Hour) + 1, + End: now.Add(-2*time.Hour) - 1, + }, }, }, }, @@ -207,12 +213,16 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - mgr := NewDeleteRequestsManager(mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, nil) + mgr := NewDeleteRequestsManager(mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, nil, WholeStreamDeletion) require.NoError(t, mgr.loadDeleteRequestsToProcess()) isExpired, nonDeletedIntervals := mgr.Expired(chunkEntry, model.Now()) require.Equal(t, tc.expectedResp.isExpired, isExpired) - require.Equal(t, tc.expectedResp.nonDeletedIntervals, nonDeletedIntervals) + for idx, interval := range nonDeletedIntervals { + require.Equal(t, tc.expectedResp.nonDeletedIntervals[idx].Interval.Start, interval.Interval.Start) + require.Equal(t, tc.expectedResp.nonDeletedIntervals[idx].Interval.End, interval.Interval.End) + require.NotNil(t, interval.Filter) + } }) } } diff --git a/pkg/storage/stores/shipper/compactor/deletion/validation.go b/pkg/storage/stores/shipper/compactor/deletion/validation.go index 1aacfbfbb50d..68032e115fa0 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/validation.go +++ b/pkg/storage/stores/shipper/compactor/deletion/validation.go @@ -3,26 +3,19 @@ package deletion import ( "errors" - "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/loki/pkg/logql/syntax" ) var ( - errInvalidQuery = errors.New("invalid query expression") - errUnsupportedQuery = errors.New("unsupported query expression") + errInvalidQuery = errors.New("invalid query expression") ) // parseDeletionQuery checks if the given logQL is valid for deletions -func parseDeletionQuery(query string) ([]*labels.Matcher, error) { - expr, err := syntax.ParseExpr(query) +func parseDeletionQuery(query string) (syntax.LogSelectorExpr, error) { + logSelectorExpr, err := syntax.ParseLogSelector(query, false) if err != nil { return nil, errInvalidQuery } - if matchersExpr, ok := expr.(*syntax.MatchersExpr); ok { - return matchersExpr.Matchers(), nil - } - - return nil, errUnsupportedQuery + return logSelectorExpr, nil } diff --git a/pkg/storage/stores/shipper/compactor/deletion/validation_test.go b/pkg/storage/stores/shipper/compactor/deletion/validation_test.go index 8a97fb72e47e..fa5486c0c830 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/validation_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/validation_test.go @@ -8,32 +8,32 @@ import ( func TestParseLogQLExpressionForDeletion(t *testing.T) { t.Run("invalid logql", func(t *testing.T) { - matchers, err := parseDeletionQuery("gjgjg ggj") - require.Nil(t, matchers) + logSelectorExpr, err := parseDeletionQuery("gjgjg ggj") + require.Nil(t, logSelectorExpr) require.ErrorIs(t, err, errInvalidQuery) }) t.Run("matcher expression", func(t *testing.T) { - matchers, err := parseDeletionQuery(`{env="dev", secret="true"}`) - require.NotNil(t, matchers) + logSelectorExpr, err := parseDeletionQuery(`{env="dev", secret="true"}`) + require.NotNil(t, logSelectorExpr) require.NoError(t, err) }) t.Run("pipeline expression with line filter", func(t *testing.T) { - matchers, err := parseDeletionQuery(`{env="dev", secret="true"} |= "social sec number"`) - require.Nil(t, matchers) - require.ErrorIs(t, err, errUnsupportedQuery) + logSelectorExpr, err := parseDeletionQuery(`{env="dev", secret="true"} |= "social sec number"`) + require.NotNil(t, logSelectorExpr) + require.NoError(t, err) }) - t.Run("pipeline expression with label filter ", func(t *testing.T) { - matchers, err := parseDeletionQuery(`{env="dev", secret="true"} | json bob="top.params[0]"`) - require.Nil(t, matchers) - require.ErrorIs(t, err, errUnsupportedQuery) + t.Run("pipeline expression with multiple line filters", func(t *testing.T) { + logSelectorExpr, err := parseDeletionQuery(`{env="dev", secret="true"} |= "social sec number" |~ "[abd]*" `) + require.NotNil(t, logSelectorExpr) + require.NoError(t, err) }) - t.Run("metrics query", func(t *testing.T) { - matchers, err := parseDeletionQuery(`count_over_time({job="mysql"}[5m])`) - require.Nil(t, matchers) - require.ErrorIs(t, err, errUnsupportedQuery) + t.Run("pipeline expression with invalid line filter", func(t *testing.T) { + logSelectorExpr, err := parseDeletionQuery(`{env="dev", secret="true"} |= social sec number`) + require.Nil(t, logSelectorExpr) + require.ErrorIs(t, err, errInvalidQuery) }) } diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/shipper/compactor/retention/expiration.go index e9aee2744027..8f9617b03741 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration.go @@ -8,12 +8,21 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/util/filter" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/validation" ) +// IntervalFilter contains the interval to delete +// and the function that filters lines. These will be +// applied to a chunk. +type IntervalFilter struct { + Interval model.Interval + Filter filter.Func +} + type ExpirationChecker interface { - Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) + Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool MarkPhaseStarted() MarkPhaseFailed() @@ -40,7 +49,7 @@ func NewExpirationChecker(limits Limits) ExpirationChecker { } // Expired tells if a ref chunk is expired based on retention rules. -func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) { +func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) { userID := unsafeGetString(ref.UserID) period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels) return now.Sub(ref.Through) > period, nil @@ -88,7 +97,7 @@ func NeverExpiringExpirationChecker(limits Limits) ExpirationChecker { type neverExpiringExpirationChecker struct{} -func (e *neverExpiringExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) { +func (e *neverExpiringExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) { return false, nil } func (e *neverExpiringExpirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool { diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go index e51a93cbe0ad..d7945ae924b1 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go @@ -81,9 +81,9 @@ func Test_expirationChecker_Expired(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actual, nonDeletedIntervals := e.Expired(tt.ref, model.Now()) + actual, nonDeletedIntervalFilters := e.Expired(tt.ref, model.Now()) require.Equal(t, tt.want, actual) - require.Nil(t, nonDeletedIntervals) + require.Nil(t, nonDeletedIntervalFilters) }) } } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index f6a883b7cb0c..033bd4195fd7 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -151,11 +151,11 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr seriesMap.Add(c.SeriesID, c.UserID, c.Labels) // see if the chunk is deleted completely or partially - if expired, nonDeletedIntervals := expiration.Expired(c, now); expired { - if len(nonDeletedIntervals) > 0 { - wroteChunks, err := chunkRewriter.rewriteChunk(ctx, c, nonDeletedIntervals) + if expired, nonDeletedIntervalFilters := expiration.Expired(c, now); expired { + if len(nonDeletedIntervalFilters) > 0 { + wroteChunks, err := chunkRewriter.rewriteChunk(ctx, c, nonDeletedIntervalFilters) if err != nil { - return false, false, fmt.Errorf("failed to rewrite chunk %s for interval %s with error %s", c.ChunkID, nonDeletedIntervals, err) + return false, false, fmt.Errorf("failed to rewrite chunk %s for intervals %+v with error %s", c.ChunkID, nonDeletedIntervalFilters, err) } if wroteChunks { @@ -173,7 +173,7 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr // Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in. // For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then // the retention would fail because it would fail to find it in the storage. - if len(nonDeletedIntervals) == 0 || c.Through <= tableInterval.End { + if len(nonDeletedIntervalFilters) == 0 || c.Through <= tableInterval.End { if err := marker.Put(c.ChunkID); err != nil { return false, false, err } @@ -307,7 +307,7 @@ func newChunkRewriter(chunkClient client.Client, schemaCfg config.PeriodConfig, }, nil } -func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, intervals []model.Interval) (bool, error) { +func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, intervalFilters []IntervalFilter) (bool, error) { userID := unsafeGetString(ce.UserID) chunkID := unsafeGetString(ce.ChunkID) @@ -327,9 +327,17 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, interva wroteChunks := false - for _, interval := range intervals { - newChunkData, err := chks[0].Data.Rebound(interval.Start, interval.End) + for _, ivf := range intervalFilters { + start := ivf.Interval.Start + end := ivf.Interval.End + + newChunkData, err := chks[0].Data.Rebound(start, end, ivf.Filter) if err != nil { + if errors.Is(err, chunk.ErrSliceNoDataInRange) { + level.Info(util_log.Logger).Log("msg", "Rebound leaves an empty chunk", "chunk ref", string(ce.ChunkRef.ChunkID)) + // skip empty chunks + continue + } return false, err } @@ -341,8 +349,8 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, interva newChunk := chunk.NewChunk( userID, chks[0].FingerprintModel(), chks[0].Metric, facade, - interval.Start, - interval.End, + start, + end, ) err = newChunk.Encode() @@ -350,7 +358,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, interva return false, err } - entries, err := c.seriesStoreSchema.GetChunkWriteEntries(interval.Start, interval.End, userID, "logs", newChunk.Metric, c.scfg.ExternalKey(newChunk.ChunkRef)) + entries, err := c.seriesStoreSchema.GetChunkWriteEntries(newChunk.From, newChunk.Through, userID, "logs", newChunk.Metric, c.scfg.ExternalKey(newChunk.ChunkRef)) if err != nil { return false, err } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index d973fc0a5b6d..62ff331ee22b 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -313,9 +313,9 @@ func TestChunkRewriter(t *testing.T) { minListMarkDelay = 1 * time.Second now := model.Now() for _, tt := range []struct { - name string - chunk chunk.Chunk - rewriteIntervals []model.Interval + name string + chunk chunk.Chunk + rewriteIntervalFilters []IntervalFilter }{ { name: "no rewrites", @@ -328,56 +328,87 @@ func TestChunkRewriter(t *testing.T) { { name: "rewrite first half", chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-2*time.Hour), now), - rewriteIntervals: []model.Interval{ + rewriteIntervalFilters: []IntervalFilter{ { - Start: now.Add(-2 * time.Hour), - End: now.Add(-1 * time.Hour), + Interval: model.Interval{ + Start: now.Add(-2 * time.Hour), + End: now.Add(-1 * time.Hour), + }, }, }, }, { name: "rewrite second half", chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-2*time.Hour), now), - rewriteIntervals: []model.Interval{ + rewriteIntervalFilters: []IntervalFilter{ { - Start: now.Add(-time.Hour), - End: now, + Interval: model.Interval{ + Start: now.Add(-time.Hour), + End: now, + }, }, }, }, { name: "rewrite multiple intervals", chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-12*time.Hour), now), - rewriteIntervals: []model.Interval{ + rewriteIntervalFilters: []IntervalFilter{ { - Start: now.Add(-12 * time.Hour), - End: now.Add(-10 * time.Hour), + Interval: model.Interval{ + Start: now.Add(-12 * time.Hour), + End: now.Add(-10 * time.Hour), + }, }, { - Start: now.Add(-9 * time.Hour), - End: now.Add(-5 * time.Hour), + Interval: model.Interval{ + Start: now.Add(-9 * time.Hour), + End: now.Add(-5 * time.Hour), + }, }, { - Start: now.Add(-2 * time.Hour), - End: now, + Interval: model.Interval{ + Start: now.Add(-2 * time.Hour), + End: now, + }, }, }, }, { name: "rewrite chunk spanning multiple days with multiple intervals", chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-72*time.Hour), now), - rewriteIntervals: []model.Interval{ + rewriteIntervalFilters: []IntervalFilter{ { - Start: now.Add(-71 * time.Hour), - End: now.Add(-47 * time.Hour), + Interval: model.Interval{ + Start: now.Add(-71 * time.Hour), + End: now.Add(-47 * time.Hour), + }, + }, + { + Interval: model.Interval{ + Start: now.Add(-40 * time.Hour), + End: now.Add(-30 * time.Hour), + }, }, { - Start: now.Add(-40 * time.Hour), - End: now.Add(-30 * time.Hour), + Interval: model.Interval{ + Start: now.Add(-2 * time.Hour), + End: now, + }, }, + }, + }, + { + name: "remove no lines using a filter function", + chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-2*time.Hour), now), + rewriteIntervalFilters: []IntervalFilter{ { - Start: now.Add(-2 * time.Hour), - End: now, + Interval: model.Interval{ + Start: now.Add(-1 * time.Hour), + End: now, + }, + Filter: func(s string) bool { + return false + }, }, }, }, @@ -401,9 +432,9 @@ func TestChunkRewriter(t *testing.T) { cr, err := newChunkRewriter(chunkClient, store.schemaCfg.Configs[0], indexTable.name, bucket) require.NoError(t, err) - wroteChunks, err := cr.rewriteChunk(context.Background(), entryFromChunk(store.schemaCfg, tt.chunk), tt.rewriteIntervals) + wroteChunks, err := cr.rewriteChunk(context.Background(), entryFromChunk(store.schemaCfg, tt.chunk), tt.rewriteIntervalFilters) require.NoError(t, err) - if len(tt.rewriteIntervals) == 0 { + if len(tt.rewriteIntervalFilters) == 0 { require.False(t, wroteChunks) } return nil @@ -416,9 +447,9 @@ func TestChunkRewriter(t *testing.T) { chunks := store.GetChunks(tt.chunk.UserID, tt.chunk.From, tt.chunk.Through, tt.chunk.Metric) // number of chunks should be the new re-written chunks + the source chunk - require.Len(t, chunks, len(tt.rewriteIntervals)+1) - for _, interval := range tt.rewriteIntervals { - expectedChk := createChunk(t, tt.chunk.UserID, labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, interval.Start, interval.End) + require.Len(t, chunks, len(tt.rewriteIntervalFilters)+1) + for _, ivf := range tt.rewriteIntervalFilters { + expectedChk := createChunk(t, tt.chunk.UserID, labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, ivf.Interval.Start, ivf.Interval.End) for i, chk := range chunks { if store.schemaCfg.ExternalKey(chk.ChunkRef) == store.schemaCfg.ExternalKey(expectedChk.ChunkRef) { chunks = append(chunks[:i], chunks[i+1:]...) @@ -450,8 +481,8 @@ func (s *seriesCleanedRecorder) Cleanup(userID []byte, lbls labels.Labels) error } type chunkExpiry struct { - isExpired bool - nonDeletedIntervals []model.Interval + isExpired bool + nonDeletedIntervalFilters []IntervalFilter } type mockExpirationChecker struct { @@ -463,9 +494,9 @@ func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry) mockExpiratio return mockExpirationChecker{chunksExpiry: chunksExpiry} } -func (m mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) { +func (m mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) { ce := m.chunksExpiry[string(ref.ChunkID)] - return ce.isExpired, ce.nonDeletedIntervals + return ce.isExpired, ce.nonDeletedIntervalFilters } func (m mockExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool { @@ -534,9 +565,11 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expiry: []chunkExpiry{ { isExpired: true, - nonDeletedIntervals: []model.Interval{{ - Start: todaysTableInterval.Start, - End: todaysTableInterval.Start.Add(15 * time.Minute), + nonDeletedIntervalFilters: []IntervalFilter{{ + Interval: model.Interval{ + Start: todaysTableInterval.Start, + End: todaysTableInterval.Start.Add(15 * time.Minute), + }, }}, }, }, @@ -586,9 +619,11 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { }, { isExpired: true, - nonDeletedIntervals: []model.Interval{{ - Start: todaysTableInterval.Start, - End: todaysTableInterval.Start.Add(15 * time.Minute), + nonDeletedIntervalFilters: []IntervalFilter{{ + Interval: model.Interval{ + Start: todaysTableInterval.Start, + End: todaysTableInterval.Start.Add(15 * time.Minute), + }, }}, }, }, @@ -610,9 +645,11 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expiry: []chunkExpiry{ { isExpired: true, - nonDeletedIntervals: []model.Interval{{ - Start: todaysTableInterval.Start, - End: now, + nonDeletedIntervalFilters: []IntervalFilter{{ + Interval: model.Interval{ + Start: todaysTableInterval.Start, + End: now, + }, }}, }, }, @@ -634,9 +671,11 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expiry: []chunkExpiry{ { isExpired: true, - nonDeletedIntervals: []model.Interval{{ - Start: todaysTableInterval.Start.Add(-30 * time.Minute), - End: now, + nonDeletedIntervalFilters: []IntervalFilter{{ + Interval: model.Interval{ + Start: todaysTableInterval.Start.Add(-30 * time.Minute), + End: now, + }, }}, }, }, diff --git a/pkg/util/filter/filter_function.go b/pkg/util/filter/filter_function.go new file mode 100644 index 000000000000..58ba74fe8b7e --- /dev/null +++ b/pkg/util/filter/filter_function.go @@ -0,0 +1,3 @@ +package filter + +type Func func(string) bool