Skip to content

Commit

Permalink
refactor(blooms): Add RPC service for bloom-planner (#13015)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored May 27, 2024
1 parent 4901a5c commit f6529c2
Show file tree
Hide file tree
Showing 9 changed files with 2,992 additions and 68 deletions.
7 changes: 7 additions & 0 deletions pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Metrics struct {
connectedBuilders prometheus.GaugeFunc
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
taskLost prometheus.Counter

buildStarted prometheus.Counter
buildCompleted *prometheus.CounterVec
Expand Down Expand Up @@ -65,6 +66,12 @@ func NewMetrics(
MaxAge: time.Minute,
AgeBuckets: 6,
}),
taskLost: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tasks_lost_total",
Help: "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
}),

buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Expand Down
194 changes: 164 additions & 30 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"context"
"fmt"
"sort"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/queue"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
Expand All @@ -22,6 +25,8 @@ import (
utillog "github.com/grafana/loki/v3/pkg/util/log"
)

var errPlannerIsNotRunning = errors.New("planner is not running")

type Planner struct {
services.Service
// Subservices manager.
Expand All @@ -38,6 +43,8 @@ type Planner struct {
tasksQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService

pendingTasks sync.Map

metrics *Metrics
logger log.Logger
}
Expand Down Expand Up @@ -92,13 +99,23 @@ func New(
return p, nil
}

func (p *Planner) starting(_ context.Context) (err error) {
func (p *Planner) starting(ctx context.Context) (err error) {
if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
return fmt.Errorf("error starting planner subservices: %w", err)
}

p.metrics.running.Set(1)
return err
return nil
}

func (p *Planner) stopping(_ error) error {
p.metrics.running.Set(0)
defer p.metrics.running.Set(0)

// This will also stop the requests queue, which stops accepting new requests and errors out any pending requests.
if err := services.StopManagerAndAwaitStopped(context.Background(), p.subservices); err != nil {
return fmt.Errorf("error stopping planner subservices: %w", err)
}

return nil
}

Expand All @@ -108,20 +125,32 @@ func (p *Planner) running(ctx context.Context) error {
level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
}

ticker := time.NewTicker(p.cfg.PlanningInterval)
defer ticker.Stop()
planningTicker := time.NewTicker(p.cfg.PlanningInterval)
defer planningTicker.Stop()

inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
defer inflightTasksTicker.Stop()

for {
select {
case <-ctx.Done():
err := ctx.Err()
level.Debug(p.logger).Log("msg", "planner context done", "err", err)
return err
if err := ctx.Err(); !errors.Is(err, context.Canceled) {
level.Error(p.logger).Log("msg", "planner context done with error", "err", err)
return err
}

case <-ticker.C:
level.Debug(p.logger).Log("msg", "planner context done")
return nil

case <-planningTicker.C:
level.Info(p.logger).Log("msg", "starting bloom build iteration")
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err)
}

case <-inflightTasksTicker.C:
inflight := p.totalPendingTasks()
p.metrics.inflightRequests.Observe(float64(inflight))
}
}
}
Expand Down Expand Up @@ -159,19 +188,13 @@ func (p *Planner) runOne(ctx context.Context) error {
now := time.Now()
for _, gap := range gaps {
totalTasks++
task := Task{
table: w.table.Addr(),
tenant: w.tenant,
OwnershipBounds: w.ownershipRange,
tsdb: gap.tsdb,
gaps: gap.gaps,

queueTime: now,
ctx: ctx,
}

p.activeUsers.UpdateUserTimestamp(task.tenant, now)
if err := p.tasksQueue.Enqueue(task.tenant, nil, task, nil); err != nil {
task := NewTask(
ctx, now,
protos.NewTask(w.table.Addr(), w.tenant, w.ownershipRange, gap.tsdb, gap.gaps),
)

if err := p.enqueueTask(task); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
continue
}
Expand Down Expand Up @@ -326,7 +349,7 @@ func (p *Planner) findGapsForBounds(
// This is a performance optimization to avoid expensive re-indexing
type blockPlan struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []GapWithBlocks
gaps []protos.GapWithBlocks
}

func (p *Planner) findOutdatedGaps(
Expand Down Expand Up @@ -420,12 +443,12 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdb,
gaps: make([]GapWithBlocks, 0, len(idx.gaps)),
gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)),
}

for _, gap := range idx.gaps {
planGap := GapWithBlocks{
bounds: gap,
planGap := protos.GapWithBlocks{
Bounds: gap,
}

for _, meta := range metas {
Expand All @@ -442,18 +465,18 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
}
// this block overlaps the gap, add it to the plan
// for this gap
planGap.blocks = append(planGap.blocks, block)
planGap.Blocks = append(planGap.Blocks, block)
}
}

// ensure we sort blocks so deduping iterator works as expected
sort.Slice(planGap.blocks, func(i, j int) bool {
return planGap.blocks[i].Bounds.Less(planGap.blocks[j].Bounds)
sort.Slice(planGap.Blocks, func(i, j int) bool {
return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
})

peekingBlocks := v1.NewPeekingIter[bloomshipper.BlockRef](
v1.NewSliceIter[bloomshipper.BlockRef](
planGap.blocks,
planGap.Blocks,
),
)
// dedupe blocks which could be in multiple metas
Expand All @@ -472,7 +495,7 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
if err != nil {
return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
}
planGap.blocks = deduped
planGap.Blocks = deduped

plan.gaps = append(plan.gaps, planGap)
}
Expand All @@ -482,3 +505,114 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan

return plans, nil
}

// addPendingTask records the task in the planner's pendingTasks map,
// keyed by the task's ID. Storing a task whose ID is already present
// replaces the previous entry.
func (p *Planner) addPendingTask(task *Task) {
	key := task.ID
	p.pendingTasks.Store(key, task)
}

// removePendingTask drops the entry for the task's ID from the planner's
// pendingTasks map. It is a no-op when no such entry exists.
func (p *Planner) removePendingTask(task *Task) {
	key := task.ID
	p.pendingTasks.Delete(key)
}

// totalPendingTasks returns the number of entries currently held in the
// pendingTasks map. The count is obtained by walking every entry, since
// sync.Map does not expose a length.
func (p *Planner) totalPendingTasks() (total int) {
	// Always return true so the walk visits every entry.
	countEntry := func(_, _ any) bool {
		total++
		return true
	}
	p.pendingTasks.Range(countEntry)
	return total
}

// enqueueTask refreshes the activity timestamp of the task's tenant and
// pushes the task onto the tasks queue under that tenant. It returns
// whatever error the queue reports.
func (p *Planner) enqueueTask(task *Task) error {
	p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())

	// Handed to the queue as a callback; presumably invoked once the task has
	// been accepted, so that only queued tasks are tracked as pending —
	// confirm against queue.RequestQueue.Enqueue.
	trackPending := func() {
		p.addPendingTask(task)
	}
	return p.tasksQueue.Enqueue(task.Tenant, nil, task, trackPending)
}

// NotifyBuilderShutdown handles a builder's notification that it is shutting
// down by unregistering its consumer connection from the tasks queue.
//
// The builder ID is read once through the generated getter, so the log line
// and the unregister call always refer to the same value and a nil request
// does not cause a nil-pointer dereference.
//
// It always returns an empty response and a nil error.
func (p *Planner) NotifyBuilderShutdown(
	_ context.Context,
	req *protos.NotifyBuilderShutdownRequest,
) (*protos.NotifyBuilderShutdownResponse, error) {
	builderID := req.GetBuilderID()

	level.Debug(p.logger).Log("msg", "builder shutdown", "builder", builderID)
	p.tasksQueue.UnregisterConsumerConnection(builderID)

	return &protos.NotifyBuilderShutdownResponse{}, nil
}

// BuilderLoop serves the stream of a single builder.
//
// It first waits for an initial message from the builder to learn its ID,
// registers the builder as a consumer of the tasks queue (unregistering it
// again on return), and then, for as long as the planner is running or
// stopping, dequeues tasks and forwards them to the builder one at a time.
//
// Tasks whose context is already done when dequeued are dropped from the
// pending set and skipped. If forwarding a task fails, the task is
// re-enqueued; when re-enqueuing also fails the taskLost metric is
// incremented. In both cases the loop ends and the forwarding error is
// returned, with a message stating whether the task was requeued or lost.
//
// It returns errPlannerIsNotRunning when the loop exits because the planner
// is neither running nor stopping, and a wrapped error on any receive,
// dequeue or forwarding failure.
func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer) error {
	resp, err := builder.Recv()
	if err != nil {
		return fmt.Errorf("error receiving message from builder: %w", err)
	}

	builderID := resp.GetBuilderID()
	logger := log.With(p.logger, "builder", builderID)
	level.Debug(logger).Log("msg", "builder connected")

	p.tasksQueue.RegisterConsumerConnection(builderID)
	defer p.tasksQueue.UnregisterConsumerConnection(builderID)

	lastIndex := queue.StartIndex
	for p.isRunningOrStopping() {
		item, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID)
		if err != nil {
			return fmt.Errorf("error dequeuing task: %w", err)
		}
		lastIndex = idx

		if item == nil {
			return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
		}

		// Use a checked assertion: an unexpected item type must fail this
		// stream with an error instead of panicking the handler.
		task, ok := item.(*Task)
		if !ok {
			return fmt.Errorf("dequeue() call resulted in unexpected item type %T. builder: %s", item, builderID)
		}

		queueTime := time.Since(task.queueTime)
		p.metrics.queueDuration.Observe(queueTime.Seconds())

		if task.ctx.Err() != nil {
			level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err())
			// NOTE(review): presumably makes the next Dequeue retry from the
			// same queue position since this task was not handed to the
			// builder — confirm against the queue package.
			lastIndex = lastIndex.ReuseLastIndex()
			p.removePendingTask(task)
			continue
		}

		if err := p.forwardTaskToBuilder(builder, builderID, task); err != nil {
			// Re-queue the task if the builder is failing to process the tasks
			if requeueErr := p.enqueueTask(task); requeueErr != nil {
				p.metrics.taskLost.Inc()
				level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", requeueErr)

				// Do not claim the task was requeued when it was not.
				return fmt.Errorf("error forwarding task to builder (%s). Task lost: %w", builderID, err)
			}

			return fmt.Errorf("error forwarding task to builder (%s). Task requeued: %w", builderID, err)
		}
	}

	return errPlannerIsNotRunning
}

// forwardTaskToBuilder sends the task, converted to its proto form, to the
// builder over the given stream and then blocks until the builder sends its
// next message. The content of that message is discarded; only the receive
// error, if any, is returned (unwrapped). A send failure is returned wrapped
// with the builder ID.
//
// The task is removed from the pending set when this function returns,
// whether or not it succeeded.
func (p *Planner) forwardTaskToBuilder(
	builder protos.PlannerForBuilder_BuilderLoopServer,
	builderID string,
	task *Task,
) error {
	defer p.removePendingTask(task)

	sendErr := builder.Send(&protos.PlannerToBuilder{
		Task: task.ToProtoTask(),
	})
	if sendErr != nil {
		return fmt.Errorf("error sending task to builder (%s): %w", builderID, sendErr)
	}

	// TODO(salvacorts): Implement timeout and retry for builder response.
	if _, recvErr := builder.Recv(); recvErr != nil {
		return recvErr
	}
	return nil
}

// isRunningOrStopping reports whether the planner service is currently in
// the Running or the Stopping state.
func (p *Planner) isRunningOrStopping() bool {
	switch p.State() {
	case services.Running, services.Stopping:
		return true
	default:
		return false
	}
}
Loading

0 comments on commit f6529c2

Please sign in to comment.