Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 11, 2024
1 parent 09b7c68 commit eee4a69
Show file tree
Hide file tree
Showing 2 changed files with 330 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionLi
S3AsyncDeleteHelper.executeDeleteChain(s3AsyncClient, blobStore, keysToDelete, CompletableFuture.completedFuture(null), null)
.whenComplete((v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(new IOException("Failed to delete blobs", throwable));
completionListener.onFailure(new IOException("Failed to delete blobs " + blobNames, throwable));
} else {
completionListener.onResponse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -119,6 +121,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

public class S3BlobStoreContainerTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -1379,6 +1382,187 @@ public void onFailure(Exception e) {
verify(s3AsyncClient, times(expectedDeletedObjectsCall)).deleteObjects(any(DeleteObjectsRequest.class));
}

public void testDeleteAsyncFailure() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(1000);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create(
s3AsyncClient,
s3AsyncClient,
s3AsyncClient,
null
);
when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials);

// Simulate a failure in listObjectsV2Paginator
RuntimeException simulatedFailure = new RuntimeException("Simulated failure");
when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenThrow(simulatedFailure);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
blobContainer.deleteAsync(new ActionListener<>() {
@Override
public void onResponse(DeleteResult deleteResult) {
fail("Expected a failure, but got a success response");
}

@Override
public void onFailure(Exception e) {
exceptionRef.set(e);
latch.countDown();
}
});

latch.await();

assertNotNull(exceptionRef.get());
assertEquals(IOException.class, exceptionRef.get().getClass());
assertEquals("Failed to initiate async deletion", exceptionRef.get().getMessage());
assertEquals(simulatedFailure, exceptionRef.get().getCause());
}

public void testDeleteAsyncListingError() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(1000);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create(
s3AsyncClient,
s3AsyncClient,
s3AsyncClient,
null
);
when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials);

final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
doAnswer(invocation -> {
Subscriber<? super ListObjectsV2Response> subscriber = invocation.getArgument(0);
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
subscriber.onError(new RuntimeException("Simulated listing error"));
}

@Override
public void cancel() {}
});
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());
when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
blobContainer.deleteAsync(new ActionListener<>() {
@Override
public void onResponse(DeleteResult deleteResult) {
fail("Expected a failure, but got a success response");
}

@Override
public void onFailure(Exception e) {
exceptionRef.set(e);
latch.countDown();
}
});

latch.await();

assertNotNull(exceptionRef.get());
assertEquals(IOException.class, exceptionRef.get().getClass());
assertEquals("Failed to list objects for deletion", exceptionRef.get().getMessage());
assertNotNull(exceptionRef.get().getCause());
assertEquals("Simulated listing error", exceptionRef.get().getCause().getMessage());
}

public void testDeleteAsyncDeletionError() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(1000);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create(
s3AsyncClient,
s3AsyncClient,
s3AsyncClient,
null
);
when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials);

final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
doAnswer(invocation -> {
Subscriber<? super ListObjectsV2Response> subscriber = invocation.getArgument(0);
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
subscriber.onNext(
ListObjectsV2Response.builder().contents(S3Object.builder().key("test-key").size(100L).build()).build()
);
subscriber.onComplete();
}

@Override
public void cancel() {}
});
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());
when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

// Simulate a failure in deleteObjects
CompletableFuture<DeleteObjectsResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(new RuntimeException("Simulated delete error"));
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(failedFuture);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
blobContainer.deleteAsync(new ActionListener<>() {
@Override
public void onResponse(DeleteResult deleteResult) {
fail("Expected a failure, but got a success response");
}

@Override
public void onFailure(Exception e) {
exceptionRef.set(e);
latch.countDown();
}
});

latch.await();

assertNotNull(exceptionRef.get());
assertEquals(CompletionException.class, exceptionRef.get().getClass());
assertEquals("java.lang.RuntimeException: Simulated delete error", exceptionRef.get().getMessage());
assertNotNull(exceptionRef.get().getCause());
assertEquals("Simulated delete error", exceptionRef.get().getCause().getMessage());
}

public void testDeleteBlobsAsyncIgnoringIfNotExists() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
Expand Down Expand Up @@ -1451,6 +1635,151 @@ public void onFailure(Exception e) {
assertTrue(deletedKeys.containsAll(blobNames));
}

public void testDeleteBlobsAsyncIgnoringIfNotExistsFailure() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(1000);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create(
s3AsyncClient,
s3AsyncClient,
s3AsyncClient,
null
);
when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials);

// Simulate a failure in deleteObjects
CompletableFuture<DeleteObjectsResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(new RuntimeException("Simulated delete failure"));
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(failedFuture);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

List<String> blobNames = Arrays.asList("blob1", "blob2", "blob3");

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
fail("Expected a failure, but got a success response");
}

@Override
public void onFailure(Exception e) {
exceptionRef.set(e);
latch.countDown();
}
});

latch.await();

assertNotNull(exceptionRef.get());
assertEquals(IOException.class, exceptionRef.get().getClass());
assertEquals("Failed to delete blobs " + blobNames, exceptionRef.get().getMessage());
assertNotNull(exceptionRef.get().getCause());
assertEquals("java.lang.RuntimeException: Simulated delete failure", exceptionRef.get().getCause().getMessage());
}

public void testDeleteBlobsAsyncIgnoringIfNotExistsWithEmptyList() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(1000);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create(
s3AsyncClient,
s3AsyncClient,
s3AsyncClient,
null
);
when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

List<String> emptyBlobNames = Collections.emptyList();

CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean onResponseCalled = new AtomicBoolean(false);
AtomicReference<Exception> exceptionRef = new AtomicReference<>();

blobContainer.deleteBlobsAsyncIgnoringIfNotExists(emptyBlobNames, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
onResponseCalled.set(true);
latch.countDown();
}

@Override
public void onFailure(Exception e) {
exceptionRef.set(e);
latch.countDown();
}
});

latch.await();

assertTrue("onResponse should have been called", onResponseCalled.get());
assertNull("No exception should have been thrown", exceptionRef.get());

// Verify that no interactions with S3AsyncClient occurred
verifyNoInteractions(s3AsyncClient);
}

public void testDeleteBlobsAsyncIgnoringIfNotExistsInitializationFailure() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(1000);

// Simulate a failure when getting the asyncClientReference
RuntimeException simulatedFailure = new RuntimeException("Simulated initialization failure");
when(blobStore.asyncClientReference()).thenThrow(simulatedFailure);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

List<String> blobNames = Arrays.asList("blob1", "blob2", "blob3");

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionRef = new AtomicReference<>();

blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
fail("Expected a failure, but got a success response");
}

@Override
public void onFailure(Exception e) {
exceptionRef.set(e);
latch.countDown();
}
});

latch.await();

assertNotNull("An exception should have been thrown", exceptionRef.get());
assertTrue("Exception should be an IOException", exceptionRef.get() instanceof IOException);
assertEquals("Failed to initiate async blob deletion", exceptionRef.get().getMessage());
assertEquals(simulatedFailure, exceptionRef.get().getCause());
}

private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) {

final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));
Expand Down

0 comments on commit eee4a69

Please sign in to comment.