Skip to content

Commit

Permalink
Adds chunk filter hook for ingesters. (#3603)
Browse files Browse the repository at this point in the history
* Adds chunk filter hook for ingesters.

Follow-up on #3569; this is the missing piece.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* That interface is not required to change.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Apr 14, 2021
1 parent 7bac502 commit 72dcda1
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 26 deletions.
3 changes: 2 additions & 1 deletion pkg/ingester/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"bytes"
"context"
fmt "fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -206,7 +207,7 @@ func newStreamsIterator(ing ingesterInstances) *streamIterator {
inst.streamsMtx.RLock()
streams := make([]*stream, 0, len(inst.streams))
inst.streamsMtx.RUnlock()
_ = inst.forAllStreams(func(s *stream) error {
_ = inst.forAllStreams(context.Background(), func(s *stream) error {
streams = append(streams, s)
return nil
})
Expand Down
7 changes: 2 additions & 5 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ func TestUnflushedChunks(t *testing.T) {
}

func TestIngesterWALBackpressureSegments(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)
Expand Down Expand Up @@ -287,7 +286,6 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
}

func TestIngesterWALBackpressureCheckpoint(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)
Expand Down Expand Up @@ -353,7 +351,6 @@ func expectCheckpoint(t *testing.T, walDir string, shouldExist bool, max time.Du
return
}
}

}

// mkPush makes approximately totalSize bytes of log lines across min(500, totalSize) streams
Expand Down Expand Up @@ -456,7 +453,7 @@ func Test_SeriesIterator(t *testing.T) {
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

for i := 0; i < 3; i++ {
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil)
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
instances = append(instances, inst)
Expand Down Expand Up @@ -506,7 +503,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

for i := range instances {
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil)
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil, nil)

require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{
Expand Down
3 changes: 3 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/runtime"

"github.com/cortexproject/cortex/pkg/chunk"
Expand Down Expand Up @@ -340,6 +341,8 @@ func (s *testStore) GetSchemaConfigs() []chunk.PeriodConfig {

func (s *testStore) Stop() {}

// SetChunkFilterer is a no-op: the test store satisfies the store interface's
// new chunk-filter hook but ignores the filterer, since these tests do not
// exercise chunk filtering.
func (s *testStore) SetChunkFilterer(_ storage.RequestChunkFilterer) {}

func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream {
userIDs := []string{"1", "2", "3"}

Expand Down
10 changes: 8 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ type Ingester struct {
metrics *ingesterMetrics

wal WAL

chunkFilter storage.RequestChunkFilterer
}

// ChunkStore is the interface we need to store chunks.
Expand Down Expand Up @@ -220,6 +222,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
return i, nil
}

// SetChunkFilterer stores the request-scoped chunk filter hook on the
// ingester. The filterer is handed to each instance created afterwards (see
// getOrCreateInstance), where it is consulted per request to skip streams
// whose labels the filter rejects.
// NOTE(review): instances created before this call never receive the
// filterer — confirm callers always set it before any instance exists.
func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer) {
	i.chunkFilter = chunkFilter
}

func (i *Ingester) starting(ctx context.Context) error {
if i.cfg.WAL.Enabled {
// Ignore retain period during wal replay.
Expand Down Expand Up @@ -404,7 +410,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch)
inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter)
i.instances[instanceID] = inst
}
return inst
Expand Down Expand Up @@ -677,7 +683,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
return err
}

if err := instance.addNewTailer(tailer); err != nil {
if err := instance.addNewTailer(queryServer.Context(), tailer); err != nil {
return err
}
tailer.loop()
Expand Down
5 changes: 4 additions & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
Expand Down Expand Up @@ -284,6 +285,9 @@ func (s *mockStore) GetSchemaConfigs() []chunk.PeriodConfig {
return nil
}

// SetChunkFilterer is a no-op implementation of the store's chunk-filter
// hook; the mock store used by these ingester tests does not filter chunks.
func (s *mockStore) SetChunkFilterer(_ storage.RequestChunkFilterer) {
}

type mockQuerierServer struct {
ctx context.Context
resps []*logproto.QueryResponse
Expand Down Expand Up @@ -448,7 +452,6 @@ func TestIngester_boltdbShipperMaxLookBack(t *testing.T) {
}

func TestValidate(t *testing.T) {

for i, tc := range []struct {
in Config
err bool
Expand Down
46 changes: 37 additions & 9 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
Expand Down Expand Up @@ -89,9 +90,11 @@ type instance struct {
flushOnShutdownSwitch *OnceSwitch

metrics *ingesterMetrics

chunkFilter storage.RequestChunkFilterer
}

func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runtime.TenantConfigs, wal WAL, metrics *ingesterMetrics, flushOnShutdownSwitch *OnceSwitch) *instance {
func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runtime.TenantConfigs, wal WAL, metrics *ingesterMetrics, flushOnShutdownSwitch *OnceSwitch, chunkFilter storage.RequestChunkFilterer) *instance {
i := &instance{
cfg: cfg,
streams: map[string]*stream{},
Expand All @@ -110,6 +113,8 @@ func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runt
wal: wal,
metrics: metrics,
flushOnShutdownSwitch: flushOnShutdownSwitch,

chunkFilter: chunkFilter,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
Expand Down Expand Up @@ -295,7 +300,9 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter

ingStats := stats.GetIngesterData(ctx)
var iters []iter.EntryIterator

err = i.forMatchingStreams(
ctx,
expr.Matchers(),
func(stream *stream) error {
iter, err := stream.Iterator(ctx, ingStats, req.Start, req.End, req.Direction, pipeline.ForStream(stream.labels))
Expand Down Expand Up @@ -326,6 +333,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
ingStats := stats.GetIngesterData(ctx)
var iters []iter.SampleIterator
err = i.forMatchingStreams(
ctx,
expr.Selector().Matchers(),
func(stream *stream) error {
iter, err := stream.SampleIterator(ctx, ingStats, req.Start, req.End, extractor.ForStream(stream.labels))
Expand Down Expand Up @@ -363,7 +371,7 @@ func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logpro
}, nil
}

func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
groups, err := loghttp.Match(req.GetGroups())
if err != nil {
return nil, err
Expand All @@ -374,7 +382,7 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp
// If no matchers were supplied we include all streams.
if len(groups) == 0 {
series = make([]logproto.SeriesIdentifier, 0, len(i.streams))
err = i.forAllStreams(func(stream *stream) error {
err = i.forAllStreams(ctx, func(stream *stream) error {
// consider the stream only if it overlaps the request time range
if shouldConsiderStream(stream, req) {
series = append(series, logproto.SeriesIdentifier{
Expand All @@ -389,7 +397,7 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp
} else {
dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
for _, matchers := range groups {
err = i.forMatchingStreams(matchers, func(stream *stream) error {
err = i.forMatchingStreams(ctx, matchers, func(stream *stream) error {
// consider the stream only if it overlaps the request time range
if shouldConsiderStream(stream, req) {
// exit early when this stream was added by an earlier group
Expand Down Expand Up @@ -426,11 +434,18 @@ func (i *instance) numStreams() int {

// forAllStreams will execute a function for all streams in the instance.
// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
func (i *instance) forAllStreams(fn func(*stream) error) error {
func (i *instance) forAllStreams(ctx context.Context, fn func(*stream) error) error {
i.streamsMtx.RLock()
defer i.streamsMtx.RUnlock()
var chunkFilter storage.ChunkFilterer
if i.chunkFilter != nil {
chunkFilter = i.chunkFilter.ForRequest(ctx)
}

for _, stream := range i.streams {
if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) {
continue
}
err := fn(stream)
if err != nil {
return err
Expand All @@ -442,6 +457,7 @@ func (i *instance) forAllStreams(fn func(*stream) error) error {
// forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc).
// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
func (i *instance) forMatchingStreams(
ctx context.Context,
matchers []*labels.Matcher,
fn func(*stream) error,
) error {
Expand All @@ -450,7 +466,10 @@ func (i *instance) forMatchingStreams(

filters, matchers := cutil.SplitFiltersAndMatchers(matchers)
ids := i.index.Lookup(matchers)

var chunkFilter storage.ChunkFilterer
if i.chunkFilter != nil {
chunkFilter = i.chunkFilter.ForRequest(ctx)
}
outer:
for _, streamID := range ids {
stream, ok := i.streamsByFP[streamID]
Expand All @@ -462,7 +481,9 @@ outer:
continue outer
}
}

if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) {
continue
}
err := fn(stream)
if err != nil {
return err
Expand All @@ -471,8 +492,8 @@ outer:
return nil
}

func (i *instance) addNewTailer(t *tailer) error {
if err := i.forMatchingStreams(t.matchers, func(s *stream) error {
func (i *instance) addNewTailer(ctx context.Context, t *tailer) error {
if err := i.forMatchingStreams(ctx, t.matchers, func(s *stream) error {
s.addTailer(t)
return nil
}); err != nil {
Expand All @@ -494,8 +515,15 @@ func (i *instance) addTailersToNewStream(stream *stream) {
if t.isClosed() {
continue
}
var chunkFilter storage.ChunkFilterer
if i.chunkFilter != nil {
chunkFilter = i.chunkFilter.ForRequest(t.conn.Context())
}

if isMatching(stream.labels, t.matchers) {
if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) {
continue
}
stream.addTailer(t)
}
}
Expand Down
Loading

0 comments on commit 72dcda1

Please sign in to comment.