wal full failure will cause flush on shutdown
owen-d committed Jan 7, 2021
1 parent 4ad6a57 commit b4fbae6
Showing 6 changed files with 111 additions and 38 deletions.
29 changes: 15 additions & 14 deletions pkg/ingester/checkpoint_test.go
@@ -38,12 +38,7 @@ func ensureIngesterData(ctx context.Context, t *testing.T, start, end time.Time,
require.Len(t, result.resps[0].Streams[1].Entries, ln)
}

func TestIngesterWAL(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

func defaultIngesterTestConfigWithWAL(t *testing.T, walDir string) Config {
ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
@@ -52,6 +47,18 @@ func TestIngesterWAL(t *testing.T) {
Recover: true,
CheckpointDuration: time.Second,
}

return ingesterConfig
}

func TestIngesterWAL(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

@@ -134,14 +141,8 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.Nil(t, err)
defer os.RemoveAll(walDir)

ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
Enabled: true,
Dir: walDir,
Recover: true,
CheckpointDuration: time.Second,
}
ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

39 changes: 34 additions & 5 deletions pkg/ingester/flush_test.go
@@ -2,8 +2,11 @@ package ingester

import (
"fmt"
"io/ioutil"
"os"
"sort"
"sync"
"syscall"
"testing"
"time"

@@ -44,7 +47,7 @@ func TestChunkFlushingIdle(t *testing.T) {
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond

store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
testData := pushTestSamples(t, ing)

@@ -54,7 +57,25 @@
}

func TestChunkFlushingShutdown(t *testing.T) {
store, ing := newTestStore(t, defaultIngesterTestConfig(t))
store, ing := newTestStore(t, defaultIngesterTestConfig(t), nil)
testData := pushTestSamples(t, ing)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
store.checkData(t, testData)
}

type fullWAL struct{}

func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Stop() error { return nil }

func TestWALFullFlush(t *testing.T) {
// The WAL is replaced with a fake below, but the ingester New() function creates a regular WAL first,
// so we still create (and clean up) the directory even though it goes unused.
walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

store, ing := newTestStore(t, defaultIngesterTestConfigWithWAL(t, walDir), fullWAL{})
testData := pushTestSamples(t, ing)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
store.checkData(t, testData)
@@ -66,7 +87,7 @@ func TestFlushingCollidingLabels(t *testing.T) {
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond

store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer store.Stop()

const userID = "testUser"
@@ -112,7 +133,7 @@ func TestFlushMaxAge(t *testing.T) {
cfg.MaxChunkAge = time.Minute
cfg.MaxChunkIdle = time.Hour

store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer store.Stop()

now := time.Unix(0, 0)
@@ -166,7 +187,10 @@ type testStore struct {
chunks map[string][]chunk.Chunk
}

func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
// Note: the ingester New() function creates its own WAL first, which we then override if specified.
// Because of this, ensure any WAL directories exist/are cleaned up even when overriding the WAL.
// This is an ugly hook for testing :(
func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, *Ingester) {
store := &testStore{
chunks: map[string][]chunk.Chunk{},
}
@@ -178,6 +202,11 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

if walOverride != nil {
_ = ing.wal.Stop()
ing.wal = walOverride
}

return store, ing
}

29 changes: 19 additions & 10 deletions pkg/ingester/ingester.go
@@ -146,6 +146,10 @@ type Ingester struct {

limiter *Limiter

// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch

metrics *ingesterMetrics

wal WAL
@@ -169,15 +173,16 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
metrics := newIngesterMetrics(registerer)

i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
flushOnShutdownSwitch: &OnceSwitch{},
}

if cfg.WAL.Enabled {
@@ -319,6 +324,10 @@ func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
var errs errUtil.MultiError
errs.Add(i.wal.Stop())

if i.flushOnShutdownSwitch.Get() {
i.lifecycler.SetFlushOnShutdown(true)
}
errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))

// Normally, flushers are stopped via lifecycler (in transferOut), but if lifecycler fails,
Expand Down Expand Up @@ -384,7 +393,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics)
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics, i.flushOnShutdownSwitch)
i.instances[instanceID] = inst
}
return inst
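Taken together, the ingester.go changes wire a one-way signal from the WAL error path into the shutdown sequence: Push trips the switch when the WAL disk is full, and stopping() turns that into lifecycler flush-on-shutdown so in-memory chunks are written to the store rather than lost. A minimal standalone sketch of that hand-off (the onceSwitch and fakeLifecycler types below are simplified, hypothetical stand-ins, not Loki code):

```go
package main

import "fmt"

// onceSwitch is a simplified stand-in for the OnceSwitch added in instance.go below.
type onceSwitch struct{ toggled bool }

func (o *onceSwitch) Trigger()  { o.toggled = true }
func (o *onceSwitch) Get() bool { return o.toggled }

// fakeLifecycler is a hypothetical stand-in for the ring lifecycler; with
// flush-on-shutdown set, the real one flushes in-memory chunks while the
// ingester shuts down.
type fakeLifecycler struct{ flushOnShutdown bool }

func (l *fakeLifecycler) SetFlushOnShutdown(b bool) { l.flushOnShutdown = b }

func main() {
	sw := &onceSwitch{}
	lc := &fakeLifecycler{}

	// During normal operation a WAL append fails with ENOSPC and trips the switch...
	sw.Trigger()

	// ...and stopping() later converts that one-way signal into a flush.
	if sw.Get() {
		lc.SetFlushOnShutdown(true)
	}
	fmt.Println("flush on shutdown:", lc.flushOnShutdown) // true
}
```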
38 changes: 36 additions & 2 deletions pkg/ingester/instance.go
@@ -79,6 +79,10 @@ type instance struct {

wal WAL

// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch

metrics *ingesterMetrics
}

@@ -88,6 +92,7 @@ func newInstance(
limiter *Limiter,
wal WAL,
metrics *ingesterMetrics,
flushOnShutdownSwitch *OnceSwitch,
) *instance {
i := &instance{
cfg: cfg,
@@ -103,8 +108,9 @@
tailers: map[uint32]*tailer{},
limiter: limiter,

wal: wal,
metrics: metrics,
wal: wal,
metrics: metrics,
flushOnShutdownSwitch: flushOnShutdownSwitch,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
@@ -165,6 +171,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
if err := i.wal.Log(record); err != nil {
if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
i.metrics.walDiskFullFailures.Inc()
i.flushOnShutdownSwitch.Trigger()
} else {
return err
}
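Aside on the error check just above (not part of the diff): a full disk surfaces from a WAL append as an *os.PathError wrapping syscall.ENOSPC, which is also the shape the fullWAL stub in flush_test.go fabricates. A small standalone sketch of the same match, using a hypothetical path:

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"syscall"
)

func main() {
	// What a full-disk failure looks like when it bubbles up from a WAL write
	// (and what the fullWAL stub returns); the path is made up for illustration.
	var err error = &os.PathError{Op: "write", Path: "wal/00000001", Err: syscall.ENOSPC}

	// The type-assertion form used in instance.Push above.
	if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
		fmt.Println("disk full: flag flush-on-shutdown instead of failing the push")
	}

	// errors.Is matches too, since *os.PathError unwraps to its Err field.
	fmt.Println(errors.Is(err, syscall.ENOSPC)) // true
}
```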
Expand Down Expand Up @@ -585,3 +592,30 @@ func shouldConsiderStream(stream *stream, req *logproto.SeriesRequest) bool {
}
return false
}

// OnceSwitch is a write-optimized switch that can only ever be switched "on".
// It uses an RWMutex under the hood to cheaply check, in a concurrent environment,
// whether the switch has already been triggered, and only acquires the write lock when it has not.
type OnceSwitch struct {
sync.RWMutex
toggle bool
}

func (o *OnceSwitch) Get() bool {
o.RLock()
defer o.RUnlock()
return o.toggle
}

func (o *OnceSwitch) Trigger() {
o.RLock()
if o.toggle {
o.RUnlock()
return
}

o.RUnlock()
o.Lock()
o.toggle = true
o.Unlock()
}
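A usage note on the new type: OnceSwitch is safe to Trigger from many goroutines while another polls Get, and once triggered, further calls only take the read lock, so a hot path that triggers repeatedly (for example, on every failed WAL append) stays cheap. At worst several goroutines pass the read-locked check together and all take the write lock, which is harmless because the write is idempotent. A standalone sketch, with the type copied verbatim so it compiles on its own:

```go
package main

import (
	"fmt"
	"sync"
)

// OnceSwitch, copied verbatim from the diff so the sketch is self-contained.
type OnceSwitch struct {
	sync.RWMutex
	toggle bool
}

func (o *OnceSwitch) Get() bool {
	o.RLock()
	defer o.RUnlock()
	return o.toggle
}

func (o *OnceSwitch) Trigger() {
	o.RLock()
	if o.toggle {
		o.RUnlock()
		return
	}
	o.RUnlock()
	o.Lock()
	o.toggle = true
	o.Unlock()
}

func main() {
	sw := &OnceSwitch{}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sw.Trigger() // concurrent triggers are safe and idempotent
		}()
	}
	wg.Wait()

	fmt.Println(sw.Get()) // true
}
```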
12 changes: 6 additions & 6 deletions pkg/ingester/instance_test.go
@@ -35,7 +35,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil)
i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil, &OnceSwitch{})

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@@ -62,7 +62,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})

const (
concurrent = 10
@@ -120,7 +120,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)

inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
lbls := makeRandomLabels()

tt := time.Now()
@@ -160,7 +160,7 @@ func Test_SeriesQuery(t *testing.T) {
cfg.SyncPeriod = 1 * time.Minute
cfg.SyncMinUtilization = 0.20

instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics)
instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})

currentTime := time.Now()

@@ -271,7 +271,7 @@ func Benchmark_PushInstance(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics)
i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
ctx := context.Background()

for n := 0; n < b.N; n++ {
@@ -313,7 +313,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {

ctx := context.Background()

inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil)
require.NoError(b, err)
for i := 0; i < 10000; i++ {
2 changes: 1 addition & 1 deletion pkg/ingester/transfer_test.go
@@ -165,7 +165,7 @@ func (f *testIngesterFactory) getIngester(joinAfter time.Duration, t *testing.T)
}, nil
}

_, ing := newTestStore(f.t, cfg)
_, ing := newTestStore(f.t, cfg, nil)
f.ingesters[fmt.Sprintf("%s:0", cfg.LifecyclerConfig.ID)] = ing

// NB there's some kind of race condition with the in-memory KV client when
Expand Down
