From b64e7e794eaa805ad4318ebb60bc56946a6e4ed4 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 16 May 2023 12:14:58 -0500 Subject: [PATCH] S3 scan with source coordination (#2689) Implement S3 Scan using SourceCoordinator Signed-off-by: Taylor Gray --- .../LeaseBasedSourceCoordinator.java | 9 +- .../LeaseBasedSourceCoordinatorTest.java | 9 + .../plugins/source/S3ScanObjectWorkerIT.java | 7 +- .../plugins/source/S3ObjectHandler.java | 6 +- .../S3ScanPartitionCreationSupplier.java | 109 +++++ .../plugins/source/S3ScanService.java | 8 +- .../dataprepper/plugins/source/S3Source.java | 20 +- .../plugins/source/S3SourceProgressState.java | 9 + .../plugins/source/ScanObjectWorker.java | 159 +++---- .../source/S3ScanObjectWorkerTest.java | 397 +++--------------- .../S3ScanPartitionCreationSupplierTest.java | 133 ++++++ .../plugins/source/S3ScanServiceTest.java | 3 +- 12 files changed, 419 insertions(+), 450 deletions(-) create mode 100644 data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java create mode 100644 data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceProgressState.java create mode 100644 data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java index 01cf694dda..5469d3f624 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -189,7 +189,10 @@ public void saveProgressStateForPartition(final String partitionKe @Override public void giveUpPartitions() { - validateIsInitialized(); + + if (!initialized) { + return; + } final Optional> activePartition = partitionManager.getActivePartition(); if (activePartition.isPresent()) { @@ -204,7 +207,7 @@ public void giveUpPartitions() { sourceCoordinationStore.tryUpdateSourcePartitionItem(updateItem); - LOG.debug("Partition key {} was given up by owner {}", updateItem.getSourcePartitionKey(), ownerId); + LOG.info("Partition key {} was given up by owner {}", updateItem.getSourcePartitionKey(), ownerId); } partitionManager.removeActivePartition(); } @@ -250,7 +253,7 @@ private SourcePartitionStoreItem validateAndGetSourcePartitionStoreItem(final St private void validateIsInitialized() { if (!initialized) { - throw new UninitializedSourceCoordinatorException("The initialize method has not been called on this source coordinator. initialize must be called before further interactions with the SourceCoordinator"); + throw new UninitializedSourceCoordinatorException("The initialize method has not been called on this source coordinator. 
initialize() must be called before further interactions with the SourceCoordinator"); } } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java index 08fa07c05d..efc63fad48 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java @@ -411,6 +411,15 @@ void saveProgressStateForPartition_with_owned_partition_and_existing_store_item_ } } + @Test + void giveUpPartitions_with_nonInitialized_store_does_nothing_and_returns() { + final SourceCoordinator objectUnderTest = new LeaseBasedSourceCoordinator<>(String.class, sourceCoordinationStore, sourceCoordinationConfig, partitionManager, ownerPrefix); + + objectUnderTest.giveUpPartitions(); + + verifyNoInteractions(sourceCoordinationStore, partitionManager); + } + @Test void giveUpPartitions_with_active_partitionKey_that_does_not_exist_in_the_store_removes_the_active_partition() { final SourcePartition sourcePartition = SourcePartition.builder(String.class) diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java index 2fcc8ff3fa..e67e269ae9 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java @@ -17,9 +17,11 @@ import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.Mock; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.configuration.CompressionOption; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption; @@ -69,6 +71,9 @@ public class S3ScanObjectWorkerIT { private S3ObjectGenerator s3ObjectGenerator; private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + @Mock + private SourceCoordinator sourceCoordinator; + private S3ObjectHandler createObjectUnderTest(final S3ObjectRequest s3ObjectRequest){ if(Objects.nonNull(s3ObjectRequest.getExpression())) return new S3SelectObjectWorker(s3ObjectRequest); @@ -148,7 +153,7 @@ private ScanObjectWorker createObjectUnderTest(final RecordsGenerator recordsGen .compressionType(shouldCompress ? 
CompressionType.GZIP : CompressionType.NONE) .s3SelectResponseHandlerFactory(new S3SelectResponseHandlerFactory()).build(); return new ScanObjectWorker(s3Client,List.of(scanOptions),createObjectUnderTest(s3ObjectRequest) - ,bucketOwnerProvider); + ,bucketOwnerProvider, sourceCoordinator); } @ParameterizedTest diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectHandler.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectHandler.java index c1466cbe21..b8072d8dff 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectHandler.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectHandler.java @@ -4,9 +4,10 @@ */ package org.opensearch.dataprepper.plugins.source; -import java.io.IOException; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import java.io.IOException; + /** * A S3ObjectHandler interface must be extended/implement for S3 Object parsing * @@ -19,5 +20,6 @@ public interface S3ObjectHandler { * * @throws IOException exception is thrown every time because this is not supported */ - void parseS3Object(final S3ObjectReference s3ObjectReference, final AcknowledgementSet acknowledgementSet) throws IOException; + void parseS3Object(final S3ObjectReference s3ObjectReference, + final AcknowledgementSet acknowledgementSet) throws IOException; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java new file mode 100644 index 0000000000..0ee2bcc999 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java @@ -0,0 +1,109 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption; +import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.utils.Pair; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class S3ScanPartitionCreationSupplier implements Supplier> { + + private static final String BUCKET_OBJECT_PARTITION_KEY_FORMAT = "%s|%s"; + + private final S3Client s3Client; + private final BucketOwnerProvider bucketOwnerProvider; + private final List scanOptionsList; + public S3ScanPartitionCreationSupplier(final S3Client s3Client, + final BucketOwnerProvider bucketOwnerProvider, + final List scanOptionsList) { + + this.s3Client = s3Client; + this.bucketOwnerProvider = bucketOwnerProvider; + this.scanOptionsList = scanOptionsList; + } + + @Override + public List get() { + final List objectsToProcess = new ArrayList<>(); + + for (final ScanOptions 
scanOptions : scanOptionsList) { + final List excludeItems = new ArrayList<>(); + final S3ScanKeyPathOption s3ScanKeyPathOption = scanOptions.getS3ScanKeyPathOption(); + final ListObjectsV2Request.Builder listObjectsV2Request = ListObjectsV2Request.builder() + .bucket(scanOptions.getBucket()); + bucketOwnerProvider.getBucketOwner(scanOptions.getBucket()) + .ifPresent(listObjectsV2Request::expectedBucketOwner); + + if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3ScanExcludeSuffixOptions())) + excludeItems.addAll(s3ScanKeyPathOption.getS3ScanExcludeSuffixOptions()); + + if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3scanIncludeOptions())) + s3ScanKeyPathOption.getS3scanIncludeOptions().forEach(includePath -> { + listObjectsV2Request.prefix(includePath); + objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, scanOptions.getBucket(), + scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime())); + }); + else + objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, scanOptions.getBucket(), + scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime())); + } + + return objectsToProcess; + } + + private List listFilteredS3ObjectsForBucket(final List excludeKeyPaths, + final ListObjectsV2Request.Builder listObjectsV2Request, + final String bucket, + final LocalDateTime startDateTime, + final LocalDateTime endDateTime) { + + final List allPartitionIdentifiers = new ArrayList<>(); + ListObjectsV2Response listObjectsV2Response = null; + do { + listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null).build()); + allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream() + .map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified()))) + .filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/")) + .filter(keyTimestampPair -> excludeKeyPaths.stream() + .noneMatch(excludeItem -> keyTimestampPair.left().endsWith(excludeItem))) + .filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime)) + .map(Pair::left) + .map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build()) + .collect(Collectors.toList())); + } while (listObjectsV2Response.isTruncated()); + + return allPartitionIdentifiers; + } + + private LocalDateTime instantToLocalDateTime(final Instant instant) { + final ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault()); + return zonedDateTime.toLocalDateTime(); + } + + /** + * Used for identifying s3 object last modified time match with scan the date range. 
+ * @return boolean + */ + private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTime, + final LocalDateTime startDateTime, + final LocalDateTime endDateTime){ + return lastModifiedTime.isAfter(startDateTime) && lastModifiedTime.isBefore(endDateTime); + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanService.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanService.java index f078b4d397..165ae84a5f 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanService.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanService.java @@ -4,6 +4,7 @@ */ package org.opensearch.dataprepper.plugins.source; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOptions; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; @@ -29,11 +30,13 @@ public class S3ScanService { private Thread scanObjectWorkerThread; private final BucketOwnerProvider bucketOwnerProvider; + private final SourceCoordinator sourceCoordinator; public S3ScanService(final S3SourceConfig s3SourceConfig, final S3ClientBuilderFactory s3ClientBuilderFactory, final S3ObjectHandler s3ObjectHandler, - final BucketOwnerProvider bucketOwnerProvider ) { + final BucketOwnerProvider bucketOwnerProvider, + final SourceCoordinator sourceCoordinator) { this.s3ScanBucketOptions = s3SourceConfig.getS3ScanScanOptions().getBuckets(); this.s3ClientBuilderFactory = s3ClientBuilderFactory; this.endDateTime = s3SourceConfig.getS3ScanScanOptions().getEndTime(); @@ -41,11 +44,12 @@ public S3ScanService(final S3SourceConfig s3SourceConfig, this.range = s3SourceConfig.getS3ScanScanOptions().getRange(); this.s3ObjectHandler = s3ObjectHandler; this.bucketOwnerProvider = bucketOwnerProvider; + this.sourceCoordinator = sourceCoordinator; } public void start() { scanObjectWorkerThread = new Thread(new ScanObjectWorker(s3ClientBuilderFactory.getS3Client(), - getScanOptions(),s3ObjectHandler,bucketOwnerProvider)); + getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator)); scanObjectWorkerThread.start(); } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java index c2494752e5..4b9cbaff6d 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java @@ -17,6 +17,8 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanScanOptions; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import org.opensearch.dataprepper.plugins.source.ownership.ConfigBucketOwnerProviderFactory; @@ -30,7 +32,7 @@ import 
java.util.function.BiConsumer; @DataPrepperPlugin(name = "s3", pluginType = Source.class, pluginConfigurationType = S3SourceConfig.class) -public class S3Source implements Source> { +public class S3Source implements Source>, UsesSourceCoordination { private final PluginMetrics pluginMetrics; private final S3SourceConfig s3SourceConfig; @@ -40,6 +42,7 @@ public class S3Source implements Source> { private final Optional s3ScanScanOptional; private final AcknowledgementSetManager acknowledgementSetManager; private final boolean acknowledgementsEnabled; + private SourceCoordinator sourceCoordinator; @DataPrepperPluginConstructor @@ -108,7 +111,7 @@ public void start(Buffer> buffer) { sqsService.start(); } if(s3ScanScanOptional.isPresent()) { - s3ScanService = new S3ScanService(s3SourceConfig,s3ClientBuilderFactory,s3Handler,bucketOwnerProvider); + s3ScanService = new S3ScanService(s3SourceConfig,s3ClientBuilderFactory,s3Handler,bucketOwnerProvider, sourceCoordinator); s3ScanService.start(); } } @@ -116,5 +119,18 @@ public void start(Buffer> buffer) { @Override public void stop() { sqsService.stop(); + if (Objects.nonNull(sourceCoordinator)) { + sourceCoordinator.giveUpPartitions(); + } + } + + @Override + public void setSourceCoordinator(final SourceCoordinator sourceCoordinator) { + this.sourceCoordinator = (SourceCoordinator) sourceCoordinator; + } + + @Override + public Class getPartitionProgressStateClass() { + return S3SourceProgressState.class; } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceProgressState.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceProgressState.java new file mode 100644 index 0000000000..a518815f3a --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceProgressState.java @@ -0,0 +1,9 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +public class S3SourceProgressState { +} diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java index 9199f021b3..5f96cf0d01 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java @@ -4,29 +4,21 @@ */ package org.opensearch.dataprepper.plugins.source; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption; +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.core.ResponseInputStream; import 
software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.S3Object; import java.io.IOException; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; +import java.util.Optional; +import java.util.function.Supplier; /** * Class responsible for processing the s3 scan objects with the help of S3ObjectWorker @@ -36,9 +28,9 @@ public class ScanObjectWorker implements Runnable{ private static final Logger LOG = LoggerFactory.getLogger(ScanObjectWorker.class); - private final S3Client s3Client; + private static final int STANDARD_BACKOFF_MILLIS = 30_000; - private static final Map stateSaveMap = new HashMap<>(); + private final S3Client s3Client; private final List scanOptionsBuilderList; @@ -46,118 +38,71 @@ public class ScanObjectWorker implements Runnable{ private final BucketOwnerProvider bucketOwnerProvider; + private final SourceCoordinator sourceCoordinator; + + private final Supplier> partitionCreationSupplier; + + // Should there be a duration or time that is configured in the source to stop processing? Otherwise will only stop when data prepper is stopped + private final boolean shouldStopProcessing = false; + public ScanObjectWorker(final S3Client s3Client, final List scanOptionsBuilderList, final S3ObjectHandler s3ObjectHandler, - final BucketOwnerProvider bucketOwnerProvider){ + final BucketOwnerProvider bucketOwnerProvider, + final SourceCoordinator sourceCoordinator){ this.s3Client = s3Client; this.scanOptionsBuilderList = scanOptionsBuilderList; this.s3ObjectHandler= s3ObjectHandler; this.bucketOwnerProvider = bucketOwnerProvider; + this.sourceCoordinator = sourceCoordinator; + this.sourceCoordinator.initialize(); + + this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList); } - /** - * It will decide the s3 object parse S3ObjectWorker or S3SelectWorker - * based on s3 select configuration provided. 
- */ @Override public void run() { - scanOptionsBuilderList.forEach(this::parseS3ScanObjects); + while (!shouldStopProcessing) { + startProcessingObject(STANDARD_BACKOFF_MILLIS); + } } /** - * Method will parse the s3 object and write to {@link Buffer} + * For testing */ - void parseS3ScanObjects(final ScanOptions scanOptions) { - final List scanObjects = new ArrayList<>(); - final List excludeItems = new ArrayList<>(); - final S3ScanKeyPathOption s3ScanKeyPathOption = scanOptions.getS3ScanKeyPathOption(); - final ListObjectsV2Request.Builder listObjectsV2Request = ListObjectsV2Request.builder() - .bucket(scanOptions.getBucket()); - bucketOwnerProvider.getBucketOwner(scanOptions.getBucket()) - .ifPresent(listObjectsV2Request::expectedBucketOwner); - - if(Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3ScanExcludeSuffixOptions())) - excludeItems.addAll(s3ScanKeyPathOption.getS3ScanExcludeSuffixOptions()); - - if(Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3scanIncludeOptions())) - s3ScanKeyPathOption.getS3scanIncludeOptions().forEach(includePath -> { - listObjectsV2Request.prefix(includePath); - scanObjects.addAll(listS3Objects(excludeItems, listObjectsV2Request)); - }); - else - scanObjects.addAll(listS3Objects(excludeItems, listObjectsV2Request)); - - if(scanObjects.isEmpty()) - LOG.info("s3 objects are not found in configured scan pipeline."); - - scanObjects.forEach(key -> - processS3ObjectKeys(S3ObjectReference.bucketAndKey(scanOptions.getBucket(), - key).build(),s3ObjectHandler, scanOptions)); + void runWithoutInfiniteLoop() { + startProcessingObject(10); } - private List listS3Objects(List excludeKeyPaths, ListObjectsV2Request.Builder listObjectsV2Request) { - return s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).build()).contents().stream().map(S3Object::key) - .filter(path -> !path.endsWith("/")) - .filter(includeKeyPath -> excludeKeyPaths.stream() - .noneMatch(excludeItem -> includeKeyPath.endsWith(excludeItem))).collect(Collectors.toList()); - } + private void startProcessingObject(final int waitTimeMillis) { + final Optional> objectToProcess = sourceCoordinator.getNextPartition(partitionCreationSupplier); - private void processS3ObjectKeys(final S3ObjectReference s3ObjectReference, - final S3ObjectHandler s3ObjectHandler, - final ScanOptions scanOptions){ - final S3ObjectDetails s3ObjDetails = getS3ObjectDetails(s3ObjectReference); - final boolean isKeyMatchedBetweenTimeRange = isKeyMatchedBetweenTimeRange(s3ObjDetails.getS3ObjectLastModifiedTimestamp(), - scanOptions.getUseStartDateTime(), - scanOptions.getUseEndDateTime()); - if(isKeyMatchedBetweenTimeRange && (isKeyNotProcessedByS3Scan(s3ObjDetails))){ - updateKeyProcessedByS3Scan(s3ObjDetails); - try{ - s3ObjectHandler.parseS3Object(s3ObjectReference,null); - }catch (IOException ex){ - deleteKeyProcessedByS3Scan(s3ObjDetails); - LOG.error("Error while process the parseS3Object. ",ex); + if (objectToProcess.isEmpty()) { + try { + Thread.sleep(waitTimeMillis); + } catch (InterruptedException e) { + e.printStackTrace(); } + return; } - } - /** - * Method will identify already processed key. - * @return boolean - */ - private boolean isKeyNotProcessedByS3Scan(final S3ObjectDetails s3ObjectDetails) { - return stateSaveMap.get(s3ObjectDetails.getBucket()+s3ObjectDetails.getKey()) == null; - } - /** - * store the processed bucket key in the map. 
- */ - private void updateKeyProcessedByS3Scan(final S3ObjectDetails s3ObjectDetails) { - stateSaveMap.put((s3ObjectDetails.getBucket() + s3ObjectDetails.getKey()),s3ObjectDetails); - } - private void deleteKeyProcessedByS3Scan(S3ObjectDetails s3ObjDetails) { - stateSaveMap.remove(s3ObjDetails.getBucket() + s3ObjDetails.getKey()); - } + final String bucket = objectToProcess.get().getPartitionKey().split("\\|")[0]; + final String objectKey = objectToProcess.get().getPartitionKey().split("\\|")[1]; - /** - * fetch the s3 object last modified time. - * @return S3ObjectDetails - */ - private S3ObjectDetails getS3ObjectDetails(final S3ObjectReference s3ObjectReference){ - GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(s3ObjectReference.getBucketName()).key(s3ObjectReference.getKey()).build(); - ResponseInputStream s3ObjectResp = s3Client.getObject(getObjectRequest); - final Instant instant = s3ObjectResp.response().lastModified(); - ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault()); - return new S3ObjectDetails(s3ObjectReference.getBucketName(),s3ObjectReference.getKey(),zonedDateTime.toLocalDateTime()); + try { + processS3Object(S3ObjectReference.bucketAndKey(bucket, objectKey).build()); + sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey()); + } catch (final PartitionNotOwnedException | PartitionNotFoundException | PartitionUpdateException e) { + LOG.warn("S3 scan object worker received an exception from the source coordinator. There is a potential for duplicate data from {}, giving up partition and getting next partition: {}", objectKey, e.getMessage()); + sourceCoordinator.giveUpPartitions(); + } } - /** - * Used for identifying s3 object last modified time match with scan the date range. - * @return boolean - */ - private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTime, - final LocalDateTime startDateTime, - final LocalDateTime endDateTime){ - return lastModifiedTime.isAfter(startDateTime) && lastModifiedTime.isBefore(endDateTime); + private void processS3Object(final S3ObjectReference s3ObjectReference){ + try { + s3ObjectHandler.parseS3Object(s3ObjectReference,null); + } catch (IOException ex) { + LOG.error("Error while process the parseS3Object. 
",ex); + } } - } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java index 1a7113b026..1e37321071 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java @@ -5,386 +5,119 @@ package org.opensearch.dataprepper.plugins.source; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.DistributionSummary; -import io.micrometer.core.instrument.Timer; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.MockedStatic; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.compression.CompressionEngine; -import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption; -import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption; -import org.opensearch.dataprepper.plugins.source.configuration.S3SelectSerializationFormatOption; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; -import software.amazon.awssdk.core.ResponseInputStream; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; -import software.amazon.awssdk.services.s3.model.S3Object; -import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest; import java.io.IOException; -import java.time.Duration; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CompletableFuture; +import java.util.UUID; +import java.util.function.Supplier; +import java.util.stream.Stream; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.lenient; -import 
static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verifyNoMoreInteractions; +@ExtendWith(MockitoExtension.class) class S3ScanObjectWorkerTest { - @Mock - private Buffer> buffer; - @Mock - private Duration bufferTimeout; - @Mock - private BucketOwnerProvider bucketOwnerProvider; - @Mock - private S3Client s3Client; - @Mock - private Counter s3ObjectsFailedCounter; - @Mock - private Counter s3ObjectsFailedNotFoundCounter; - @Mock - private Counter s3ObjectsFailedAccessDeniedCounter; - @Mock - private Counter s3ObjectsSucceededCounter; - @Mock - private Timer s3ObjectReadTimer; - @Mock - private DistributionSummary s3ObjectSizeSummary; - @Mock - private DistributionSummary s3ObjectSizeProcessedSummary; - @Mock - private DistributionSummary s3ObjectEventsSummary; - @Mock - private ResponseInputStream objectInputStream; - @Mock - private GetObjectResponse getObjectResponse; - private long objectSize; - private int recordsToAccumulate; - @Mock - private CompressionEngine compressionEngine; - - @Mock - private ListObjectsV2Response listObjectsV2Response; @Mock - private S3ObjectRequest s3ObjectRequest; + private BucketOwnerProvider bucketOwnerProvider; @Mock - private S3SelectResponseHandlerFactory responseHandlerFactory; + private S3Client s3Client; @Mock - private S3SelectResponseHandler selectResponseHandler; + private S3ObjectHandler s3ObjectHandler; @Mock - private S3AsyncClient s3AsyncClient; + private SourceCoordinator sourceCoordinator; - @Mock - private S3ObjectHandler s3ObjectHandler; + private List scanOptionsList; - ScanObjectWorker createScanWorker(final ScanOptions scanOptions, - final String scanObjectName, - final S3ObjectHandler s3ObjectHandlerForCheck) throws IOException { - Random random = new Random(); - recordsToAccumulate = random.nextInt(10) + 2; - objectSize = random.nextInt(100_000) + 10_000; - objectInputStream = mock(ResponseInputStream.class); - getObjectResponse = mock(GetObjectResponse.class); - s3Client = mock(S3Client.class); - lenient().when(objectInputStream.response()).thenReturn(getObjectResponse); - lenient().when(getObjectResponse.contentLength()).thenReturn(objectSize); - lenient().when(getObjectResponse.lastModified()).thenReturn(LocalDateTime.of(2023, 03, 06, 10, 10).atZone(ZoneId.systemDefault()).toInstant()); - listObjectsV2Response = mock(ListObjectsV2Response.class); - S3Object s3Object = mock(S3Object.class); - when(s3Object.key()).thenReturn(scanObjectName); - when(listObjectsV2Response.contents()).thenReturn(Arrays.asList(s3Object)); - when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); - when(s3Client.getObject(any(GetObjectRequest.class))) - .thenReturn(objectInputStream); - s3ObjectsFailedCounter = mock(Counter.class); - s3ObjectsSucceededCounter = mock(Counter.class); - s3ObjectReadTimer = mock(Timer.class); - compressionEngine = mock(CompressionEngine.class); - bucketOwnerProvider = mock(BucketOwnerProvider.class); - s3ObjectRequest = mock(S3ObjectRequest.class); - S3ObjectPluginMetrics s3PluginMetrics = mock(S3ObjectPluginMetrics.class); - when(s3PluginMetrics.getS3ObjectsFailedCounter()).thenReturn(s3ObjectsFailedCounter); - 
when(s3PluginMetrics.getS3ObjectsFailedNotFoundCounter()).thenReturn(s3ObjectsFailedNotFoundCounter); - when(s3PluginMetrics.getS3ObjectsFailedAccessDeniedCounter()).thenReturn(s3ObjectsFailedAccessDeniedCounter); - when(s3PluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter); - when(s3PluginMetrics.getS3ObjectReadTimer()).thenReturn(s3ObjectReadTimer); - when(s3PluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary); - when(s3PluginMetrics.getS3ObjectSizeProcessedSummary()).thenReturn(s3ObjectSizeProcessedSummary); - when(s3PluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary); - when(s3PluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter); - when(s3ObjectRequest.getS3ObjectPluginMetrics()).thenReturn(s3PluginMetrics); - when(bucketOwnerProvider.getBucketOwner("my-bucket-1")).thenReturn(Optional.of("my-bucket-1")); - when(s3ObjectRequest.getBucketOwnerProvider()).thenReturn(bucketOwnerProvider); - lenient().when(compressionEngine.createInputStream("file1.csv", objectInputStream)).thenReturn(objectInputStream); - if (s3ObjectHandlerForCheck instanceof S3ObjectWorker) - s3ObjectHandler = new S3ObjectWorker(s3ObjectRequest); - else if (s3ObjectHandlerForCheck instanceof S3SelectObjectWorker) { - selectResponseHandler = mock(S3SelectResponseHandler.class); - s3AsyncClient = mock(S3AsyncClient.class); - S3SelectCSVOption s3SelectCSVOption = mock(S3SelectCSVOption.class); - responseHandlerFactory = mock(S3SelectResponseHandlerFactory.class); - given(s3ObjectRequest.getS3AsyncClient()).willReturn(s3AsyncClient); - when(s3SelectCSVOption.getFileHeaderInfo()).thenReturn("csv"); - when(s3ObjectRequest.getS3SelectCSVOption()).thenReturn(s3SelectCSVOption); - when(s3ObjectRequest.getSerializationFormatOption()).thenReturn(S3SelectSerializationFormatOption.CSV); - given(s3ObjectRequest.getS3SelectResponseHandlerFactory()).willReturn(responseHandlerFactory); - given(responseHandlerFactory.provideS3SelectResponseHandler()).willReturn(selectResponseHandler); - final CompletableFuture selectObjectResponseFuture = mock(CompletableFuture.class); - given(selectObjectResponseFuture.join()).willReturn(mock(Void.class)); - given(s3AsyncClient.selectObjectContent(any(SelectObjectContentRequest.class), - eq(selectResponseHandler))).willReturn(selectObjectResponseFuture); - s3ObjectHandler = new S3SelectObjectWorker(s3ObjectRequest); - } + @BeforeEach + void setup() { + scanOptionsList = new ArrayList<>(); + } - return new ScanObjectWorker(s3Client, Arrays.asList(scanOptions), s3ObjectHandler,bucketOwnerProvider); + private ScanObjectWorker createObjectUnderTest() { + final ScanObjectWorker objectUnderTest = new ScanObjectWorker(s3Client, scanOptionsList, s3ObjectHandler, bucketOwnerProvider, sourceCoordinator); + verify(sourceCoordinator).initialize(); + return objectUnderTest; } - @Test - void s3_scan_bucket_with_s3Object_verify_start_time_and_range_combination() throws IOException { - final String scanObjectName = "sample.csv"; - S3ScanKeyPathOption s3ScanKeyPathOption = mock(S3ScanKeyPathOption.class); - when(s3ScanKeyPathOption.getS3scanIncludeOptions()).thenReturn(List.of(scanObjectName)); - final ScanOptions scanOptions = new ScanOptions.Builder() - .setStartDateTime(LocalDateTime.parse("2023-03-06T00:00:00")) - .setRange(Duration.parse("P2DT1H")) - .setBucket("my-bucket-1") - .setS3ScanKeyPathOption(s3ScanKeyPathOption).build(); - final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class); - try 
(final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) - .thenReturn(bufferAccumulator); - final ScanObjectWorker scanWorker = createScanWorker(scanOptions, scanObjectName, mock(S3ObjectWorker.class)); - scanWorker.run(); - } - final ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); - verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture()); + @ParameterizedTest + @MethodSource("exceptionProvider") + void giveUpPartitions_is_called_when_a_PartitionException_is_thrown_from_parseS3Object(final Class exception) throws IOException { + final String bucket = UUID.randomUUID().toString(); + final String objectKey = UUID.randomUUID().toString(); + final String partitionKey = bucket + "|" + objectKey; - verify(s3ObjectsSucceededCounter).increment(); - final GetObjectRequest actualGetObjectRequest = getObjectRequestArgumentCaptor.getValue(); - assertThat(actualGetObjectRequest, notNullValue()); - assertThat(actualGetObjectRequest.bucket(), equalTo("my-bucket-1")); - assertThat(actualGetObjectRequest.key(), equalTo(scanObjectName)); - } + final SourcePartition partitionToProcess = SourcePartition.builder(S3SourceProgressState.class).withPartitionKey(partitionKey).build(); - @Test - void s3_scan_bucket_with_s3Object_verify_start_time_and_end_time_combination() throws IOException { - final String scanObjectName = "sample1.csv"; - S3ScanKeyPathOption s3ScanKeyPathOption = mock(S3ScanKeyPathOption.class); - when(s3ScanKeyPathOption.getS3scanIncludeOptions()).thenReturn(List.of(scanObjectName)); - final ScanOptions scanOptions = new ScanOptions.Builder() - .setStartDateTime(LocalDateTime.parse("2023-03-06T00:00:00")) - .setEndDateTime(LocalDateTime.parse("2023-04-09T00:00:00")) - .setBucket("my-bucket-1") - .setS3ScanKeyPathOption(s3ScanKeyPathOption).build(); - final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class); - try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) - .thenReturn(bufferAccumulator); - final ScanObjectWorker scanWorker = createScanWorker(scanOptions, scanObjectName, mock(S3ObjectWorker.class)); - scanWorker.run(); - } - final ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); - verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture()); + given(sourceCoordinator.getNextPartition(any(Supplier.class))).willReturn(Optional.of(partitionToProcess)); - verify(s3ObjectsSucceededCounter).increment(); - final GetObjectRequest actualGetObjectRequest = getObjectRequestArgumentCaptor.getValue(); - assertThat(actualGetObjectRequest, notNullValue()); - assertThat(actualGetObjectRequest.bucket(), equalTo("my-bucket-1")); - assertThat(actualGetObjectRequest.key(), equalTo(scanObjectName)); - } + final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); + doThrow(exception).when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null)); + doNothing().when(sourceCoordinator).giveUpPartitions(); - @Test - void s3_scan_bucket_with_s3Object_verify_end_time_and_range_combination() throws IOException { - final String scanObjectName = "test.csv"; - S3ScanKeyPathOption s3ScanKeyPathOption = 
mock(S3ScanKeyPathOption.class); - when(s3ScanKeyPathOption.getS3scanIncludeOptions()).thenReturn(List.of(scanObjectName)); - final ScanOptions scanOptions = new ScanOptions.Builder() - .setEndDateTime(LocalDateTime.parse("2023-03-09T00:00:00")) - .setRange(Duration.parse("P10DT2H")) - .setBucket("my-bucket-1") - .setS3ScanKeyPathOption(s3ScanKeyPathOption).build(); - final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class); - try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) - .thenReturn(bufferAccumulator); - final ScanObjectWorker scanWorker = createScanWorker(scanOptions, scanObjectName, mock(S3ObjectWorker.class)); - scanWorker.run(); - } - final ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); - verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture()); + createObjectUnderTest().runWithoutInfiniteLoop(); - verify(s3ObjectsSucceededCounter).increment(); - final GetObjectRequest actualGetObjectRequest = getObjectRequestArgumentCaptor.getValue(); - assertThat(actualGetObjectRequest, notNullValue()); - assertThat(actualGetObjectRequest.bucket(), equalTo("my-bucket-1")); - assertThat(actualGetObjectRequest.key(), equalTo(scanObjectName)); + verifyNoMoreInteractions(sourceCoordinator); } @Test - void s3_scan_bucket_with_s3Object_skip_processed_key() throws IOException { - final String scanObjectName = "test.csv"; - S3ScanKeyPathOption s3ScanKeyPathOption = mock(S3ScanKeyPathOption.class); - when(s3ScanKeyPathOption.getS3scanIncludeOptions()).thenReturn(List.of(scanObjectName)); - final ScanOptions scanOptions = new ScanOptions.Builder() - .setEndDateTime(LocalDateTime.parse("2023-03-09T00:00:00")) - .setRange(Duration.parse("P10DT2H")) - .setBucket("my-bucket-1") - .setS3ScanKeyPathOption(s3ScanKeyPathOption).build(); - final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class); - try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) - .thenReturn(bufferAccumulator); - final ScanObjectWorker scanWorker = createScanWorker(scanOptions, scanObjectName, mock(S3ObjectWorker.class)); - scanWorker.run(); - } - final ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); - verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture()); + void partition_from_getNextPartition_is_processed_correctly() throws IOException { + final String bucket = UUID.randomUUID().toString(); + final String objectKey = UUID.randomUUID().toString(); + final String partitionKey = bucket + "|" + objectKey; - verify(s3ObjectsSucceededCounter, times(0)).increment(); - final GetObjectRequest actualGetObjectRequest = getObjectRequestArgumentCaptor.getValue(); - assertThat(actualGetObjectRequest, notNullValue()); - assertThat(actualGetObjectRequest.bucket(), equalTo("my-bucket-1")); - assertThat(actualGetObjectRequest.key(), equalTo(scanObjectName)); - } - @Test - void s3_scan_bucket_with_s3_select_verify_end_time_and_range_combination() throws IOException { - final String startDateTime = "2023-03-07T10:00:00"; - final String bucketName = "my-bucket-1"; - final List keyPathList = Arrays.asList("file3.csv"); - S3ScanKeyPathOption s3ScanKeyPathOption = mock(S3ScanKeyPathOption.class); 
- when(s3ScanKeyPathOption.getS3scanIncludeOptions()).thenReturn(keyPathList); - final ScanOptions scanOptions = new ScanOptions.Builder() - .setEndDateTime(LocalDateTime.parse(startDateTime)) - .setRange(Duration.parse("P10DT2H")) - .setBucket(bucketName) - .setS3ScanKeyPathOption(s3ScanKeyPathOption).build(); - final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class); - try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) - .thenReturn(bufferAccumulator); - final ScanObjectWorker scanWorker = createScanWorker(scanOptions, keyPathList.get(0), mock(S3SelectObjectWorker.class)); - scanWorker.run(); - } - final ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); - verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture()); + final SourcePartition partitionToProcess = SourcePartition.builder(S3SourceProgressState.class).withPartitionKey(partitionKey).build(); - final GetObjectRequest actualGetObjectRequest = getObjectRequestArgumentCaptor.getValue(); - verify(s3ObjectsSucceededCounter).increment(); - assertThat(actualGetObjectRequest, notNullValue()); - assertThat(actualGetObjectRequest.bucket(), equalTo(bucketName)); - assertThat(actualGetObjectRequest.key(), equalTo(keyPathList.get(0))); + given(sourceCoordinator.getNextPartition(any(Supplier.class))).willReturn(Optional.of(partitionToProcess)); - } + final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); + doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null)); + doNothing().when(sourceCoordinator).completePartition(partitionKey); - @Test - void s3_scan_bucket_with_s3_select_verify_start_time_and_range_combination() throws IOException { - final String bucketName = "my-bucket-1"; - final List keyPathList = Arrays.asList("file2.csv"); - S3ScanKeyPathOption s3ScanKeyPathOption = mock(S3ScanKeyPathOption.class); - when(s3ScanKeyPathOption.getS3scanIncludeOptions()).thenReturn(keyPathList); - final S3SelectSerializationFormatOption s3SelectSerializationFormatOption = - S3SelectSerializationFormatOption.valueOf("CSV"); - final ScanOptions scanOptions = new ScanOptions.Builder() - .setStartDateTime(LocalDateTime.parse("2023-03-05T10:00:00")) - .setRange(Duration.parse("P10DT2H")) - .setBucket(bucketName) - .setS3ScanKeyPathOption(s3ScanKeyPathOption).build(); - final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class); - try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) - .thenReturn(bufferAccumulator); - final ScanObjectWorker scanWorker = createScanWorker(scanOptions, keyPathList.get(0), mock(S3SelectObjectWorker.class)); - scanWorker.run(); - } - final ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); - verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture()); + createObjectUnderTest().runWithoutInfiniteLoop(); - final GetObjectRequest actualGetObjectRequest = getObjectRequestArgumentCaptor.getValue(); - verify(s3ObjectsSucceededCounter).increment(); - assertThat(actualGetObjectRequest, notNullValue()); - assertThat(actualGetObjectRequest.bucket(), equalTo(bucketName)); - 
assertThat(actualGetObjectRequest.key(), equalTo(keyPathList.get(0))); + final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue(); + assertThat(processedObject.getBucketName(), equalTo(bucket)); + assertThat(processedObject.getKey(), equalTo(objectKey)); } @Test - void s3_scan_bucket_with_s3_select_verify_start_time_and_end_time_combination() throws IOException { - final String bucketName = "my-bucket-1"; - final List keyPathList = Arrays.asList("file1.csv"); - S3ScanKeyPathOption s3ScanKeyPathOption = mock(S3ScanKeyPathOption.class); - when(s3ScanKeyPathOption.getS3scanIncludeOptions()).thenReturn(keyPathList); - final ScanOptions scanOptions = new ScanOptions.Builder() - .setStartDateTime(LocalDateTime.parse("2023-03-05T10:00:00")) - .setEndDateTime(LocalDateTime.parse("2023-04-09T00:00:00")) - .setBucket(bucketName) - .setS3ScanKeyPathOption(s3ScanKeyPathOption).build(); - final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class); - try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) - .thenReturn(bufferAccumulator); - final ScanObjectWorker scanWorker = createScanWorker(scanOptions, keyPathList.get(0), mock(S3SelectObjectWorker.class)); - scanWorker.run(); - } - final ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); - verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture()); - - final GetObjectRequest actualGetObjectRequest = getObjectRequestArgumentCaptor.getValue(); - verify(s3ObjectsSucceededCounter).increment(); - assertThat(actualGetObjectRequest, notNullValue()); - assertThat(actualGetObjectRequest.bucket(), equalTo(bucketName)); - assertThat(actualGetObjectRequest.key(), equalTo(keyPathList.get(0))); + void getNextPartition_supplier_is_expected_partitionCreationSupplier() { + given(sourceCoordinator.getNextPartition(any(S3ScanPartitionCreationSupplier.class))).willReturn(Optional.empty()); + final ScanObjectWorker objectUnderTest = createObjectUnderTest(); + objectUnderTest.runWithoutInfiniteLoop(); } - @Test - void s3_scan_service_whole_bucket_scan_test() throws IOException { - final String scanObjectName = "bucket-test.csv"; - final ScanOptions scanOptions = new ScanOptions.Builder() - .setEndDateTime(LocalDateTime.parse("2023-03-09T00:00:00")) - .setRange(Duration.parse("P10DT2H")) - .setBucket("my-bucket-1").build(); - final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class); - try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) - .thenReturn(bufferAccumulator); - final ScanObjectWorker scanWorker = createScanWorker(scanOptions, scanObjectName, mock(S3ObjectWorker.class)); - scanWorker.run(); - } - final ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); - verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture()); - - verify(s3ObjectsSucceededCounter).increment(); - final GetObjectRequest actualGetObjectRequest = getObjectRequestArgumentCaptor.getValue(); - assertThat(actualGetObjectRequest, notNullValue()); - assertThat(actualGetObjectRequest.bucket(), equalTo("my-bucket-1")); - assertThat(actualGetObjectRequest.key(), equalTo(scanObjectName)); - } + static Stream exceptionProvider() { + return 
Stream.of(PartitionUpdateException.class, PartitionNotFoundException.class, PartitionNotOwnedException.class); + } } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java new file mode 100644 index 0000000000..940667e315 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java @@ -0,0 +1,133 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +import org.hamcrest.Matchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption; +import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +@ExtendWith(MockitoExtension.class) +public class S3ScanPartitionCreationSupplierTest { + + @Mock + private S3Client s3Client; + + @Mock + private BucketOwnerProvider bucketOwnerProvider; + + private List scanOptionsList; + + @BeforeEach + void setup() { + scanOptionsList = new ArrayList<>(); + } + + + private Supplier> createObjectUnderTest() { + return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList); + } + + @Test + void getNextPartition_supplier_returns_expected_PartitionIdentifiers() { + + final String firstBucket = UUID.randomUUID().toString(); + final String secondBucket = UUID.randomUUID().toString(); + + final Instant startTime = Instant.now(); + final Instant endTime = Instant.now().plus(3, ChronoUnit.MINUTES); + + + final ScanOptions firstBucketScanOptions = mock(ScanOptions.class); + given(firstBucketScanOptions.getBucket()).willReturn(firstBucket); + given(firstBucketScanOptions.getUseStartDateTime()).willReturn(LocalDateTime.ofInstant(startTime, ZoneId.systemDefault())); + given(firstBucketScanOptions.getUseEndDateTime()).willReturn(LocalDateTime.ofInstant(endTime, ZoneId.systemDefault())); + final S3ScanKeyPathOption firstBucketScanKeyPath = mock(S3ScanKeyPathOption.class); + given(firstBucketScanOptions.getS3ScanKeyPathOption()).willReturn(firstBucketScanKeyPath); + given(firstBucketScanKeyPath.getS3scanIncludeOptions()).willReturn(List.of(UUID.randomUUID().toString())); + 
given(firstBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(List.of(".invalid")); + scanOptionsList.add(firstBucketScanOptions); + + final ScanOptions secondBucketScanOptions = mock(ScanOptions.class); + given(secondBucketScanOptions.getBucket()).willReturn(secondBucket); + given(secondBucketScanOptions.getUseStartDateTime()).willReturn(LocalDateTime.ofInstant(startTime, ZoneId.systemDefault())); + given(secondBucketScanOptions.getUseEndDateTime()).willReturn(LocalDateTime.ofInstant(endTime, ZoneId.systemDefault())); + final S3ScanKeyPathOption secondBucketScanKeyPath = mock(S3ScanKeyPathOption.class); + given(secondBucketScanOptions.getS3ScanKeyPathOption()).willReturn(secondBucketScanKeyPath); + given(secondBucketScanKeyPath.getS3scanIncludeOptions()).willReturn(null); + given(secondBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(null); + scanOptionsList.add(secondBucketScanOptions); + + final Supplier> partitionCreationSupplier = createObjectUnderTest(); + + final List expectedPartitionIdentifiers = new ArrayList<>(); + + final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class); + final List s3ObjectsList = new ArrayList<>(); + + final S3Object invalidFolderObject = mock(S3Object.class); + given(invalidFolderObject.key()).willReturn("folder-key/"); + given(invalidFolderObject.lastModified()).willReturn(Instant.now()); + s3ObjectsList.add(invalidFolderObject); + + final S3Object invalidForFirstBucketSuffixObject = mock(S3Object.class); + given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid"); + given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now()); + s3ObjectsList.add(invalidForFirstBucketSuffixObject); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); + + final S3Object invalidDueToLastModifiedOutsideOfStartEndObject = mock(S3Object.class); + given(invalidDueToLastModifiedOutsideOfStartEndObject.key()).willReturn(UUID.randomUUID().toString()); + given(invalidDueToLastModifiedOutsideOfStartEndObject.lastModified()).willReturn(Instant.now().minus(3, ChronoUnit.MINUTES)); + s3ObjectsList.add(invalidDueToLastModifiedOutsideOfStartEndObject); + + final S3Object validObject = mock(S3Object.class); + given(validObject.key()).willReturn("valid"); + given(validObject.lastModified()).willReturn(Instant.now()); + s3ObjectsList.add(validObject); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + + given(listObjectsResponse.contents()).willReturn(s3ObjectsList); + + final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); + given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); + + final List resultingPartitions = partitionCreationSupplier.get(); + + assertThat(resultingPartitions, notNullValue()); + assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); + assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + } +} diff --git 
a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java index 5400feb3ff..b8f2d6bb82 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOptions; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption; @@ -53,7 +54,7 @@ public void scan_service_with_s3_select_Configuration_test_and_verify() { when(s3ScanScanOptions.getBuckets()).thenReturn( List.of(bucket)); when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions); S3ScanService service = new S3ScanService(s3SourceConfig,mock(S3ClientBuilderFactory.class), - mock(S3ObjectHandler.class),mock(BucketOwnerProvider.class)); + mock(S3ObjectHandler.class),mock(BucketOwnerProvider.class), mock(SourceCoordinator.class)); final List scanOptionsBuilder = service.getScanOptions(); assertThat(scanOptionsBuilder.get(0).getS3ScanKeyPathOption().getS3scanIncludeOptions(),sameInstance(includeKeyPathList)); assertThat(scanOptionsBuilder.get(0).getBucket(),sameInstance(bucketName));
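
Editor's note on the partition key convention this patch introduces: S3ScanPartitionCreationSupplier builds each partition key from the bucket and the object key joined by a pipe (BUCKET_OBJECT_PARTITION_KEY_FORMAT = "%s|%s"), and ScanObjectWorker recovers the two halves by splitting the partition key on "\\|" before calling S3ObjectReference.bucketAndKey(bucket, objectKey). The following is a minimal standalone sketch of that round trip, not part of the patch; the class and method names (PartitionKeySketch, toPartitionKey, fromPartitionKey) are hypothetical, while the format string and the split behavior mirror the code added above.

    // Sketch only: illustrates the bucket|key partition-key convention used by the patch.
    // PartitionKeySketch, toPartitionKey and fromPartitionKey are illustrative names, not part of the change.
    final class PartitionKeySketch {
        private static final String BUCKET_OBJECT_PARTITION_KEY_FORMAT = "%s|%s";

        // Mirrors how S3ScanPartitionCreationSupplier composes a PartitionIdentifier key.
        static String toPartitionKey(final String bucket, final String objectKey) {
            return String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey);
        }

        // Mirrors how ScanObjectWorker splits the key back into bucket and object key.
        static String[] fromPartitionKey(final String partitionKey) {
            return partitionKey.split("\\|");
        }

        public static void main(final String[] args) {
            final String partitionKey = toPartitionKey("my-bucket", "logs/app.json");
            final String[] parts = fromPartitionKey(partitionKey);
            System.out.println(parts[0] + " -> " + parts[1]); // prints: my-bucket -> logs/app.json
        }
    }

As a usage note, the worker assumes the object key itself contains no pipe character; keys are taken verbatim from ListObjectsV2 responses in the supplier above.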