Skip to content

Commit

Permalink
chore: Remove un-used errors from WAL Manager (#13496)
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana authored Jul 11, 2024
1 parent aa55c69 commit 583f7f3
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 28 deletions.
4 changes: 1 addition & 3 deletions pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ func (i *Ingester) flushLoop(j int) {
}

op.it.Result.SetDone(err)
if err = i.wal.Put(op.it); err != nil {
level.Error(l).Log("msg", "failed to put back in WAL Manager", "err", err)
}
i.wal.Put(op.it)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester-rf1/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func (i *Ingester) loop() {
func (i *Ingester) doFlushTick() {
for {
// Keep adding ops to the queue until there are no more.
it, _ := i.wal.NextPending()
it := i.wal.NextPending()
if it == nil {
break
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/storage/wal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {

// NextPending returns the next segment to be flushed. It returns nil if the
// pending list is empty.
func (m *Manager) NextPending() (*PendingItem, error) {
func (m *Manager) NextPending() *PendingItem {
m.mu.Lock()
defer m.mu.Unlock()
if m.pending.Len() == 0 {
Expand All @@ -230,20 +230,20 @@ func (m *Manager) NextPending() (*PendingItem, error) {
}
// If the pending list is still empty return nil.
if m.pending.Len() == 0 {
return nil, nil
return nil
}
}
el := m.pending.Front()
it := el.Value.(*item)
m.pending.Remove(el)
m.metrics.NumPending.Dec()
m.metrics.NumFlushing.Inc()
return &PendingItem{Result: it.r, Writer: it.w}, nil
return &PendingItem{Result: it.r, Writer: it.w}
}

// Put resets the segment and puts it back in the available list to accept
// writes. A PendingItem should not be put back until it has been flushed.
func (m *Manager) Put(it *PendingItem) error {
func (m *Manager) Put(it *PendingItem) {
m.mu.Lock()
defer m.mu.Unlock()
it.Writer.Reset()
Expand All @@ -253,5 +253,4 @@ func (m *Manager) Put(it *PendingItem) error {
r: &AppendResult{done: make(chan struct{})},
w: it.Writer,
})
return nil
}
30 changes: 11 additions & 19 deletions pkg/storage/wal/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func TestManager_Append(t *testing.T) {
}

// Flush the data and broadcast that the flush is successful.
it, err := m.NextPending()
require.NoError(t, err)
it := m.NextPending()
require.NotNil(t, it)
it.Result.SetDone(nil)

Expand All @@ -61,7 +60,7 @@ func TestManager_Append(t *testing.T) {
require.NoError(t, res.Err())

// Return the segment to be written to again.
require.NoError(t, m.Put(it))
m.Put(it)

// Append some more data.
entries = []*logproto.Entry{{
Expand All @@ -78,8 +77,7 @@ func TestManager_Append(t *testing.T) {
require.NotNil(t, res)

// Flush the data, but this time broadcast an error that the flush failed.
it, err = m.NextPending()
require.NoError(t, err)
it = m.NextPending()
require.NotNil(t, it)
it.Result.SetDone(errors.New("failed to flush"))

Expand Down Expand Up @@ -146,8 +144,7 @@ func TestManager_NextPending(t *testing.T) {
require.NoError(t, err)

// There should be no items as no data has been written.
it, err := m.NextPending()
require.NoError(t, err)
it := m.NextPending()
require.Nil(t, it)

// Append 512B of data. There should still be no items to as the segment is
Expand All @@ -167,8 +164,7 @@ func TestManager_NextPending(t *testing.T) {
Entries: entries,
})
require.NoError(t, err)
it, err = m.NextPending()
require.NoError(t, err)
it = m.NextPending()
require.Nil(t, it)

// Write another 512B of data. There should be an item waiting to be flushed.
Expand All @@ -183,13 +179,11 @@ func TestManager_NextPending(t *testing.T) {
Entries: entries,
})
require.NoError(t, err)
it, err = m.NextPending()
require.NoError(t, err)
it = m.NextPending()
require.NotNil(t, it)

// Should not get the same item more than once.
it, err = m.NextPending()
require.NoError(t, err)
it = m.NextPending()
require.Nil(t, it)
}

Expand Down Expand Up @@ -227,8 +221,7 @@ func TestManager_Put(t *testing.T) {
require.Equal(t, 1, m.pending.Len())

// Getting the pending segment should remove it from the list.
it, err := m.NextPending()
require.NoError(t, err)
it := m.NextPending()
require.NotNil(t, it)
require.Equal(t, 9, m.available.Len())
require.Equal(t, 0, m.pending.Len())
Expand All @@ -237,7 +230,7 @@ func TestManager_Put(t *testing.T) {
require.Equal(t, int64(1024), it.Writer.InputSize())

// Putting it back should add it to the available list.
require.NoError(t, m.Put(it))
m.Put(it)
require.Equal(t, 10, m.available.Len())
require.Equal(t, 0, m.pending.Len())

Expand Down Expand Up @@ -297,8 +290,7 @@ wal_segments_pending 1
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))

// Get the segment from the pending list.
it, err := m.NextPending()
require.NoError(t, err)
it := m.NextPending()
require.NotNil(t, it)
expected = `
# HELP wal_segments_available The number of WAL segments accepting writes.
Expand All @@ -314,7 +306,7 @@ wal_segments_pending 0
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))

// Reset the segment and put it back in the available list.
require.NoError(t, m.Put(it))
m.Put(it)
expected = `
# HELP wal_segments_available The number of WAL segments accepting writes.
# TYPE wal_segments_available gauge
Expand Down

0 comments on commit 583f7f3

Please sign in to comment.