diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go index eda141afd762..8d008716d2fe 100644 --- a/pkg/ingester-rf1/flush.go +++ b/pkg/ingester-rf1/flush.go @@ -118,9 +118,7 @@ func (i *Ingester) flushLoop(j int) { } op.it.Result.SetDone(err) - if err = i.wal.Put(op.it); err != nil { - level.Error(l).Log("msg", "failed to put back in WAL Manager", "err", err) - } + i.wal.Put(op.it) } } diff --git a/pkg/ingester-rf1/ingester.go b/pkg/ingester-rf1/ingester.go index 581da7c8a438..5057a8652f10 100644 --- a/pkg/ingester-rf1/ingester.go +++ b/pkg/ingester-rf1/ingester.go @@ -552,7 +552,7 @@ func (i *Ingester) loop() { func (i *Ingester) doFlushTick() { for { // Keep adding ops to the queue until there are no more. - it, _ := i.wal.NextPending() + it := i.wal.NextPending() if it == nil { break } diff --git a/pkg/storage/wal/manager.go b/pkg/storage/wal/manager.go index ed16f73f31af..2ec28a4e5038 100644 --- a/pkg/storage/wal/manager.go +++ b/pkg/storage/wal/manager.go @@ -212,7 +212,7 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) { // NextPending returns the next segment to be flushed. It returns nil if the // pending list is empty. -func (m *Manager) NextPending() (*PendingItem, error) { +func (m *Manager) NextPending() *PendingItem { m.mu.Lock() defer m.mu.Unlock() if m.pending.Len() == 0 { @@ -230,7 +230,7 @@ func (m *Manager) NextPending() (*PendingItem, error) { } // If the pending list is still empty return nil. if m.pending.Len() == 0 { - return nil, nil + return nil } } el := m.pending.Front() @@ -238,12 +238,12 @@ func (m *Manager) NextPending() (*PendingItem, error) { m.pending.Remove(el) m.metrics.NumPending.Dec() m.metrics.NumFlushing.Inc() - return &PendingItem{Result: it.r, Writer: it.w}, nil + return &PendingItem{Result: it.r, Writer: it.w} } // Put resets the segment and puts it back in the available list to accept // writes. A PendingItem should not be put back until it has been flushed. -func (m *Manager) Put(it *PendingItem) error { +func (m *Manager) Put(it *PendingItem) { m.mu.Lock() defer m.mu.Unlock() it.Writer.Reset() @@ -253,5 +253,4 @@ func (m *Manager) Put(it *PendingItem) error { r: &AppendResult{done: make(chan struct{})}, w: it.Writer, }) - return nil } diff --git a/pkg/storage/wal/manager_test.go b/pkg/storage/wal/manager_test.go index 461cc05f1243..bd120e2fe8cc 100644 --- a/pkg/storage/wal/manager_test.go +++ b/pkg/storage/wal/manager_test.go @@ -47,8 +47,7 @@ func TestManager_Append(t *testing.T) { } // Flush the data and broadcast that the flush is successful. - it, err := m.NextPending() - require.NoError(t, err) + it := m.NextPending() require.NotNil(t, it) it.Result.SetDone(nil) @@ -61,7 +60,7 @@ func TestManager_Append(t *testing.T) { require.NoError(t, res.Err()) // Return the segment to be written to again. - require.NoError(t, m.Put(it)) + m.Put(it) // Append some more data. entries = []*logproto.Entry{{ @@ -78,8 +77,7 @@ func TestManager_Append(t *testing.T) { require.NotNil(t, res) // Flush the data, but this time broadcast an error that the flush failed. - it, err = m.NextPending() - require.NoError(t, err) + it = m.NextPending() require.NotNil(t, it) it.Result.SetDone(errors.New("failed to flush")) @@ -146,8 +144,7 @@ func TestManager_NextPending(t *testing.T) { require.NoError(t, err) // There should be no items as no data has been written. - it, err := m.NextPending() - require.NoError(t, err) + it := m.NextPending() require.Nil(t, it) // Append 512B of data. There should still be no items to as the segment is @@ -167,8 +164,7 @@ func TestManager_NextPending(t *testing.T) { Entries: entries, }) require.NoError(t, err) - it, err = m.NextPending() - require.NoError(t, err) + it = m.NextPending() require.Nil(t, it) // Write another 512B of data. There should be an item waiting to be flushed. @@ -183,13 +179,11 @@ func TestManager_NextPending(t *testing.T) { Entries: entries, }) require.NoError(t, err) - it, err = m.NextPending() - require.NoError(t, err) + it = m.NextPending() require.NotNil(t, it) // Should not get the same item more than once. - it, err = m.NextPending() - require.NoError(t, err) + it = m.NextPending() require.Nil(t, it) } @@ -227,8 +221,7 @@ func TestManager_Put(t *testing.T) { require.Equal(t, 1, m.pending.Len()) // Getting the pending segment should remove it from the list. - it, err := m.NextPending() - require.NoError(t, err) + it := m.NextPending() require.NotNil(t, it) require.Equal(t, 9, m.available.Len()) require.Equal(t, 0, m.pending.Len()) @@ -237,7 +230,7 @@ func TestManager_Put(t *testing.T) { require.Equal(t, int64(1024), it.Writer.InputSize()) // Putting it back should add it to the available list. - require.NoError(t, m.Put(it)) + m.Put(it) require.Equal(t, 10, m.available.Len()) require.Equal(t, 0, m.pending.Len()) @@ -297,8 +290,7 @@ wal_segments_pending 1 require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...)) // Get the segment from the pending list. - it, err := m.NextPending() - require.NoError(t, err) + it := m.NextPending() require.NotNil(t, it) expected = ` # HELP wal_segments_available The number of WAL segments accepting writes. @@ -314,7 +306,7 @@ wal_segments_pending 0 require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...)) // Reset the segment and put it back in the available list. - require.NoError(t, m.Put(it)) + m.Put(it) expected = ` # HELP wal_segments_available The number of WAL segments accepting writes. # TYPE wal_segments_available gauge