Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update S3 retry logic to account for the underlying cause in case of IOException #15238

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public boolean apply(Throwable e)
if (e == null) {
return false;
} else if (e instanceof IOException) {
if (e.getCause() != null) {
// Recurse with the underlying cause to see if it's retriable.
return apply(e.getCause());
}
return true;
} else if (e instanceof SdkClientException
&& e.getMessage().contains("Data read has a different length than the expected")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
}

@Test
public void testGZUncompressRetries() throws IOException, SegmentLoadingException
public void testGZUncompressOn4xxError() throws IOException
{
final String bucket = "bucket";
final String keyPrefix = "prefix/dir/0";
Expand Down Expand Up @@ -165,6 +165,65 @@
AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentPullerTest");
exception.setErrorCode("NoSuchKey");
exception.setStatusCode(404);
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(true)
.once();
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
.andThrow(exception)
.once();
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);

EasyMock.replay(s3Client);
Assert.assertThrows(
SegmentLoadingException.class,
() -> puller.getSegmentFiles(
new CloudObjectLocation(
bucket,
object0.getKey()
), tmpDir
)
);
EasyMock.verify(s3Client);

File expected = new File(tmpDir, "renames-0");
Assert.assertFalse(expected.exists());
}

@Test
public void testGZUncompressOn5xxError() throws IOException, SegmentLoadingException
{
final String bucket = "bucket";
final String keyPrefix = "prefix/dir/0";
final ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
final byte[] value = bucket.getBytes(StandardCharsets.UTF_8);

final File tmpFile = temporaryFolder.newFile("gzTest.gz");

try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) {

Check warning

Code scanning / CodeQL

Potential output resource leak Warning test

This FileOutputStream is not always closed on method exit.
outputStream.write(value);
}

S3Object object0 = new S3Object();

object0.setBucketName(bucket);
object0.setKey(keyPrefix + "/renames-0.gz");
object0.getObjectMetadata().setLastModified(new Date(0));
object0.setObjectContent(new FileInputStream(tmpFile));

Check warning

Code scanning / CodeQL

Potential input resource leak Warning test

This FileInputStream is not always closed on method exit.

final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(bucket);
objectSummary.setKey(keyPrefix + "/renames-0.gz");
objectSummary.setLastModified(new Date(0));

final ListObjectsV2Result listObjectsResult = new ListObjectsV2Result();
listObjectsResult.setKeyCount(1);
listObjectsResult.getObjectSummaries().add(objectSummary);

File tmpDir = temporaryFolder.newFolder("gzTestDir");

AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentPullerTest");
exception.setErrorCode("Slow Down");
exception.setStatusCode(503);
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(true)
.once();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.s3;

import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class S3UtilsTest
{
@Test
public void testRetryWithIOExceptions()
{
final int maxRetries = 3;
final AtomicInteger count = new AtomicInteger();
Assert.assertThrows(
IOException.class,
() -> S3Utils.retryS3Operation(
() -> {
count.incrementAndGet();
throw new IOException("hmm");
},
maxRetries
));
Assert.assertEquals(maxRetries, count.get());
}

@Test
public void testRetryWith4XXErrors()
{
final AtomicInteger count = new AtomicInteger();
Assert.assertThrows(
IOException.class,
() -> S3Utils.retryS3Operation(
() -> {
if (count.incrementAndGet() >= 2) {
return "hey";
} else {
AmazonS3Exception s3Exception = new AmazonS3Exception("a 403 s3 exception");
s3Exception.setStatusCode(403);
throw new IOException(s3Exception);
}
},
3
));
Assert.assertEquals(1, count.get());
}

@Test
public void testRetryWith5XXErrorsNotExceedingMaxRetries() throws Exception
{
final int maxRetries = 3;
final AtomicInteger count = new AtomicInteger();
S3Utils.retryS3Operation(
() -> {
if (count.incrementAndGet() >= maxRetries) {
return "hey";
} else {
AmazonS3Exception s3Exception = new AmazonS3Exception("a 5xx s3 exception");
s3Exception.setStatusCode(500);
throw new IOException(s3Exception);
}
},
maxRetries
);
Assert.assertEquals(maxRetries, count.get());
}

@Test
public void testRetryWith5XXErrorsExceedingMaxRetries()
{
final int maxRetries = 3;
final AtomicInteger count = new AtomicInteger();
Assert.assertThrows(
IOException.class,
() -> S3Utils.retryS3Operation(
() -> {
if (count.incrementAndGet() > maxRetries) {
return "hey";
} else {
AmazonS3Exception s3Exception = new AmazonS3Exception("a 5xx s3 exception");
s3Exception.setStatusCode(500);
throw new IOException(s3Exception);
}
},
maxRetries
)
);
Assert.assertEquals(maxRetries, count.get());
}
}