Skip to content

Commit

Permalink
Add filter parameter to rebound so lines can be deleted by the compac…
Browse files Browse the repository at this point in the history
…tor (#5879)

* Add filter parameter to rebound

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix linting issues

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add filter function to delete request

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix linting issues

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Enable api for filter and delete mode

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add settings for retention

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Use labels to check and add test

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Simplify filter function

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Also enable filter mode

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove test settings in config file for docker

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add extra (unused) param for ProcessString in filter

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Empty commit to trigger CI again

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update changelog

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix flapping test

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove commented out unit tests and add some more

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add extra test case for delete request without line filter

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Use chunk bounds

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* check if the log selector has a filter if the whole chunk is selected

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* fix lint issue: use correct go-kit import

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
  • Loading branch information
MichelHollands authored Apr 29, 2022
1 parent 8c03d23 commit 477a0e7
Show file tree
Hide file tree
Showing 21 changed files with 508 additions and 155 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main
* [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters.
* [5879](https://github.com/grafana/loki/pull/5879) **MichelHollands**: Remove lines matching delete request expression when using "filter-and-delete" deletion mode.
* [5899](https://github.com/grafana/loki/pull/5899) **simonswine**: Update go image to 1.17.9.
* [5888](https://github.com/grafana/loki/pull/5888) **Papawy** Fix common config net interface name overwritten by ring common config
* [5799](https://github.com/grafana/loki/pull/5799) **cyriltovena** Fix deduping issues when multiple entries with the same timestamp exist.
Expand Down
3 changes: 2 additions & 1 deletion pkg/chunkenc/dumb_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/filter"
)

const (
Expand Down Expand Up @@ -122,7 +123,7 @@ func (c *dumbChunk) Close() error {
return nil
}

func (c *dumbChunk) Rebound(start, end time.Time) (Chunk, error) {
func (c *dumbChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) {
return nil, nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/chunkenc/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util/filter"
)

// GzipLogChunk is a cortex encoding type for our chunks.
Expand Down Expand Up @@ -86,8 +87,8 @@ func (f Facade) LokiChunk() Chunk {
return f.c
}

func (f Facade) Rebound(start, end model.Time) (chunk.Data, error) {
newChunk, err := f.c.Rebound(start.Time(), end.Time())
func (f Facade) Rebound(start, end model.Time, filter filter.Func) (chunk.Data, error) {
newChunk, err := f.c.Rebound(start.Time(), end.Time(), filter)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/filter"
)

// Errors returned by the chunk interface.
Expand Down Expand Up @@ -127,7 +128,7 @@ type Chunk interface {
CompressedSize() int
Close() error
Encoding() Encoding
Rebound(start, end time.Time) (Chunk, error)
Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
}

// Block is a chunk block.
Expand Down
8 changes: 6 additions & 2 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util/filter"
util_log "github.com/grafana/loki/pkg/util/log"
)

Expand Down Expand Up @@ -713,7 +714,7 @@ func (c *MemChunk) reorder() error {

// Otherwise, we need to rebuild the blocks
from, to := c.Bounds()
newC, err := c.Rebound(from, to)
newC, err := c.Rebound(from, to, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -910,7 +911,7 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block {
}

// Rebound builds a smaller chunk with logs having timestamp from start and end(both inclusive)
func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) {
func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) {
// add a millisecond to end time because the Chunk.Iterator considers end time to be non-inclusive.
itr, err := c.Iterator(context.Background(), start, end.Add(time.Millisecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
if err != nil {
Expand All @@ -931,6 +932,9 @@ func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) {

for itr.Next() {
entry := itr.Entry()
if filter != nil && filter(entry.Line) {
continue
}
if err := newChunk.Append(&entry); err != nil {
return nil, err
}
Expand Down
96 changes: 95 additions & 1 deletion pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,7 @@ func TestMemChunk_Rebound(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo)
newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo, nil)
if tc.err != nil {
require.Equal(t, tc.err, err)
return
Expand Down Expand Up @@ -1231,3 +1231,97 @@ func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk {

return chk
}

func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) {
chkFrom := time.Unix(1, 0) // headBlock.Append treats Unix time 0 as not set so we have to use a later time
chkFromPlus5 := chkFrom.Add(5 * time.Second)
chkThrough := chkFrom.Add(10 * time.Second)
chkThroughPlus1 := chkThrough.Add(1 * time.Second)

filterFunc := func(in string) bool {
return strings.HasPrefix(in, "matching")
}

for _, tc := range []struct {
name string
matchingSliceFrom, matchingSliceTo *time.Time
err error
nrMatching int
nrNotMatching int
}{
{
name: "no matches",
nrMatching: 0,
nrNotMatching: 10,
},
{
name: "some lines removed",
matchingSliceFrom: &chkFrom,
matchingSliceTo: &chkFromPlus5,
nrMatching: 5,
nrNotMatching: 5,
},
{
name: "all lines match",
err: chunk.ErrSliceNoDataInRange,
matchingSliceFrom: &chkFrom,
matchingSliceTo: &chkThroughPlus1,
},
} {
t.Run(tc.name, func(t *testing.T) {
originalChunk := buildFilterableTestMemChunk(t, chkFrom, chkThrough, tc.matchingSliceFrom, tc.matchingSliceTo)
newChunk, err := originalChunk.Rebound(chkFrom, chkThrough, filterFunc)
if tc.err != nil {
require.Equal(t, tc.err, err)
return
}
require.NoError(t, err)

// iterate originalChunk from slice start to slice end + nanosecond. Adding a nanosecond here to be inclusive of sample at end time.
originalChunkItr, err := originalChunk.Iterator(context.Background(), chkFrom, chkThrough.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
require.NoError(t, err)
originalChunkSamples := 0
for originalChunkItr.Next() {
originalChunkSamples++
}
require.Equal(t, tc.nrMatching+tc.nrNotMatching, originalChunkSamples)

// iterate newChunk for whole chunk interval which should include all the samples in the chunk and hence align it with expected values.
newChunkItr, err := newChunk.Iterator(context.Background(), chkFrom, chkThrough.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
require.NoError(t, err)
newChunkSamples := 0
for newChunkItr.Next() {
newChunkSamples++
}
require.Equal(t, tc.nrNotMatching, newChunkSamples)
})
}
}

func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time) *MemChunk {
chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, defaultBlockSize, 0)
t.Logf("from : %v", from.String())
t.Logf("through: %v", through.String())
for from.Before(through) {
// If a line is between matchingFrom and matchingTo add the prefix "matching"
if matchingFrom != nil && matchingTo != nil &&
(from.Equal(*matchingFrom) || (from.After(*matchingFrom) && (from.Before(*matchingTo)))) {
t.Logf("%v matching line", from.String())
err := chk.Append(&logproto.Entry{
Line: fmt.Sprintf("matching %v", from.String()),
Timestamp: from,
})
require.NoError(t, err)
} else {
t.Logf("%v non-match line", from.String())
err := chk.Append(&logproto.Entry{
Line: from.String(),
Timestamp: from,
})
require.NoError(t, err)
}
from = from.Add(time.Second)
}

return chk
}
15 changes: 9 additions & 6 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,12 +795,15 @@ func (t *Loki) initCompactor() (services.Service, error) {

t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)

if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() != deletion.Disabled {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))

t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)))
if t.Cfg.CompactorConfig.RetentionEnabled {
switch t.compactor.DeleteMode() {
case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
default:
break
}
}

return t.compactor, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/chunk/bigchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/grafana/loki/pkg/util/filter"
)

const samplesPerChunk = 120
Expand Down Expand Up @@ -85,7 +87,7 @@ func (b *bigchunk) addNextChunk(start model.Time) error {
return nil
}

func (b *bigchunk) Rebound(start, end model.Time) (Data, error) {
func (b *bigchunk) Rebound(start, end model.Time, filter filter.Func) (Data, error) {
return nil, errors.New("not implemented")
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/chunk/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
errs "github.com/weaveworks/common/errors"

"github.com/grafana/loki/pkg/util/filter"
)

const (
Expand All @@ -47,7 +49,7 @@ type Data interface {
// Rebound returns a smaller chunk that includes all samples between start and end (inclusive).
// We do not want to change existing Slice implementations because
// it is built specifically for query optimization and is a noop for some of the encodings.
Rebound(start, end model.Time) (Data, error)
Rebound(start, end model.Time, filter filter.Func) (Data, error)
// Size returns the approximate length of the chunk in bytes.
Size() int
Utilization() float64
Expand Down
14 changes: 10 additions & 4 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,23 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem
return err
}

if c.deleteMode != deletion.Disabled {
switch c.deleteMode {
case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")

c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
if err != nil {
return err
}
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(
c.deleteRequestsStore,
c.cfg.DeleteRequestCancelPeriod,
r,
c.deleteMode,
)
c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
} else {
default:
c.expirationChecker = newExpirationChecker(
retention.NewExpirationChecker(limits),
// This is a dummy deletion ExpirationChecker that never expires anything
Expand Down Expand Up @@ -567,7 +573,7 @@ func newExpirationChecker(retentionExpiryChecker, deletionExpiryChecker retentio
return &expirationChecker{retentionExpiryChecker, deletionExpiryChecker}
}

func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []model.Interval) {
func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []retention.IntervalFilter) {
if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(ref, now); expired {
return expired, nonDeletedIntervals
}
Expand Down
Loading

0 comments on commit 477a0e7

Please sign in to comment.