From 8f1d12f656924eaf9bd887037c006728b22375cf Mon Sep 17 00:00:00 2001
From: George Robinson
Date: Thu, 11 Jul 2024 16:43:52 +0100
Subject: [PATCH] feat: Use WAL Manager (#13491)

---
 pkg/ingester-rf1/flush.go    | 39 ++++++++-------
 pkg/ingester-rf1/ingester.go | 95 ++++++++++--------------------
 pkg/ingester-rf1/instance.go | 32 +++++++++---
 pkg/ingester-rf1/stream.go   | 29 ++++++++---
 4 files changed, 93 insertions(+), 102 deletions(-)

diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go
index 37b24f6f1abf..eda141afd762 100644
--- a/pkg/ingester-rf1/flush.go
+++ b/pkg/ingester-rf1/flush.go
@@ -4,15 +4,16 @@ import (
 	"crypto/rand"
 	"fmt"
 	"net/http"
+	"strconv"
 	"time"
 
+	"github.com/dustin/go-humanize"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/runutil"
 	"github.com/oklog/ulid"
-	"github.com/prometheus/common/model"
 	"golang.org/x/net/context"
 
 	"github.com/grafana/loki/v3/pkg/storage/wal"
@@ -77,18 +78,16 @@ func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
 }
 
 type flushOp struct {
-	from      model.Time
-	userID    string
-	fp        model.Fingerprint
-	immediate bool
+	it  *wal.PendingItem
+	num int64
 }
 
 func (o *flushOp) Key() string {
-	return fmt.Sprintf("%s-%s-%v", o.userID, o.fp, o.immediate)
+	return strconv.Itoa(int(o.num))
 }
 
 func (o *flushOp) Priority() int64 {
-	return -int64(o.from)
+	return -o.num
 }
 
 func (i *Ingester) flushLoop(j int) {
@@ -103,29 +102,35 @@ func (i *Ingester) flushLoop(j int) {
 		if o == nil {
 			return
 		}
-		op := o.(*flushCtx)
+		op := o.(*flushOp)
+
+		start := time.Now()
+
+		// We'll use this to log the size of the segment that was flushed.
+		n := humanize.Bytes(uint64(op.it.Writer.InputSize()))
 
 		err := i.flushOp(l, op)
+		d := time.Since(start)
 		if err != nil {
-			level.Error(l).Log("msg", "failed to flush", "err", err)
-			// Immediately re-queue another attempt at flushing this segment.
-			// TODO: Add some backoff or something?
-			i.flushQueues[j].Enqueue(op)
+			level.Error(l).Log("msg", "failed to flush", "size", n, "duration", d, "err", err)
 		} else {
-			// Close the channel and trigger all waiting listeners to return
-			// TODO: Figure out how to return an error if we want to?
-			close(op.flushDone)
+			level.Debug(l).Log("msg", "flushed", "size", n, "duration", d)
+		}
+
+		op.it.Result.SetDone(err)
+		if err = i.wal.Put(op.it); err != nil {
+			level.Error(l).Log("msg", "failed to put back in WAL Manager", "err", err)
 		}
 	}
 }
 
-func (i *Ingester) flushOp(l log.Logger, flushCtx *flushCtx) error {
+func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	defer cancelFunc()
 
 	b := backoff.New(ctx, i.cfg.FlushOpBackoff)
 	for b.Ongoing() {
-		err := i.flushSegment(ctx, flushCtx.segmentWriter)
+		err := i.flushSegment(ctx, op.it.Writer)
 		if err == nil {
 			break
 		}
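
The flush path above replaces the shared flushCtx with one flushOp per pending WAL segment: each op carries the wal.PendingItem and a monotonically increasing sequence number, and since the priority queue pops the highest Priority() first, negating that number flushes older segments before newer ones. Note too that a failed flush is no longer re-enqueued; the error is recorded with Result.SetDone(err) and the item is handed back to the manager with Put. A minimal, runnable sketch of the ordering contract (the queue itself is assumed, not shown):

    package main

    import (
        "fmt"
        "strconv"
    )

    type flushOp struct {
        num int64
    }

    func (o *flushOp) Key() string     { return strconv.Itoa(int(o.num)) }
    func (o *flushOp) Priority() int64 { return -o.num }

    func main() {
        older, newer := &flushOp{num: 1}, &flushOp{num: 2}
        // The queue pops the highest Priority() first, so the older op
        // (smaller num, larger negated value) flushes before the newer one.
        fmt.Println(older.Key(), older.Priority() > newer.Priority()) // 1 true
    }
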
diff --git a/pkg/ingester-rf1/ingester.go b/pkg/ingester-rf1/ingester.go
index d87159952e04..8d1b63f74e89 100644
--- a/pkg/ingester-rf1/ingester.go
+++ b/pkg/ingester-rf1/ingester.go
@@ -174,22 +174,6 @@ type Interface interface {
 	PrepareShutdown(w http.ResponseWriter, r *http.Request)
 }
 
-type flushCtx struct {
-	lock            *sync.RWMutex
-	flushDone       chan struct{}
-	newCtxAvailable chan struct{}
-	segmentWriter   *wal.SegmentWriter
-	creationTime    time.Time
-}
-
-func (o *flushCtx) Key() string {
-	return fmt.Sprintf("%d", o.creationTime.UnixNano())
-}
-
-func (o *flushCtx) Priority() int64 {
-	return -o.creationTime.UnixNano()
-}
-
 // Ingester builds chunks for incoming log streams.
 type Ingester struct {
 	services.Service
@@ -217,10 +201,11 @@ type Ingester struct {
 
 	// One queue per flush thread. Fingerprint is used to
 	// pick a queue.
+	numOps          int64
 	flushQueues     []*util.PriorityQueue
 	flushQueuesDone sync.WaitGroup
 
-	flushCtx *flushCtx
+	wal *wal.Manager
 
 	limiter *Limiter
 
@@ -268,7 +253,11 @@ func New(cfg Config, clientConfig client.Config,
 	targetSizeStats.Set(int64(cfg.TargetChunkSize))
 	metrics := newIngesterMetrics(registerer, metricsNamespace)
 
-	segmentWriter, err := wal.NewWalSegmentWriter()
+	walManager, err := wal.NewManager(wal.Config{
+		MaxAge:         wal.DefaultMaxAge,
+		MaxSegments:    wal.DefaultMaxSegments,
+		MaxSegmentSize: wal.DefaultMaxSegmentSize,
+	})
 	if err != nil {
 		return nil, err
 	}
@@ -291,12 +280,7 @@ func New(cfg Config, clientConfig client.Config,
 		writeLogManager:      writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester_rf1"),
 		customStreamsTracker: customStreamsTracker,
 		readRing:             readRing,
-		flushCtx: &flushCtx{
-			lock:            &sync.RWMutex{},
-			flushDone:       make(chan struct{}),
-			newCtxAvailable: make(chan struct{}),
-			segmentWriter:   segmentWriter,
-		},
+		wal:                  walManager,
 	}
 
 	// TODO: change flush on shutdown
@@ -477,7 +461,6 @@ func (i *Ingester) running(ctx context.Context) error {
 func (i *Ingester) stopping(_ error) error {
 	i.stopIncomingRequests()
 	var errs util.MultiError
-	// errs.Add(i.wal.Stop())
 
 	//if i.flushOnShutdownSwitch.Get() {
 	//	i.lifecycler.SetFlushOnShutdown(true)
@@ -567,30 +550,18 @@ func (i *Ingester) loop() {
 }
 
 func (i *Ingester) doFlushTick() {
-	i.flushCtx.lock.Lock()
-
-	// i.logger.Log("msg", "starting periodic flush")
-	// Stop new chunks being written while we swap destinations - we'll never unlock as this flushctx can no longer be used.
-	currentFlushCtx := i.flushCtx
-
-	// APIs become unblocked after resetting flushCtx
-	segmentWriter, err := wal.NewWalSegmentWriter()
-	if err != nil {
-		// TODO: handle this properly
-		panic(err)
-	}
-	i.flushCtx = &flushCtx{
-		lock:            &sync.RWMutex{},
-		flushDone:       make(chan struct{}),
-		newCtxAvailable: make(chan struct{}),
-		segmentWriter:   segmentWriter,
-	}
-	close(currentFlushCtx.newCtxAvailable) // Broadcast to all waiters that they can now fetch a new flushCtx. Small chance of a race but if they re-fetch the old one, they'll just check again immediately.
-	// Flush the finished context in the background & then notify watching API requests
-	// TODO: use multiple flush queues if required
-	// Don't write empty segments if there is nothing to write.
-	if currentFlushCtx.segmentWriter.InputSize() > 0 {
-		i.flushQueues[0].Enqueue(currentFlushCtx)
+	for {
+		// Keep adding ops to the queue until there are no more.
+		it, _ := i.wal.NextPending()
+		if it == nil {
+			break
+		}
+		i.numOps++
+		flushQueueIndex := i.numOps % int64(i.cfg.ConcurrentFlushes)
+		i.flushQueues[flushQueueIndex].Enqueue(&flushOp{
+			num: i.numOps,
+			it:  it,
+		})
 	}
 }
@@ -796,27 +767,11 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
 		return &logproto.PushResponse{}, err
 	}
 
-	// Fetch a flush context and try to acquire the RLock
-	// The only time the Write Lock is held is when this context is no longer usable and a new one is being created.
-	// In this case, we need to re-read i.flushCtx in order to fetch the new one as soon as it's available.
-	// The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop.
-	currentFlushCtx := i.flushCtx
-	for !currentFlushCtx.lock.TryRLock() {
-		select {
-		case <-currentFlushCtx.newCtxAvailable:
-		case <-ctx.Done():
-			return &logproto.PushResponse{}, ctx.Err()
-		}
-		currentFlushCtx = i.flushCtx
-	}
-	err = instance.Push(ctx, req, currentFlushCtx)
-	currentFlushCtx.lock.RUnlock()
-	select {
-	case <-ctx.Done():
-		return &logproto.PushResponse{}, ctx.Err()
-	case <-currentFlushCtx.flushDone:
-		return &logproto.PushResponse{}, err
+	if err = instance.Push(ctx, i.wal, req); err != nil {
+		return nil, err
 	}
+
+	return &logproto.PushResponse{}, nil
 }
 
 // GetStreamRates returns a response containing all streams and their current rate
@@ -851,7 +806,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { //nolint:revive
 	inst, ok = i.instances[instanceID]
 	if !ok {
 		var err error
-		inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker)
+		inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker, i.logger)
 		if err != nil {
 			return nil, err
 		}
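
doFlushTick no longer swaps flush contexts under a lock; it drains the WAL manager, pulling pending segments until NextPending has nothing left, and spreads ops across all flush queues instead of always using queue 0. A rough sketch of that drain loop, where nextPending is a stand-in for wal.Manager.NextPending (which the patch shows returning nil when nothing is ready):

    package main

    import "fmt"

    const concurrentFlushes = 4

    // pending stands in for the WAL manager's queue of full segments.
    var pending = []string{"segment-1", "segment-2", "segment-3", "segment-4", "segment-5"}

    // nextPending pops the next full segment, returning "" when none remain,
    // much as wal.Manager.NextPending returns nil when the queue is empty.
    func nextPending() string {
        if len(pending) == 0 {
            return ""
        }
        it := pending[0]
        pending = pending[1:]
        return it
    }

    func main() {
        var numOps int64
        for {
            // Keep taking segments until there are no more, as doFlushTick does.
            it := nextPending()
            if it == "" {
                break
            }
            numOps++
            flushQueueIndex := numOps % concurrentFlushes
            fmt.Printf("enqueue %s (op %d) on queue %d\n", it, numOps, flushQueueIndex)
        }
    }
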
diff --git a/pkg/ingester-rf1/instance.go b/pkg/ingester-rf1/instance.go
index 16a6758f4ec8..72f2f613a909 100644
--- a/pkg/ingester-rf1/instance.go
+++ b/pkg/ingester-rf1/instance.go
@@ -7,6 +7,7 @@ import (
 	"net/http"
 	"sync"
 
+	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/httpgrpc"
 	"github.com/prometheus/client_golang/prometheus"
@@ -23,6 +24,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/runtime"
 	"github.com/grafana/loki/v3/pkg/storage/config"
+	"github.com/grafana/loki/v3/pkg/storage/wal"
 	"github.com/grafana/loki/v3/pkg/util/constants"
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 	"github.com/grafana/loki/v3/pkg/validation"
@@ -70,6 +72,7 @@ type instance struct {
 	// tailers map[uint32]*tailer
 	tailerMtx sync.RWMutex
 
+	logger              log.Logger
 	limiter             *Limiter
 	streamCountLimiter  *streamCountLimiter
 	ownedStreamsSvc     *ownedStreamService
@@ -87,10 +90,10 @@ type instance struct {
 	customStreamsTracker push.UsageTracker
 }
 
-func (i *instance) Push(ctx context.Context, req *logproto.PushRequest, flushCtx *flushCtx) error {
+func (i *instance) Push(ctx context.Context, w *wal.Manager, req *logproto.PushRequest) error {
 	rateLimitWholeStream := i.limiter.limits.ShardStreams(i.instanceID).Enabled
 
-	var appendErr error
+	results := make([]*wal.AppendResult, 0, len(req.Streams))
 	for _, reqStream := range req.Streams {
 		s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels,
 			func() (*stream, error) {
@@ -102,13 +105,27 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest, flushCtx *flushCtx) error {
 			},
 		)
 		if err != nil {
-			appendErr = err
-			continue
+			return err
 		}
+		_, res, err := s.Push(ctx, w, reqStream.Entries, rateLimitWholeStream, i.customStreamsTracker)
+		if err != nil {
+			return err
+		}
+		results = append(results, res)
+	}
 
-		_, appendErr = s.Push(ctx, reqStream.Entries, rateLimitWholeStream, i.customStreamsTracker, flushCtx)
+	for _, result := range results {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-result.Done():
+			if err := result.Err(); err != nil {
+				return err
+			}
+		}
 	}
-	return appendErr
+
+	return nil
 }
 
 func newInstance(
@@ -121,8 +138,8 @@ func newInstance(
 	streamRateCalculator *StreamRateCalculator,
 	writeFailures *writefailures.Manager,
 	customStreamsTracker push.UsageTracker,
+	logger log.Logger,
 ) (*instance, error) {
-	fmt.Println("new instance for", instanceID)
 	invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards))
 	if err != nil {
 		return nil, err
@@ -141,6 +158,7 @@ func newInstance(
 		streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
 		//
 		//tailers:    map[uint32]*tailer{},
+		logger:              logger,
 		limiter:             limiter,
 		streamCountLimiter:  newStreamCountLimiter(instanceID, streams.Len, limiter, ownedStreamsSvc),
 		ownedStreamsSvc:     ownedStreamsSvc,
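
instance.Push is now two-phase: it appends every stream in the request to the WAL first, collecting one wal.AppendResult per stream, and only then waits for each result to complete or for the request context to be cancelled. A self-contained sketch of the wait phase, where appendResult is a stand-in mirroring just the Done/Err surface used in the patch:

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    // appendResult mirrors the two parts of wal.AppendResult the patch uses:
    // a channel that closes when the segment holding the entries is flushed,
    // and the error (if any) recorded by SetDone.
    type appendResult struct {
        done chan struct{}
        err  error
    }

    func (r *appendResult) Done() <-chan struct{} { return r.done }
    func (r *appendResult) Err() error            { return r.err }

    // waitAll blocks until every append has completed or the request context
    // is cancelled, returning the first error it encounters.
    func waitAll(ctx context.Context, results []*appendResult) error {
        for _, result := range results {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-result.Done():
                if err := result.Err(); err != nil {
                    return err
                }
            }
        }
        return nil
    }

    func main() {
        ok := &appendResult{done: make(chan struct{})}
        close(ok.done) // flushed successfully

        failed := &appendResult{done: make(chan struct{}), err: errors.New("flush failed")}
        close(failed.done) // flushed with an error

        fmt.Println(waitAll(context.Background(), []*appendResult{ok, failed}))
    }
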
diff --git a/pkg/ingester-rf1/stream.go b/pkg/ingester-rf1/stream.go
index 8bd7bdd0e329..44057694ccf3 100644
--- a/pkg/ingester-rf1/stream.go
+++ b/pkg/ingester-rf1/stream.go
@@ -17,6 +17,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/distributor/writefailures"
 	"github.com/grafana/loki/v3/pkg/loghttp/push"
 	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/storage/wal"
 	"github.com/grafana/loki/v3/pkg/util/flagext"
 	"github.com/grafana/loki/v3/pkg/validation"
 )
@@ -130,21 +131,24 @@ func (s *stream) consumeChunk(_ context.Context, _ *logproto.Chunk) error {
 
 func (s *stream) Push(
 	ctx context.Context,
+	wal *wal.Manager,
 	entries []logproto.Entry,
 	// Whether or not to ingest all at once. It is a per-tenant configuration.
 	rateLimitWholeStream bool,
 	usageTracker push.UsageTracker,
-	flushCtx *flushCtx,
-) (int, error) {
+) (int, *wal.AppendResult, error) {
 	toStore, invalid := s.validateEntries(ctx, entries, rateLimitWholeStream, usageTracker)
 	if rateLimitWholeStream && hasRateLimitErr(invalid) {
-		return 0, errorForFailedEntries(s, invalid, len(entries))
+		return 0, nil, errorForFailedEntries(s, invalid, len(entries))
 	}
 
-	bytesAdded := s.storeEntries(ctx, toStore, usageTracker, flushCtx)
+	bytesAdded, res, err := s.storeEntries(ctx, wal, toStore, usageTracker)
+	if err != nil {
+		return 0, nil, err
+	}
 
-	return bytesAdded, errorForFailedEntries(s, invalid, len(entries))
+	return bytesAdded, res, errorForFailedEntries(s, invalid, len(entries))
 }
 
 func errorForFailedEntries(s *stream, failedEntriesWithError []entryWithError, totalEntries int) error {
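
The new stream.Push contract returns three values: the bytes added, an AppendResult for the caller to await, and any validation error from errorForFailedEntries, so a push with some invalid entries still reports the bytes that were stored. A hypothetical caller-side sketch with stand-in types (not the real stream or wal packages):

    package main

    import (
        "errors"
        "fmt"
    )

    // appendResult is a stand-in for wal.AppendResult.
    type appendResult struct{ segment string }

    // push mimics the (bytesAdded, result, err) shape of the new stream.Push:
    // entries that fail validation yield an error, but the valid entries are
    // still stored, and their byte count and append result are still returned.
    func push(entries []string) (int, *appendResult, error) {
        bytesAdded := 0
        var invalid error
        for _, e := range entries {
            if e == "" {
                invalid = errors.New("entry failed validation")
                continue
            }
            bytesAdded += len(e)
        }
        return bytesAdded, &appendResult{segment: "segment-1"}, invalid
    }

    func main() {
        n, res, err := push([]string{"hello", "", "world"})
        // Partial success: bytes were appended even though one entry failed.
        fmt.Println(n, res.segment, err)
    }
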
@@ -195,7 +199,7 @@ func hasRateLimitErr(errs []entryWithError) bool {
 	return ok
 }
 
-func (s *stream) storeEntries(ctx context.Context, entries []*logproto.Entry, usageTracker push.UsageTracker, flushCtx *flushCtx) int {
+func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*logproto.Entry, usageTracker push.UsageTracker) (int, *wal.AppendResult, error) {
 	if sp := opentracing.SpanFromContext(ctx); sp != nil {
 		sp.LogKV("event", "stream started to store entries", "labels", s.labelsString)
 		defer sp.LogKV("event", "stream finished to store entries")
@@ -213,9 +217,18 @@ func (s *stream) storeEntries(ctx context.Context, entries []*logproto.Entry, usageTracker push.UsageTracker, flushCtx *flushCtx) int {
 		bytesAdded += len(entries[i].Line)
 	}
 
-	flushCtx.segmentWriter.Append(s.tenant, s.labels.String(), s.labels, entries)
+
+	res, err := w.Append(wal.AppendRequest{
+		TenantID:  s.tenant,
+		Labels:    s.labels,
+		LabelsStr: s.labels.String(),
+		Entries:   entries,
+	})
+	if err != nil {
+		return 0, nil, err
+	}
 
 	s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, 0, 0, usageTracker)
-	return bytesAdded
+	return bytesAdded, res, nil
 }
 
 func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]*logproto.Entry, []entryWithError) {
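
Taken together, the pieces hand off like this: storeEntries appends to the manager's active segment and gets back an AppendResult, doFlushTick seals full segments and enqueues them, and a flush loop writes each segment out, resolves every waiting push with SetDone, then returns the item with Put. A schematic of that lifecycle using simplified stand-in types rather than the real wal package:

    package main

    import "fmt"

    // appendResult resolves once the segment holding an append is flushed.
    type appendResult struct {
        done chan struct{}
        err  error
    }

    func (r *appendResult) Done() <-chan struct{} { return r.done }
    func (r *appendResult) Err() error            { return r.err }

    // setDone plays the role of wal.AppendResult.SetDone in the flush loop.
    func (r *appendResult) setDone(err error) {
        r.err = err
        close(r.done)
    }

    // pendingItem pairs buffered entries with the result their writers await.
    type pendingItem struct {
        entries []string
        result  *appendResult
    }

    // manager is a toy stand-in for wal.Manager with a single active segment.
    type manager struct {
        active *pendingItem
    }

    // append buffers entries in the active segment, as storeEntries does.
    func (m *manager) append(entries ...string) *appendResult {
        if m.active == nil {
            m.active = &pendingItem{result: &appendResult{done: make(chan struct{})}}
        }
        m.active.entries = append(m.active.entries, entries...)
        return m.active.result
    }

    // nextPending seals the active segment and hands it to a flusher,
    // as doFlushTick does via wal.Manager.NextPending.
    func (m *manager) nextPending() *pendingItem {
        it := m.active
        m.active = nil
        return it
    }

    func main() {
        m := &manager{}
        res := m.append("line 1", "line 2")

        // Flush loop side: take the pending segment, flush it, resolve waiters.
        it := m.nextPending()
        fmt.Println("flushing", len(it.entries), "entries")
        it.result.setDone(nil)

        // Push side: the append is acknowledged once its segment is flushed.
        <-res.Done()
        fmt.Println("push acknowledged, err =", res.Err())
    }
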