chore(blooms): Some bloom gateway cleanups (#13165)
* Cleanup logic for processing multiple days in a single `FilterChunkRefs`
  request in the bloom gateway. The handler already returned an error if the
  requested chunk refs spanned multiple days, but the logic for processing
  multiple days/tasks was still in place. This commit simplifies the logic to
  process only a single task per request (see the task-handling sketch after
  this message).

* Remove unused package global variable

* Remove bloomgateway package comment

* Remove duplicate imports

* Avoid conversion from *logproto.ShortRef to v1.ChunkRef
  and instead cast the value of the pointer (see the pointer-cast sketch
  after this message)

* Simplify `getFromThrough()` function

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
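
A minimal task-handling sketch of the simplified flow from the first bullet, using local stand-in types (the real handler works on *logproto.FilterChunkRefRequest, partitionRequest, and bloomgateway.Task; the names below are illustrative only):

```go
package main

import (
	"errors"
	"fmt"
)

// seriesForDay and task are stand-ins for the real types; the real handler
// works on the day partitions produced by partitionRequest and on
// bloomgateway.Task values.
type seriesForDay struct{ day string }

type task struct{ series seriesForDay }

// partitionByDay stands in for partitionRequest, which groups the requested
// series by day.
func partitionByDay(days []string) []seriesForDay {
	out := make([]seriesForDay, 0, len(days))
	for _, d := range days {
		out = append(out, seriesForDay{day: d})
	}
	return out
}

// handle mirrors the simplified handler: reject any request that does not
// resolve to exactly one day, then build and process a single task.
func handle(days []string) (task, error) {
	seriesByDay := partitionByDay(days)
	if len(seriesByDay) != 1 {
		return task{}, errors.New("request time range must span exactly one day")
	}
	return task{series: seriesByDay[0]}, nil
}

func main() {
	if _, err := handle([]string{"2024-06-10", "2024-06-11"}); err != nil {
		fmt.Println("rejected:", err)
	}
	t, _ := handle([]string{"2024-06-10"})
	fmt.Println("single task for day:", t.series.day)
}
```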
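
A pointer-cast sketch for the ShortRef/ChunkRef bullet, under the assumption that v1.ChunkRef is declared with the same underlying struct as logproto.ShortRef; the types below are local stand-ins, not the real definitions:

```go
package main

import "fmt"

// ShortRef and ChunkRef are local stand-ins. The sketch assumes v1.ChunkRef
// shares the underlying struct of logproto.ShortRef, which is what makes the
// value cast below legal.
type ShortRef struct {
	From, Through int64
	Checksum      uint32
}

type ChunkRef ShortRef

func main() {
	ref := &ShortRef{From: 1, Through: 2, Checksum: 42}

	// Field-by-field conversion (the pattern the commit removes).
	converted := ChunkRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}

	// Casting the value the pointer points at instead: no field-copying code,
	// and the compiler verifies that the underlying types are identical.
	cast := ChunkRef(*ref)

	fmt.Println(converted == cast) // true
}
```
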
chaudum authored Jun 10, 2024
1 parent 9767807 commit d4fcef5
Showing 3 changed files with 78 additions and 211 deletions.
160 changes: 37 additions & 123 deletions pkg/bloomgateway/bloomgateway.go
@@ -1,43 +1,7 @@
/*
Bloom Gateway package

The bloom gateway is a component that can be run as a standalone microservice
target and provides capabilities for filtering ChunkRefs based on a given list
of line filter expressions.

    Querier      Query Frontend
       |               |
  ................................... service boundary
       |               |
       +-------+-------+
               |
     indexgateway.Gateway
               |
  bloomgateway.BloomQuerier
               |
  bloomgateway.GatewayClient
               |
  logproto.BloomGatewayClient
               |
  ................................... service boundary
               |
      bloomgateway.Gateway
               |
       queue.RequestQueue
               |
      bloomgateway.Worker
               |
     bloomgateway.Processor
               |
      bloomshipper.Store
               |
      bloomshipper.Client
               |
          ObjectClient
               |
  ................................... service boundary
               |
         object storage
*/
package bloomgateway

@@ -63,13 +27,10 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
util_log "github.com/grafana/loki/v3/pkg/util/log"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")

const (
metricsSubsystem = "bloom_gateway"
querierMetricsSubsystem = "bloom_gateway_querier"
@@ -209,7 +170,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
stats, ctx := ContextWithEmptyStats(ctx)
logger := spanlogger.FromContextWithFallback(
ctx,
util_log.WithContext(ctx, g.logger),
utillog.WithContext(ctx, g.logger),
)

defer func() {
@@ -261,9 +222,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}, nil
}

// TODO(chaudum): I intentionally keep the logic for handling multiple tasks,
// so that the PR does not explode in size. This should be cleaned up at some point.

seriesByDay := partitionRequest(req)
stats.NumTasks = len(seriesByDay)

@@ -279,14 +237,13 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, errors.New("request time range must span exactly one day")
}

tasks := make([]Task, 0, len(seriesByDay))
responses := make([][]v1.Output, 0, len(seriesByDay))
for _, seriesForDay := range seriesByDay {
task := newTask(ctx, tenantID, seriesForDay, filters, blocks)
// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))
tasks = append(tasks, task)
}
series := seriesByDay[0]
task := newTask(ctx, tenantID, series, filters, blocks)

// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(series.series))
// free up the responses
defer responsesPool.Put(task.responses)

g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())

@@ -297,62 +254,41 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
preFilterChunks += len(series.Refs)
}

// Ideally we could use an unbuffered channel here, but since we return the
// request on the first error, there can be cases where the request context
// is not done yet and the consumeTask() function wants to send to the
// tasksCh, but nobody reads from it any more.
queueStart := time.Now()
tasksCh := make(chan Task, len(tasks))
for _, task := range tasks {
task := task
task.enqueueTime = time.Now()

// TODO(owen-d): gracefully handle full queues
if err := g.queue.Enqueue(tenantID, nil, task, func() {
// When enqueuing, we also add the task to the pending tasks
_ = g.pendingTasks.Inc()
}); err != nil {
stats.Status = labelFailure
return nil, errors.Wrap(err, "failed to enqueue task")
}
// TODO(owen-d): use `concurrency` lib, bound parallelism
go g.consumeTask(ctx, task, tasksCh)
}

sp.LogKV("msg", "enqueued tasks", "duration", time.Since(queueStart).String())
tasksCh := make(chan Task, 1)

remaining := len(tasks)
// TODO(owen-d): gracefully handle full queues
task.enqueueTime = time.Now()
if err := g.queue.Enqueue(tenantID, nil, task, func() {
// When enqueuing, we also add the task to the pending tasks
_ = g.pendingTasks.Inc()
}); err != nil {
stats.Status = labelFailure
return nil, errors.Wrap(err, "failed to enqueue task")
}
// TODO(owen-d): use `concurrency` lib, bound parallelism
go g.consumeTask(ctx, task, tasksCh)

combinedRecorder := v1.NewBloomRecorder(ctx, "combined")
for remaining > 0 {
select {
case <-ctx.Done():
stats.Status = "cancel"
return nil, errors.Wrap(ctx.Err(), "request failed")
case task := <-tasksCh:
if task.Err() != nil {
stats.Status = labelFailure
return nil, errors.Wrap(task.Err(), "request failed")
}
responses = append(responses, task.responses)
combinedRecorder.Merge(task.recorder)
remaining--

select {
case <-ctx.Done():
stats.Status = "cancel"
return nil, errors.Wrap(ctx.Err(), "request failed")
case task = <-tasksCh:
if task.Err() != nil {
stats.Status = labelFailure
return nil, errors.Wrap(task.Err(), "request failed")
}
combinedRecorder.Merge(task.recorder)
}

combinedRecorder.Report(util_log.WithContext(ctx, g.logger), g.bloomStore.BloomMetrics())
sp.LogKV("msg", "received all responses")
combinedRecorder.Report(utillog.WithContext(ctx, g.logger), g.bloomStore.BloomMetrics())

start := time.Now()
filtered := filterChunkRefs(req, responses)
filtered := filterChunkRefs(req, task.responses)
duration := time.Since(start)
stats.AddPostProcessingTime(duration)

// free up the responses
for _, resp := range responses {
responsesPool.Put(resp)
}

var postFilterSeries, postFilterChunks int
postFilterSeries = len(filtered)
for _, group := range filtered {
@@ -404,35 +340,13 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas
}
}

// merges a list of responses via a heap. The same fingerprints and chunks can be present in multiple responses.
// Individual responses do not need to be ordered beforehand.
func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] {
if len(responses) == 0 {
return v1.NewEmptyIter[v1.Output]()
}
if len(responses) == 1 {
sort.Slice(responses[0], func(i, j int) bool { return responses[0][i].Fp < responses[0][j].Fp })
return v1.NewSliceIter(responses[0])
}

itrs := make([]v1.PeekingIterator[v1.Output], 0, len(responses))
for _, r := range responses {
sort.Slice(r, func(i, j int) bool { return r[i].Fp < r[j].Fp })
itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r)))
}
return v1.NewHeapIterator[v1.Output](
func(o1, o2 v1.Output) bool { return o1.Fp < o2.Fp },
itrs...,
)
}

// TODO(owen-d): improve perf. This can be faster with a more specialized impl
// NB(owen-d): `req` is mutated in place for performance, but `responses` is not
// Removals of the outputs must be sorted.
func filterChunkRefs(
req *logproto.FilterChunkRefRequest,
responses [][]v1.Output,
) []*logproto.GroupedChunkRefs {
func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses []v1.Output) []*logproto.GroupedChunkRefs {
// sort responses by fingerprint
sort.Slice(responses, func(i, j int) bool { return responses[i].Fp < responses[j].Fp })

res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))

// dedupe outputs, merging the same series.
@@ -481,7 +395,7 @@ func filterChunkRefs(
res.Removals = chks
return res
},
v1.NewPeekingIter(orderedResponsesByFP(responses)),
v1.NewPeekingIter(v1.NewSliceIter(responses)),
)

// Iterate through the requested and filtered series/chunks,
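
Because at most one task's responses remain per request, the heap merge that orderedResponsesByFP performed across multiple response slices collapses to a single in-place sort by fingerprint ahead of the dedupe step. A rough, self-contained sketch of that design choice, using a stand-in for v1.Output:

```go
package main

import (
	"fmt"
	"sort"
)

// output is a stand-in for v1.Output, which pairs a series fingerprint with
// the chunk removals computed for that series.
type output struct {
	Fp       uint64
	Removals []int
}

// orderResponses mirrors the simplified path: with a single response slice
// there is nothing left to heap-merge, so an in-place sort by fingerprint is
// enough for the downstream dedupe iterator to consume the outputs in order.
func orderResponses(responses []output) []output {
	sort.Slice(responses, func(i, j int) bool { return responses[i].Fp < responses[j].Fp })
	return responses
}

func main() {
	resp := []output{{Fp: 3}, {Fp: 1}, {Fp: 2}}
	fmt.Println(orderResponses(resp)) // outputs ordered by fingerprint: 1, 2, 3
}
```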
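
The buffered tasksCh (capacity 1, matching the single task) reflects the reasoning in the handler's comment about unbuffered channels: even if the handler returns early on a cancelled context, the worker goroutine must still be able to send its result without blocking forever. A self-contained sketch of that pattern, with hypothetical names:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type result struct{ err error }

// consume stands in for consumeTask: it always sends its result, which only
// avoids leaking a goroutine because the channel is buffered.
func consume(ctx context.Context, out chan<- result) {
	time.Sleep(50 * time.Millisecond) // pretend to do bloom-filtering work
	out <- result{err: ctx.Err()}
}

func handle(ctx context.Context) error {
	// Capacity 1 matches the single task per request: even if we return early
	// below, the worker can still complete its send and exit.
	resultCh := make(chan result, 1)
	go consume(ctx, resultCh)

	select {
	case <-ctx.Done():
		return errors.New("request cancelled")
	case res := <-resultCh:
		return res.err
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(handle(ctx)) // times out before the worker finishes
}
```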