Commit b0236ee

[libbeat] Add configurable exponential backoff for disk queue write errors (#21493)

faec committed Oct 2, 2020
1 parent bcb8da0 commit b0236ee
Showing 5 changed files with 104 additions and 16 deletions.
52 changes: 47 additions & 5 deletions libbeat/publisher/queue/diskqueue/config.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"path/filepath"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgtype"
@@ -61,16 +62,26 @@ type Settings struct {
// A listener that should be sent ACKs when an event is successfully
// written to disk.
WriteToDiskListener queue.ACKListener

// RetryInterval specifies how long to wait before retrying a disk write
// that failed with a fatal error. If MaxRetryInterval is nonzero, subsequent
// retries use exponential backoff up to the specified limit.
RetryInterval time.Duration
MaxRetryInterval time.Duration
}

// userConfig holds the parameters for a disk queue that are configurable
// by the end user in the beats yml file.
type userConfig struct {
Path string `config:"path"`
MaxSize cfgtype.ByteSize `config:"max_size" validate:"required"`
SegmentSize *cfgtype.ByteSize `config:"segment_size"`

ReadAheadLimit *int `config:"read_ahead"`
WriteAheadLimit *int `config:"write_ahead"`

RetryInterval *time.Duration `config:"retry_interval" validate:"positive"`
MaxRetryInterval *time.Duration `config:"max_retry_interval" validate:"positive"`
}

func (c *userConfig) Validate() error {
@@ -96,6 +107,13 @@ func (c *userConfig) Validate() error {
"Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize)
}

if c.RetryInterval != nil && c.MaxRetryInterval != nil &&
*c.MaxRetryInterval < *c.RetryInterval {
return fmt.Errorf(
"Disk queue max_retry_interval (%v) can't be less than retry_interval (%v)",
*c.MaxRetryInterval, *c.RetryInterval)
}

return nil
}

@@ -108,6 +126,9 @@ func DefaultSettings() Settings {

ReadAheadLimit: 512,
WriteAheadLimit: 2048,

RetryInterval: 1 * time.Second,
MaxRetryInterval: 30 * time.Second,
}
}

@@ -137,6 +158,13 @@ func SettingsForUserConfig(config *common.Config) (Settings, error) {
settings.WriteAheadLimit = *userConfig.WriteAheadLimit
}

if userConfig.RetryInterval != nil {
settings.RetryInterval = *userConfig.RetryInterval
}
if userConfig.MaxRetryInterval != nil {
settings.MaxRetryInterval = *userConfig.MaxRetryInterval
}

return settings, nil
}
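To see how these options flow into Settings, here is a minimal sketch, not part of this commit, that feeds an inline config through the exported SettingsForUserConfig helper; common.MustNewConfigFrom is libbeat's usual helper for building a *common.Config from a map, and the keys mirror the yml options above:

```go
package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
)

func main() {
	// Equivalent yml in the disk queue section:
	//   max_size: 10GB
	//   retry_interval: 2s
	//   max_retry_interval: 1m
	cfg := common.MustNewConfigFrom(map[string]interface{}{
		"max_size":           "10GB",
		"retry_interval":     "2s",
		"max_retry_interval": "1m",
	})
	settings, err := diskqueue.SettingsForUserConfig(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(settings.RetryInterval, settings.MaxRetryInterval)
	// Prints: 2s 1m0s
}
```

Unset options keep their defaults, so omitting both retry keys leaves the 1s/30s values from DefaultSettings.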

@@ -164,3 +192,17 @@ func (settings Settings) segmentPath(segmentID segmentID) string {
func (settings Settings) maxSegmentOffset() segmentOffset {
return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize)
}

// Given a retry interval, nextRetryInterval returns the next higher level
// of backoff.
func (settings Settings) nextRetryInterval(
currentInterval time.Duration,
) time.Duration {
if settings.MaxRetryInterval > 0 {
currentInterval *= 2
if currentInterval > settings.MaxRetryInterval {
currentInterval = settings.MaxRetryInterval
}
}
return currentInterval
}
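To make the backoff progression concrete, here is a hypothetical package-internal example, not in this commit, that walks nextRetryInterval from the default 1s up to the 30s cap (it would have to live in a diskqueue test file, since nextRetryInterval is unexported):

```go
// ExampleNextRetryInterval shows the capped doubling; assumes "fmt" is
// imported in a test file inside package diskqueue.
func ExampleNextRetryInterval() {
	settings := DefaultSettings() // RetryInterval: 1s, MaxRetryInterval: 30s
	interval := settings.RetryInterval
	for i := 0; i < 6; i++ {
		fmt.Print(interval, " ")
		interval = settings.nextRetryInterval(interval)
	}
	fmt.Println(interval)
	// Output: 1s 2s 4s 8s 16s 30s 30s
}
```

If MaxRetryInterval is zero, nextRetryInterval returns its argument unchanged, so every retry waits the same fixed interval.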
9 changes: 7 additions & 2 deletions libbeat/publisher/queue/diskqueue/deleter_loop.go
@@ -57,6 +57,7 @@ func newDeleterLoop(settings Settings) *deleterLoop {
}

func (dl *deleterLoop) run() {
currentRetryInterval := dl.settings.RetryInterval
for {
request, ok := <-dl.requestChan
if !ok {
@@ -87,10 +88,14 @@ func (dl *deleterLoop) run() {
// The delay can be interrupted if the request channel is closed,
// indicating queue shutdown.
select {
case <-time.After(currentRetryInterval):
case <-dl.requestChan:
}
currentRetryInterval =
dl.settings.nextRetryInterval(currentRetryInterval)
} else {
// If we made progress, reset the retry interval.
currentRetryInterval = dl.settings.RetryInterval
}
dl.responseChan <- deleterLoopResponse{
results: results,
9 changes: 7 additions & 2 deletions libbeat/publisher/queue/diskqueue/segments.go
Expand Up @@ -207,10 +207,15 @@ func (segment *queueSegment) getWriter(
// retry callback returns true. This is used for timed retries when
// creating a queue segment from the writer loop.
func (segment *queueSegment) getWriterWithRetry(
queueSettings Settings, retry func(err error, firstTime bool) bool,
) (*os.File, error) {
firstTime := true
file, err := segment.getWriter(queueSettings)
for err != nil && retry(err, firstTime) {
// Set firstTime to false so the retry callback can apply backoff or
// other escalation on subsequent attempts.
firstTime = false

// Try again
file, err = segment.getWriter(queueSettings)
}
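As an illustration of the new two-argument callback, here is a hypothetical package-internal helper (openWithBackoff is invented for this sketch; "os" and "time" imports assumed) that retries segment creation with the same reset-on-first-error convention:

```go
// openWithBackoff opens a writer for the segment, sleeping between
// attempts and escalating the sleep on consecutive failures.
func openWithBackoff(segment *queueSegment, settings Settings) (*os.File, error) {
	interval := settings.RetryInterval
	retry := func(err error, firstTime bool) bool {
		if firstTime {
			// A new run of errors: start from the base interval again.
			interval = settings.RetryInterval
		}
		time.Sleep(interval)
		interval = settings.nextRetryInterval(interval)
		return true // a real caller would also stop on queue shutdown
	}
	return segment.getWriterWithRetry(settings, retry)
}
```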
22 changes: 19 additions & 3 deletions libbeat/publisher/queue/diskqueue/util.go
Expand Up @@ -69,16 +69,32 @@ func writeErrorIsRetriable(err error) bool {
// "wrapped" field in-place as long as it isn't captured by the callback.
type callbackRetryWriter struct {
wrapped io.Writer

// The retry callback is called with the error that was produced and a flag
// indicating whether this is the first error in the current run of
// consecutive errors for this write call.
retry func(err error, firstTime bool) bool
}

func (w callbackRetryWriter) Write(p []byte) (int, error) {
// firstTime tracks whether the current error is the first in a consecutive
// run of errors passed to the retry callback. This lets the callback reset
// its internal counters in case it applies exponential backoff or a
// retry limit.
firstTime := true
bytesWritten := 0
writer := w.wrapped
n, err := writer.Write(p)
for n < len(p) {
if err != nil {
shouldRetry := w.retry(err, firstTime)
firstTime = false
if !shouldRetry {
return bytesWritten + n, err
}
} else {
// If we made progress without an error, reset firstTime.
firstTime = true
}
// Advance p and try again.
bytesWritten += n
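For context, a sketch of how the wrapper might be wired up from the writer loop; writeFrame is invented for this sketch, but wl.outputFile and wl.retryCallback are the field and method shown in writer_loop.go below:

```go
// writeFrame writes a serialized frame to the current segment file,
// retrying via retryCallback until it succeeds or the callback gives up.
func (wl *writerLoop) writeFrame(frameBytes []byte) error {
	writer := callbackRetryWriter{
		wrapped: wl.outputFile,
		retry:   wl.retryCallback,
	}
	// Write returns only once the full buffer is written, or with an
	// error if the callback declines to retry (e.g. on queue shutdown).
	_, err := writer.Write(frameBytes)
	return err
}
```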
28 changes: 24 additions & 4 deletions libbeat/publisher/queue/diskqueue/writer_loop.go
@@ -82,6 +82,8 @@ type writerLoop struct {
// The file handle corresponding to currentSegment. When currentSegment
// changes, this handle is closed and a new one is created.
outputFile *os.File

currentRetryInterval time.Duration
}

func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {
@@ -91,6 +93,8 @@ func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {

requestChan: make(chan writerLoopRequest, 1),
responseChan: make(chan writerLoopResponse),

currentRetryInterval: settings.RetryInterval,
}
}

@@ -215,23 +219,39 @@ outerLoop:
return append(bytesWritten, curBytesWritten)
}

func (wl *writerLoop) applyRetryBackoff() {
wl.currentRetryInterval =
wl.settings.nextRetryInterval(wl.currentRetryInterval)
}

func (wl *writerLoop) resetRetryBackoff() {
wl.currentRetryInterval = wl.settings.RetryInterval
}

// retryCallback is called (by way of callbackRetryWriter) when there is
// an error writing to a segment file. It pauses for a configurable
// interval and returns true if the operation should be retried (which
// it always should, unless the queue is being closed).
func (wl *writerLoop) retryCallback(err error, firstTime bool) bool {
if firstTime {
// Reset any exponential backoff in the retry interval.
wl.resetRetryBackoff()
}
if writeErrorIsRetriable(err) {
return true
}
// If this error isn't immediately retriable, increase the exponential
// backoff afterwards.
defer wl.applyRetryBackoff()

// If the error is not immediately retriable, log the error
// and wait for the retry interval before trying again, but
// abort if the queue is closed (indicated by the request channel
// becoming unblocked).
wl.logger.Errorf("Writing to segment %v: %v",
wl.currentSegment.id, err)
select {
case <-time.After(wl.currentRetryInterval):
return true
case <-wl.requestChan:
return false
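Read end to end, the retry behavior with the default settings is: errors that writeErrorIsRetriable accepts are retried immediately with no delay; any other error is logged, waits the current interval (starting at 1s), and doubles the interval for the next failure up to the 30s cap; and because retryCallback resets the backoff whenever firstTime is true, each new run of errors starts over from retry_interval.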
