WAL backpressure #3218

Merged (15 commits) on Jan 27, 2021
18 changes: 11 additions & 7 deletions docs/sources/configuration/_index.md
@@ -897,7 +897,7 @@ lifecycler:
[query_store_max_look_back_period: <duration> | default = 0]


# The ingester WAL records incoming logs and stores them on the local file system in order to guarantee persistence of acknowledged data in the event of a process crash.
# The ingester WAL (Write Ahead Log) records incoming logs and stores them on the local file system in order to guarantee persistence of acknowledged data in the event of a process crash.
wal:
# Enables writing to WAL.
# CLI flag: -ingester.wal-enabled
@@ -911,13 +911,17 @@ wal:
# CLI flag: -ingester.recover-from-wal
[recover: <boolean> | default = false]

# When WAL is enabled, should chunks be flushed to long-term storage on shutdown.
# CLI flag: -ingester.flush-on-shutdown
[flush_on_shutdown: <boolean> | default = false]

# Interval at which checkpoints should be created.
# CLI flag: -ingester.checkpoint-duration
[checkpoint_duration: <duration> | default = 5m]

# Maximum memory size the WAL may use during replay. After hitting this, it will flush data to storage before continuing.
# A unit suffix (KB, MB, GB) may be applied.
[replay_memory_ceiling: <string> | default = 4GB]
```
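As an illustration only, the same settings can be pictured as a Go struct. The field names below are taken from how this PR's tests configure the ingester (`Enabled`, `Dir`, `Recover`, `CheckpointDuration`, `ReplayMemoryCeiling`); the struct definition, types, and values are assumptions for the sketch, not the actual Loki code.

```go
package main

import (
	"fmt"
	"time"
)

// WALConfig is an illustrative stand-in for the ingester WAL settings above.
// Field names mirror this PR's test code (checkpoint_test.go); the types and
// struct itself are assumptions, not Loki's definition.
type WALConfig struct {
	Enabled             bool
	Dir                 string
	Recover             bool
	CheckpointDuration  time.Duration
	ReplayMemoryCeiling int64 // bytes
}

func main() {
	cfg := WALConfig{
		Enabled:             true,
		Dir:                 "/loki/wal",
		Recover:             true,
		CheckpointDuration:  5 * time.Minute, // documented default
		ReplayMemoryCeiling: 4 << 30,         // 4GB, documented default
	}
	fmt.Printf("%+v\n", cfg)
}
```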

## consul_config
8 changes: 7 additions & 1 deletion docs/sources/operations/storage/wal.md
@@ -26,6 +26,11 @@ In the event the underlying WAL disk is full, Loki will not fail incoming writes

Note: the Prometheus metric `loki_ingester_wal_disk_full_failures_total` can be used to track and alert when this happens.


### Backpressure

The WAL also includes a backpressure mechanism that allows a large WAL to be replayed within a smaller memory bound. This is helpful after bad scenarios (e.g. an outage) when a WAL has grown past the point where it can be recovered in memory. In this case, the ingester tracks the amount of data being replayed and, once it passes the `ingester.wal-replay-memory-ceiling` threshold, flushes to storage. When this happens, Loki's attempt to deduplicate chunks via content-addressable storage will likely suffer. We deemed this efficiency loss an acceptable tradeoff, considering how it simplifies operation and that it should not occur during regular operation (rollouts, rescheduling), where the WAL can be replayed without triggering this threshold.
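To make the mechanism concrete, here is a minimal Go sketch of the idea under stated assumptions: a counter tracks how much replayed data is held in memory, and crossing the ceiling triggers a flush whose released bytes are then subtracted (mirroring the `replayController.Sub` call visible in this PR's `flush.go` changes). Apart from `Sub`, all names and types below are illustrative, not Loki's actual implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// replayController sketches WAL replay backpressure: it counts bytes held in
// memory during replay and triggers a flush once the ceiling is crossed.
type replayController struct {
	currentBytes int64
	ceiling      int64
	flush        func() int64 // flushes replayed chunks, returns bytes released
}

// Add records newly replayed bytes; if the ceiling is exceeded, it flushes
// and subtracts whatever the flush released.
func (r *replayController) Add(n int64) {
	if atomic.AddInt64(&r.currentBytes, n) > r.ceiling {
		released := r.flush()
		r.Sub(released)
	}
}

// Sub lessens replay pressure after data has been flushed to storage.
func (r *replayController) Sub(n int64) {
	atomic.AddInt64(&r.currentBytes, -n)
}

func main() {
	rc := &replayController{
		ceiling: 1 << 20, // 1MB ceiling for the sketch
		flush: func() int64 {
			fmt.Println("ceiling crossed: flushing replayed chunks to storage")
			return 1 << 20
		},
	}
	for i := 0; i < 5; i++ {
		rc.Add(512 << 10) // replay 512KB at a time
	}
}
```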

### Metrics

## Changes to deployment
@@ -38,6 +43,7 @@ Note: the Prometheus metric `loki_ingester_wal_disk_full_failures_total` can be
* `--ingester.checkpoint-duration` to the interval at which checkpoints should be created.
* `--ingester.recover-from-wal` to `true` to recover data from an existing WAL. The data is recovered even if WAL is disabled and this is set to `true`. The WAL dir needs to be set for this.
* If you are going to enable WAL, it is advisable to always set this to `true`.
* `--ingester.wal-replay-memory-ceiling` (default 4GB) may be set higher or lower depending on your resource settings. It handles memory pressure during WAL replays, allowing a WAL many times larger than available memory to be replayed. This is provided to minimize reconciliation time after very bad situations (e.g. an outage) and will likely not impact regular operations or rollouts at all. We suggest setting this to a high percentage (~75%) of available memory.

## Changes in lifecycle when WAL is enabled

@@ -78,7 +84,7 @@ When scaling down, we must ensure existing data on the leaving ingesters are flu

Consider you have 4 ingesters `ingester-0 ingester-1 ingester-2 ingester-3` and you want to scale down to 2 ingesters. According to StatefulSet rules, the ingesters that will be shut down are `ingester-3` and then `ingester-2`.

Hence before actually scaling down in Kubernetes, port forward those ingesters and hit the [`/ingester/flush_shutdown`](../../api#post-ingesterflush_shutdown) endpoint. This will flush the chunks and shut down the ingesters (while also removing itself from the ring).
Hence, before actually scaling down in Kubernetes, port-forward those ingesters and hit the [`/ingester/flush_shutdown`](../../api#post-ingesterflush_shutdown) endpoint. This will flush the chunks and remove the ingester from the ring, after which it will register as unready and may be deleted.
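For illustration, assuming a `kubectl port-forward` to each leaving ingester is already established on localhost, the endpoint can be hit with a plain HTTP POST; only the `/ingester/flush_shutdown` path comes from these docs, while the hosts and ports below are assumptions.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// One port-forward per leaving ingester is assumed, e.g.
	//   kubectl port-forward ingester-2 3100:3100
	//   kubectl port-forward ingester-3 3101:3100
	// Ports and hosts are illustrative; only the endpoint path is from the docs.
	targets := []string{
		"http://localhost:3100/ingester/flush_shutdown",
		"http://localhost:3101/ingester/flush_shutdown",
	}
	for _, url := range targets {
		resp, err := http.Post(url, "", nil)
		if err != nil {
			log.Fatalf("flush_shutdown failed for %s: %v", url, err)
		}
		resp.Body.Close()
		fmt.Printf("%s -> %s\n", url, resp.Status)
	}
}
```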

After hitting the endpoint for `ingester-2 ingester-3`, scale down the ingesters to 2.

2 changes: 1 addition & 1 deletion pkg/chunkenc/memchunk.go
@@ -269,7 +269,7 @@ func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk {
blocks: []block{},

head: &headBlock{},
format: chunkFormatV2,
format: chunkFormatV3,

encoding: enc,
}
7 changes: 0 additions & 7 deletions pkg/chunkenc/memchunk_test.go
@@ -920,13 +920,6 @@ func TestCheckpointEncoding(t *testing.T) {
cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), blockSize, targetSize)
require.Nil(t, err)

// TODO(owen-d): remove once v3+ is the default chunk version
// because that is when we started serializing uncompressed size.
// Until then, nil them out in order to ease equality testing.
for i := range c.blocks {
c.blocks[i].uncompressedSize = 0
}

require.Equal(t, c, cpy)
}

163 changes: 147 additions & 16 deletions pkg/ingester/checkpoint_test.go
@@ -47,12 +47,10 @@ func ensureIngesterData(ctx context.Context, t *testing.T, start, end time.Time,
func defaultIngesterTestConfigWithWAL(t *testing.T, walDir string) Config {
ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
Enabled: true,
Dir: walDir,
Recover: true,
CheckpointDuration: time.Second,
}
ingesterConfig.WAL.Enabled = true
ingesterConfig.WAL.Dir = walDir
ingesterConfig.WAL.Recover = true
ingesterConfig.WAL.CheckpointDuration = time.Second

return ingesterConfig
}
@@ -113,7 +111,7 @@ func TestIngesterWAL(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// ensure we haven't checkpointed yet
expectCheckpoint(t, walDir, false)
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
@@ -124,9 +122,8 @@
// ensure we've recovered data from wal segments
ensureIngesterData(ctx, t, start, end, i)

time.Sleep(ingesterConfig.WAL.CheckpointDuration + time.Second) // give a bit of buffer
// ensure we have checkpointed now
expectCheckpoint(t, walDir, true)
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*5) // give a bit of buffer

require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

@@ -244,17 +241,151 @@ func TestUnflushedChunks(t *testing.T) {
require.Equal(t, 1, len(unflushedChunks(chks)))
}

func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) {
fs, err := ioutil.ReadDir(walDir)
require.Nil(t, err)
var found bool
for _, f := range fs {
if _, err := checkpointIndex(f.Name(), false); err == nil {
found = true
}
}

require.True(t, found == shouldExist)
}

func TestIngesterWALBackpressureSegments(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
ingesterConfig.WAL.ReplayMemoryCeiling = 1000

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

newStore := func() *mockStore {
return &mockStore{
chunks: map[string][]chunk.Chunk{},
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

start := time.Now()
// Replay data 5x larger than the ceiling.
totalSize := int(5 * i.cfg.WAL.ReplayMemoryCeiling)
req, written := mkPush(start, totalSize)
require.Equal(t, totalSize, written)

ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, req)
require.NoError(t, err)

require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// ensure we haven't checkpointed yet
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester, ensuring we replayed from WAL.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
}

func TestIngesterWALBackpressureCheckpoint(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
ingesterConfig.WAL.ReplayMemoryCeiling = 1000

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

newStore := func() *mockStore {
return &mockStore{
chunks: map[string][]chunk.Chunk{},
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

start := time.Now()
// Replay data 5x larger than the ceiling.
totalSize := int(5 * i.cfg.WAL.ReplayMemoryCeiling)
req, written := mkPush(start, totalSize)
require.Equal(t, totalSize, written)

ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, req)
require.NoError(t, err)

// ensure we have checkpointed now
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*5) // give a bit of buffer

require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester, ensuring we can replay from the checkpoint as well.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
}

func expectCheckpoint(t *testing.T, walDir string, shouldExist bool, max time.Duration) {
deadline := time.After(max)
for {
select {
case <-deadline:
require.Fail(t, "timeout while waiting for checkpoint existence:", shouldExist)
default:
<-time.After(max / 10) // check 10x over the duration
}

fs, err := ioutil.ReadDir(walDir)
require.Nil(t, err)
var found bool
for _, f := range fs {
if _, err := checkpointIndex(f.Name(), false); err == nil {
found = true
}
}
if found == shouldExist {
return
}
}

}

// mkPush makes approximately totalSize bytes of log lines across min(500, totalSize) streams
func mkPush(start time.Time, totalSize int) (*logproto.PushRequest, int) {
var written int
req := &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
},
}
totalStreams := 500
if totalStreams > totalSize {
totalStreams = totalSize
}

for i := 0; i < totalStreams; i++ {
req.Streams = append(req.Streams, logproto.Stream{
Labels: fmt.Sprintf(`{foo="bar",i="%d"}`, i),
})

for j := 0; j < totalSize/totalStreams; j++ {
req.Streams[i].Entries = append(req.Streams[i].Entries, logproto.Entry{
Timestamp: start.Add(time.Duration(j) * time.Nanosecond),
Line: string([]byte{1}),
})
written++
}

}
return req, written
}

type ingesterInstancesFunc func() []*instance
5 changes: 5 additions & 0 deletions pkg/ingester/encoding_test.go
@@ -42,6 +42,11 @@ func Test_Encoding_Series(t *testing.T) {

err := decodeWALRecord(buf, decoded)
require.Nil(t, err)

// Since we use a pool, there can be subtle differences between nil slices and len(0) slices.
// Both are valid, so check length.
require.Equal(t, 0, len(decoded.RefEntries))
decoded.RefEntries = nil
require.Equal(t, record, decoded)
}

36 changes: 28 additions & 8 deletions pkg/ingester/flush.go
@@ -107,23 +107,38 @@ const (
flushReasonSynced = "synced"
)

// Note: this is called both during the WAL replay (zero or more times)
// and then after replay as well.
func (i *Ingester) InitFlushQueues() {
i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength)
go i.flushLoop(j)
}
}

// Flush triggers a flush of all the chunks and closes the flush queues.
// Called from the Lifecycler as part of the ingester shutdown.
func (i *Ingester) Flush() {
i.sweepUsers(true)
i.flush(true)
}

func (i *Ingester) flush(mayRemoveStreams bool) {
i.sweepUsers(true, mayRemoveStreams)

// Close the flush queues, to unblock waiting workers.
for _, flushQueue := range i.flushQueues {
flushQueue.Close()
}

i.flushQueuesDone.Wait()

}

// FlushHandler triggers a flush of all in memory chunks. Mainly used for
// local testing.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
i.sweepUsers(true)
i.sweepUsers(true, true)
w.WriteHeader(http.StatusNoContent)
}

@@ -143,21 +158,21 @@ func (o *flushOp) Priority() int64 {
}

// sweepUsers periodically schedules series for flushing and garbage collects users with no series
func (i *Ingester) sweepUsers(immediate bool) {
func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) {
instances := i.getInstances()

for _, instance := range instances {
i.sweepInstance(instance, immediate)
i.sweepInstance(instance, immediate, mayRemoveStreams)
}
}

func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
func (i *Ingester) sweepInstance(instance *instance, immediate, mayRemoveStreams bool) {
instance.streamsMtx.Lock()
defer instance.streamsMtx.Unlock()

for _, stream := range instance.streams {
i.sweepStream(instance, stream, immediate)
i.removeFlushedChunks(instance, stream)
i.removeFlushedChunks(instance, stream, mayRemoveStreams)
}
}

Expand Down Expand Up @@ -287,23 +302,28 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) {
}

// must hold streamsMtx
func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRemoveStream bool) {
now := time.Now()

stream.chunkMtx.Lock()
defer stream.chunkMtx.Unlock()
prevNumChunks := len(stream.chunks)
var subtracted int
for len(stream.chunks) > 0 {
if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod {
break
}

subtracted += stream.chunks[0].chunk.UncompressedSize()
stream.chunks[0].chunk = nil // erase reference so the chunk can be garbage-collected
stream.chunks = stream.chunks[1:]
}
memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks)))

if len(stream.chunks) == 0 {
// Signal how much data has been flushed to lessen any WAL replay pressure.
i.replayController.Sub(int64(subtracted))

if mayRemoveStream && len(stream.chunks) == 0 {
delete(instance.streamsByFP, stream.fp)
delete(instance.streams, stream.labelsString)
instance.index.Delete(stream.labels, stream.fp)