Skip to content

Commit

Permalink
Add UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 10, 2024
1 parent ffc81c0 commit 09b7c68
Show file tree
Hide file tree
Showing 4 changed files with 422 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public void publish(MetricCollection metricCollection) {
public void close() {}
};

public MetricPublisher getDeleteObjectsMetricPublisher() {
return deleteObjectsMetricPublisher;
}

public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@ static List<List<String>> createDeleteBatches(List<String> keys, int bulkDeleteS
return batches;
}

private static CompletableFuture<Void> executeDeleteBatches(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<List<String>> batches
) {
static CompletableFuture<Void> executeDeleteBatches(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<List<String>> batches) {
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);

for (List<String> batch : batches) {
Expand All @@ -64,16 +60,12 @@ private static CompletableFuture<Void> executeDeleteBatches(
return allDeletesFuture;
}

private static CompletableFuture<Void> executeSingleDeleteBatch(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<String> batch
) {
static CompletableFuture<Void> executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<String> batch) {
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore);
return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse);
}

private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
if (!deleteObjectsResponse.errors().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
Expand All @@ -88,7 +80,7 @@ private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsRes
return null;
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Expand All @@ -97,7 +89,7 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().getDeleteObjectsMetricPublisher()))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -101,16 +102,19 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -1275,6 +1279,178 @@ public void testTransformResponseToInputStreamContainer() throws Exception {
assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available());
}

public void testDeleteAsync() throws Exception {
for (int i = 0; i < 100; i++) {
testDeleteAsync(i + 1);
}
}

private void testDeleteAsync(int bulkDeleteSize) throws InterruptedException {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(bulkDeleteSize);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create(
s3AsyncClient,
s3AsyncClient,
s3AsyncClient,
null
);
when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials);

final List<S3Object> s3Objects = new ArrayList<>();
int numObjects = randomIntBetween(20, 100);
long totalSize = 0;
for (int i = 0; i < numObjects; i++) {
long size = randomIntBetween(1, 100);
s3Objects.add(S3Object.builder().key(randomAlphaOfLength(10)).size(size).build());
totalSize += size;
}

final List<ListObjectsV2Response> responseList = new ArrayList<>();
int size = 0;
while (size < numObjects) {
int toAdd = randomIntBetween(10, 20);
int endIndex = Math.min(numObjects, size + toAdd);
responseList.add(ListObjectsV2Response.builder().contents(s3Objects.subList(size, endIndex)).build());
size = endIndex;
}
int expectedDeletedObjectsCall = numObjects / bulkDeleteSize + (numObjects % bulkDeleteSize > 0 ? 1 : 0);

final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
AtomicInteger counter = new AtomicInteger();
doAnswer(invocation -> {
Subscriber<? super ListObjectsV2Response> subscriber = invocation.getArgument(0);
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
int currentCounter = counter.getAndIncrement();
if (currentCounter < responseList.size()) {
subscriber.onNext(responseList.get(currentCounter));
}
if (currentCounter == responseList.size()) {
subscriber.onComplete();
}
}

@Override
public void cancel() {}
});
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());
when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(
CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build())
);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<DeleteResult> deleteResultRef = new AtomicReference<>();
blobContainer.deleteAsync(new ActionListener<>() {
@Override
public void onResponse(DeleteResult deleteResult) {
deleteResultRef.set(deleteResult);
latch.countDown();
}

@Override
public void onFailure(Exception e) {
logger.error("exception during deleteAsync", e);
fail("Unexpected failure: " + e.getMessage());
}
});

latch.await();

DeleteResult deleteResult = deleteResultRef.get();
assertEquals(numObjects, deleteResult.blobsDeleted());
assertEquals(totalSize, deleteResult.bytesDeleted());

verify(s3AsyncClient, times(1)).listObjectsV2Paginator(any(ListObjectsV2Request.class));
verify(s3AsyncClient, times(expectedDeletedObjectsCall)).deleteObjects(any(DeleteObjectsRequest.class));
}

public void testDeleteBlobsAsyncIgnoringIfNotExists() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
int bulkDeleteSize = randomIntBetween(1, 10);
when(blobStore.getBulkDeletesSize()).thenReturn(bulkDeleteSize);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create(
s3AsyncClient,
s3AsyncClient,
s3AsyncClient,
null
);
when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials);

final List<String> blobNames = new ArrayList<>();
int size = randomIntBetween(10, 100);
for (int i = 0; i < size; i++) {
blobNames.add(randomAlphaOfLength(10));
}
int expectedDeleteCalls = size / bulkDeleteSize + (size % bulkDeleteSize > 0 ? 1 : 0);

when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(
CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build())
);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {
exceptionRef.set(e);
latch.countDown();
}
});

latch.await();

assertNull(exceptionRef.get());

ArgumentCaptor<DeleteObjectsRequest> deleteRequestCaptor = ArgumentCaptor.forClass(DeleteObjectsRequest.class);
verify(s3AsyncClient, times(expectedDeleteCalls)).deleteObjects(deleteRequestCaptor.capture());

DeleteObjectsRequest capturedRequest = deleteRequestCaptor.getAllValues().stream().findAny().get();
assertEquals(bucketName, capturedRequest.bucket());
int totalBlobsDeleted = deleteRequestCaptor.getAllValues()
.stream()
.map(r -> r.delete().objects().size())
.reduce(Integer::sum)
.get();
assertEquals(blobNames.size(), totalBlobsDeleted);
List<String> deletedKeys = deleteRequestCaptor.getAllValues()
.stream()
.flatMap(r -> r.delete().objects().stream())
.map(ObjectIdentifier::key)
.collect(Collectors.toList());
assertTrue(deletedKeys.containsAll(blobNames));
}

private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) {

final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));
Expand Down
Loading

0 comments on commit 09b7c68

Please sign in to comment.