From 73cbbb0f2257b9eb5a3bf5d2cf1f4d4d2490d47d Mon Sep 17 00:00:00 2001 From: Paul Rogers <129207811+paul1r@users.noreply.github.com> Date: Thu, 5 Sep 2024 15:53:22 -0400 Subject: [PATCH] feat: add retries for s3 ObjectExists calls (#14062) **What this PR does / why we need it**: It was determined via careful examination that Loki would issue retries for S3 `GetObject` calls, but would not retry calls to `ObjectExists`. For transient issues (e.g. rate-limiting), it makes sense to retry the command to see if an object does exist. If the object is not found (meaning a 404 return code, i.e. a successful query of the S3 storage showing that the object does not exist), no retries are attempted, as they are not needed. --- docs/sources/shared/configuration.md | 4 +- .../chunk/client/aws/s3_storage_client.go | 38 +++++-- .../client/aws/s3_storage_client_test.go | 106 ++++++++++++++++++ 3 files changed, 136 insertions(+), 12 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 75b3e85749e7..171845994227 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1562,7 +1562,7 @@ backoff_config: # CLI flag: -s3.max-backoff [max_period: | default = 3s] - # Maximum number of times to retry when s3 get Object + # Maximum number of times to retry for s3 GetObject or ObjectExists # CLI flag: -s3.max-retries [max_retries: | default = 5] @@ -5430,7 +5430,7 @@ backoff_config: # CLI flag: -.storage.s3.max-backoff [max_period: | default = 3s] - # Maximum number of times to retry when s3 get Object + # Maximum number of times to retry for s3 GetObject or ObjectExists # CLI flag: -.storage.s3.max-retries [max_retries: | default = 5] diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index 11696f67eddb..4785d4667bd3 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ 
-124,7 +124,7 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.BackoffConfig.MinBackoff, prefix+"s3.min-backoff", 100*time.Millisecond, "Minimum backoff time when s3 get Object") f.DurationVar(&cfg.BackoffConfig.MaxBackoff, prefix+"s3.max-backoff", 3*time.Second, "Maximum backoff time when s3 get Object") - f.IntVar(&cfg.BackoffConfig.MaxRetries, prefix+"s3.max-retries", 5, "Maximum number of times to retry when s3 get Object") + f.IntVar(&cfg.BackoffConfig.MaxRetries, prefix+"s3.max-retries", 5, "Maximum number of times to retry for s3 GetObject or ObjectExists") } // Validate config and returns error on failure @@ -307,16 +307,34 @@ func buckets(cfg S3Config) ([]string, error) { func (a *S3ObjectClient) Stop() {} func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { - err := instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - headObjectInput := &s3.HeadObjectInput{ - Bucket: aws.String(a.bucketFromKey(objectKey)), - Key: aws.String(objectKey), + var lastErr error + + retries := backoff.New(ctx, a.cfg.BackoffConfig) + for retries.Ongoing() { + if ctx.Err() != nil { + return false, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists") } - _, err := a.S3.HeadObject(headObjectInput) - return err - }) - if err != nil { - return false, err + lastErr = instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + headObjectInput := &s3.HeadObjectInput{ + Bucket: aws.String(a.bucketFromKey(objectKey)), + Key: aws.String(objectKey), + } + _, requestErr := a.S3.HeadObject(headObjectInput) + return requestErr + }) + if lastErr == nil { + return true, nil + } + + if a.IsObjectNotFoundErr(lastErr) { + return false, lastErr + } + + retries.Wait() + } + + if lastErr != nil { + return false, lastErr } return true, nil diff --git 
a/pkg/storage/chunk/client/aws/s3_storage_client_test.go b/pkg/storage/chunk/client/aws/s3_storage_client_test.go index d966f1a2f9f9..1cf020e7b9ec 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client_test.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client_test.go @@ -196,6 +196,112 @@ func Test_Hedging(t *testing.T) { } } +type MockS3Client struct { + s3.S3 + HeadObjectFunc func(*s3.HeadObjectInput) (*s3.HeadObjectOutput, error) +} + +func (m *MockS3Client) HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { + return m.HeadObjectFunc(input) +} + +func Test_RetryLogic(t *testing.T) { + for _, tc := range []struct { + name string + maxRetries int + exists bool + do func(c *S3ObjectClient) error + }{ + { + "get object with retries", + 3, + true, + func(c *S3ObjectClient) error { + _, _, err := c.GetObject(context.Background(), "foo") + return err + }, + }, + { + "object exists with retries", + 3, + true, + func(c *S3ObjectClient) error { + _, err := c.ObjectExists(context.Background(), "foo") + return err + }, + }, + { + "object doesn't exist with retries", + 3, + false, + func(c *S3ObjectClient) error { + _, err := c.ObjectExists(context.Background(), "foo") + return err + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + callCount := atomic.NewInt32(0) + + mockS3 := &MockS3Client{ + HeadObjectFunc: func(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { + callNum := callCount.Inc() + if !tc.exists { + rfIn := awserr.NewRequestFailure( + awserr.New("NotFound", "Not Found", nil), 404, "abc", + ) + return nil, rfIn + } + + // Fail the first set of calls + if int(callNum) <= tc.maxRetries-1 { + time.Sleep(200 * time.Millisecond) // Simulate latency + return nil, errors.New("simulated error on mock call") + } + + // Succeed on the last call + return &s3.HeadObjectOutput{}, nil + }, + } + + c, err := NewS3ObjectClient(S3Config{ + AccessKeyID: "foo", + SecretAccessKey: flagext.SecretWithValue("bar"), + BackoffConfig: 
backoff.Config{MaxRetries: tc.maxRetries}, + BucketNames: "foo", + Inject: func(next http.RoundTripper) http.RoundTripper { + return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + // Increment the call counter + callNum := callCount.Inc() + + // Fail the first set of calls + if int(callNum) <= tc.maxRetries-1 { + time.Sleep(200 * time.Millisecond) // Simulate latency + return nil, errors.New("simulated error on call") + } + + // Succeed on the last call + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader([]byte("object content"))), + }, nil + }) + }, + }, hedging.Config{}) + require.NoError(t, err) + c.S3 = mockS3 + err = tc.do(c) + if tc.exists { + require.NoError(t, err) + require.Equal(t, tc.maxRetries, int(callCount.Load())) + } else { + require.True(t, c.IsObjectNotFoundErr(err)) + require.Equal(t, 1, int(callCount.Load())) + } + }) + } +} + func Test_ConfigRedactsCredentials(t *testing.T) { underTest := S3Config{ AccessKeyID: "access key id",