S3 scan with source coordination (#2689)
Implement S3 Scan using SourceCoordinator

Signed-off-by: Taylor Gray <tylgry@amazon.com>
graytaylor0 committed May 16, 2023
1 parent abce720 commit b64e7e7
Showing 12 changed files with 419 additions and 450 deletions.
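
At a high level, this change turns the one-shot in-memory S3 scan into lease-based work sharing: every eligible S3 object becomes a partition in the source coordination store, and each node claims one object at a time. The snippet below is a rough sketch of that loop under assumed API shapes; it is not the committed ScanObjectWorker code (which is not fully rendered in this view). It assumes getNextPartition accepts the partition-creation Supplier added in this commit, that SourcePartition exposes its partition key, and that completePartition releases a finished partition; processObject is a hypothetical helper.

// Sketch only: assumed SourceCoordinator/SourcePartition API shapes, hypothetical processObject helper.
final Optional<SourcePartition<S3SourceProgressState>> maybePartition =
        sourceCoordinator.getNextPartition(partitionCreationSupplier);

if (maybePartition.isPresent()) {
    // Partition keys are formatted as "bucket|objectKey" by S3ScanPartitionCreationSupplier (below).
    final String[] bucketAndKey = maybePartition.get().getPartitionKey().split("\\|", 2);
    processObject(bucketAndKey[0], bucketAndKey[1]);
    sourceCoordinator.completePartition(maybePartition.get().getPartitionKey());
}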
File: LeaseBasedSourceCoordinator.java
@@ -189,7 +189,10 @@ public <S extends T> void saveProgressStateForPartition(final String partitionKe

@Override
public void giveUpPartitions() {
- validateIsInitialized();

+ if (!initialized) {
+     return;
+ }

final Optional<SourcePartition<T>> activePartition = partitionManager.getActivePartition();
if (activePartition.isPresent()) {
@@ -204,7 +207,7 @@ public void giveUpPartitions() {

sourceCoordinationStore.tryUpdateSourcePartitionItem(updateItem);

- LOG.debug("Partition key {} was given up by owner {}", updateItem.getSourcePartitionKey(), ownerId);
+ LOG.info("Partition key {} was given up by owner {}", updateItem.getSourcePartitionKey(), ownerId);
}
partitionManager.removeActivePartition();
}
@@ -250,7 +253,7 @@ private SourcePartitionStoreItem validateAndGetSourcePartitionStoreItem(final St

private void validateIsInitialized() {
if (!initialized) {
- throw new UninitializedSourceCoordinatorException("The initialize method has not been called on this source coordinator. initialize must be called before further interactions with the SourceCoordinator");
+ throw new UninitializedSourceCoordinatorException("The initialize method has not been called on this source coordinator. initialize() must be called before further interactions with the SourceCoordinator");
}
}
}
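
The early return above makes giveUpPartitions() a no-op when source coordination was never initialized, so a source can call it unconditionally during shutdown. A minimal sketch of that caller pattern, mirroring the S3Source.stop() change later in this diff:

// Sketch: shutdown code no longer needs to prove initialize() ran before giving up leases.
@Override
public void stop() {
    if (Objects.nonNull(sourceCoordinator)) {
        sourceCoordinator.giveUpPartitions();   // returns immediately if the coordinator was never initialized
    }
}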
File: LeaseBasedSourceCoordinatorTest.java
@@ -411,6 +411,15 @@ void saveProgressStateForPartition_with_owned_partition_and_existing_store_item_
}
}

@Test
void giveUpPartitions_with_nonInitialized_store_does_nothing_and_returns() {
final SourceCoordinator<String> objectUnderTest = new LeaseBasedSourceCoordinator<>(String.class, sourceCoordinationStore, sourceCoordinationConfig, partitionManager, ownerPrefix);

objectUnderTest.giveUpPartitions();

verifyNoInteractions(sourceCoordinationStore, partitionManager);
}

@Test
void giveUpPartitions_with_active_partitionKey_that_does_not_exist_in_the_store_removes_the_active_partition() {
final SourcePartition<String> sourcePartition = SourcePartition.builder(String.class)
File: S3ScanObjectWorkerIT.java
@@ -17,9 +17,11 @@
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.configuration.CompressionOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption;
@@ -69,6 +71,9 @@ public class S3ScanObjectWorkerIT {
private S3ObjectGenerator s3ObjectGenerator;
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));

@Mock
private SourceCoordinator<S3SourceProgressState> sourceCoordinator;

private S3ObjectHandler createObjectUnderTest(final S3ObjectRequest s3ObjectRequest){
if(Objects.nonNull(s3ObjectRequest.getExpression()))
return new S3SelectObjectWorker(s3ObjectRequest);
@@ -148,7 +153,7 @@ private ScanObjectWorker createObjectUnderTest(final RecordsGenerator recordsGen
.compressionType(shouldCompress ? CompressionType.GZIP : CompressionType.NONE)
.s3SelectResponseHandlerFactory(new S3SelectResponseHandlerFactory()).build();
return new ScanObjectWorker(s3Client,List.of(scanOptions),createObjectUnderTest(s3ObjectRequest)
- ,bucketOwnerProvider);
+ ,bucketOwnerProvider, sourceCoordinator);
}

@ParameterizedTest
File: S3ObjectHandler.java
@@ -4,9 +4,10 @@
*/
package org.opensearch.dataprepper.plugins.source;

- import java.io.IOException;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

+ import java.io.IOException;

/**
* A S3ObjectHandler interface must be extended/implement for S3 Object parsing
*
@@ -19,5 +20,6 @@ public interface S3ObjectHandler {
*
* @throws IOException exception is thrown every time because this is not supported
*/
- void parseS3Object(final S3ObjectReference s3ObjectReference, final AcknowledgementSet acknowledgementSet) throws IOException;
+ void parseS3Object(final S3ObjectReference s3ObjectReference,
+                    final AcknowledgementSet acknowledgementSet) throws IOException;
}
File: S3ScanPartitionCreationSupplier.java (new)
@@ -0,0 +1,109 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption;
import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.utils.Pair;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class S3ScanPartitionCreationSupplier implements Supplier<List<PartitionIdentifier>> {

private static final String BUCKET_OBJECT_PARTITION_KEY_FORMAT = "%s|%s";

private final S3Client s3Client;
private final BucketOwnerProvider bucketOwnerProvider;
private final List<ScanOptions> scanOptionsList;
public S3ScanPartitionCreationSupplier(final S3Client s3Client,
final BucketOwnerProvider bucketOwnerProvider,
final List<ScanOptions> scanOptionsList) {

this.s3Client = s3Client;
this.bucketOwnerProvider = bucketOwnerProvider;
this.scanOptionsList = scanOptionsList;
}

@Override
public List<PartitionIdentifier> get() {
final List<PartitionIdentifier> objectsToProcess = new ArrayList<>();

for (final ScanOptions scanOptions : scanOptionsList) {
final List<String> excludeItems = new ArrayList<>();
final S3ScanKeyPathOption s3ScanKeyPathOption = scanOptions.getS3ScanKeyPathOption();
final ListObjectsV2Request.Builder listObjectsV2Request = ListObjectsV2Request.builder()
.bucket(scanOptions.getBucket());
bucketOwnerProvider.getBucketOwner(scanOptions.getBucket())
.ifPresent(listObjectsV2Request::expectedBucketOwner);

if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3ScanExcludeSuffixOptions()))
excludeItems.addAll(s3ScanKeyPathOption.getS3ScanExcludeSuffixOptions());

if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3scanIncludeOptions()))
s3ScanKeyPathOption.getS3scanIncludeOptions().forEach(includePath -> {
listObjectsV2Request.prefix(includePath);
objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, scanOptions.getBucket(),
scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime()));
});
else
objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, scanOptions.getBucket(),
scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime()));
}

return objectsToProcess;
}

private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<String> excludeKeyPaths,
final ListObjectsV2Request.Builder listObjectsV2Request,
final String bucket,
final LocalDateTime startDateTime,
final LocalDateTime endDateTime) {

final List<PartitionIdentifier> allPartitionIdentifiers = new ArrayList<>();
ListObjectsV2Response listObjectsV2Response = null;
do {
listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null).build());
allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream()
.map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified())))
.filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/"))
.filter(keyTimestampPair -> excludeKeyPaths.stream()
.noneMatch(excludeItem -> keyTimestampPair.left().endsWith(excludeItem)))
.filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime))
.map(Pair::left)
.map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build())
.collect(Collectors.toList()));
} while (listObjectsV2Response.isTruncated());

return allPartitionIdentifiers;
}

private LocalDateTime instantToLocalDateTime(final Instant instant) {
final ZonedDateTime zonedDateTime = instant.atZone(ZoneId.systemDefault());
return zonedDateTime.toLocalDateTime();
}

/**
* Checks whether the S3 object's last modified time falls within the scan date range (exclusive at both ends).
* @return boolean
*/
private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTime,
final LocalDateTime startDateTime,
final LocalDateTime endDateTime){
return lastModifiedTime.isAfter(startDateTime) && lastModifiedTime.isBefore(endDateTime);
}
}
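
Two details of the filtering above are easy to miss: keys ending in "/" and keys matching a configured exclude suffix are dropped before the time check, and the time check is strict at both ends. A tiny illustration with assumed timestamps (not values from the commit):

import java.time.LocalDateTime;

// Assumed scan window and object timestamp, for illustration only.
final LocalDateTime start = LocalDateTime.parse("2023-05-01T00:00");
final LocalDateTime end = LocalDateTime.parse("2023-05-02T00:00");
final LocalDateTime lastModified = LocalDateTime.parse("2023-05-01T00:00");

// Strict isAfter/isBefore: an object modified exactly at the window start (or end) is skipped.
final boolean included = lastModified.isAfter(start) && lastModified.isBefore(end); // false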
File: S3ScanService.java
@@ -4,6 +4,7 @@
*/
package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOptions;
import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
@@ -29,23 +30,26 @@ public class S3ScanService {
private Thread scanObjectWorkerThread;

private final BucketOwnerProvider bucketOwnerProvider;
private final SourceCoordinator<S3SourceProgressState> sourceCoordinator;

public S3ScanService(final S3SourceConfig s3SourceConfig,
final S3ClientBuilderFactory s3ClientBuilderFactory,
final S3ObjectHandler s3ObjectHandler,
- final BucketOwnerProvider bucketOwnerProvider ) {
+ final BucketOwnerProvider bucketOwnerProvider,
+ final SourceCoordinator<S3SourceProgressState> sourceCoordinator) {
this.s3ScanBucketOptions = s3SourceConfig.getS3ScanScanOptions().getBuckets();
this.s3ClientBuilderFactory = s3ClientBuilderFactory;
this.endDateTime = s3SourceConfig.getS3ScanScanOptions().getEndTime();
this.startDateTime = s3SourceConfig.getS3ScanScanOptions().getStartTime();
this.range = s3SourceConfig.getS3ScanScanOptions().getRange();
this.s3ObjectHandler = s3ObjectHandler;
this.bucketOwnerProvider = bucketOwnerProvider;
this.sourceCoordinator = sourceCoordinator;
}

public void start() {
scanObjectWorkerThread = new Thread(new ScanObjectWorker(s3ClientBuilderFactory.getS3Client(),
- getScanOptions(),s3ObjectHandler,bucketOwnerProvider));
+ getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator));
scanObjectWorkerThread.start();
}

File: S3Source.java
@@ -17,6 +17,8 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanScanOptions;
import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
import org.opensearch.dataprepper.plugins.source.ownership.ConfigBucketOwnerProviderFactory;
@@ -30,7 +32,7 @@
import java.util.function.BiConsumer;

@DataPrepperPlugin(name = "s3", pluginType = Source.class, pluginConfigurationType = S3SourceConfig.class)
- public class S3Source implements Source<Record<Event>> {
+ public class S3Source implements Source<Record<Event>>, UsesSourceCoordination {

private final PluginMetrics pluginMetrics;
private final S3SourceConfig s3SourceConfig;
@@ -40,6 +42,7 @@ public class S3Source implements Source<Record<Event>> {
private final Optional<S3ScanScanOptions> s3ScanScanOptional;
private final AcknowledgementSetManager acknowledgementSetManager;
private final boolean acknowledgementsEnabled;
private SourceCoordinator<S3SourceProgressState> sourceCoordinator;


@DataPrepperPluginConstructor
@@ -108,13 +111,26 @@ public void start(Buffer<Record<Event>> buffer) {
sqsService.start();
}
if(s3ScanScanOptional.isPresent()) {
- s3ScanService = new S3ScanService(s3SourceConfig,s3ClientBuilderFactory,s3Handler,bucketOwnerProvider);
+ s3ScanService = new S3ScanService(s3SourceConfig,s3ClientBuilderFactory,s3Handler,bucketOwnerProvider, sourceCoordinator);
s3ScanService.start();
}
}

@Override
public void stop() {
sqsService.stop();
if (Objects.nonNull(sourceCoordinator)) {
sourceCoordinator.giveUpPartitions();
}
}

@Override
public <T> void setSourceCoordinator(final SourceCoordinator<T> sourceCoordinator) {
this.sourceCoordinator = (SourceCoordinator<S3SourceProgressState>) sourceCoordinator;
}

@Override
public Class<?> getPartitionProgressStateClass() {
return S3SourceProgressState.class;
}
}
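
Implementing UsesSourceCoordination is what gets the coordinator injected before start() runs. The framework side is not part of this diff; the sketch below only illustrates the contract implied by the two new methods, and sourceCoordinatorFactory is a hypothetical stand-in for whatever actually builds the coordinator from the declared progress-state class:

// Hypothetical wiring, for illustration only; not the actual Data Prepper plugin-framework code.
if (source instanceof UsesSourceCoordination) {
    final UsesSourceCoordination coordinatedSource = (UsesSourceCoordination) source;
    final SourceCoordinator<?> sourceCoordinator =
            sourceCoordinatorFactory.create(coordinatedSource.getPartitionProgressStateClass()); // hypothetical factory
    coordinatedSource.setSourceCoordinator(sourceCoordinator);
}
source.start(buffer);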
File: S3SourceProgressState.java (new)
@@ -0,0 +1,9 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source;

public class S3SourceProgressState {
}
(The remaining changed files are not rendered in this view.)
