forked from opensearch-project/data-prepper
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding the S3 sink plugin. Contributes to opensearch-project#1048
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
- Loading branch information
1 parent
232f0de
commit be5a4f5
Showing
34 changed files
with
2,464 additions
and
1 deletion.
There are no files selected for viewing
135 changes: 135 additions & 0 deletions
135
...ins/common/src/main/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndex.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.s3keyindex; | ||
|
||
import java.time.Instant; | ||
import java.time.LocalDateTime; | ||
import java.time.ZoneId; | ||
import java.time.ZonedDateTime; | ||
import java.time.format.DateTimeFormatter; | ||
import java.util.Set; | ||
import java.util.TimeZone; | ||
import java.util.UUID; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
|
||
/** | ||
* Class responsible for creation of s3 key pattern based on date time stamp | ||
*/ | ||
public class S3ObjectIndex { | ||
|
||
private static final String TIME_PATTERN_STARTING_SYMBOLS = "\\%{"; | ||
|
||
// For matching a string that begins with a "%{" and ends with a "}". | ||
// For a string like "data-prepper-%{yyyy-MM-dd}", "%{yyyy-MM-dd}" is matched. | ||
private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\%\\{.*?\\}"; | ||
|
||
// For matching a string enclosed by "%{" and "}". | ||
// For a string like "data-prepper-%{yyyy-MM}", "yyyy-MM" is matched. | ||
private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\%\\{(.*?)\\}"; | ||
|
||
private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID()); | ||
|
||
S3ObjectIndex() { | ||
} | ||
|
||
/** | ||
* Create Object Name with date,time and UniqueID prepended. | ||
*/ | ||
public static String getObjectNameWithDateTimeId(final String indexAlias) { | ||
DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); | ||
String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : ""; | ||
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + "-" + getTimeNanos() + "-" | ||
+ UUID.randomUUID(); | ||
} | ||
|
||
/** | ||
* Create Object path prefix. | ||
*/ | ||
public static String getObjectPathPrefix(final String indexAlias) { | ||
DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); | ||
String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : ""; | ||
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix; | ||
} | ||
|
||
/** | ||
* Creates epoch seconds. | ||
*/ | ||
public static long getTimeNanos() { | ||
Instant time = Instant.now(); | ||
final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000; | ||
long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano(); | ||
return currentTimeNanos; | ||
} | ||
|
||
/** | ||
* Validate the index with the regular expression pattern. Throws exception if validation fails | ||
*/ | ||
public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) { | ||
final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION); | ||
final Matcher timePatternMatcher = pattern.matcher(indexAlias); | ||
if (timePatternMatcher.find()) { | ||
final String timePattern = timePatternMatcher.group(1); | ||
if (timePatternMatcher.find()) { // check if there is a one more match. | ||
throw new IllegalArgumentException("An index only allows one date-time pattern."); | ||
} | ||
if (timePattern.contains(TIME_PATTERN_STARTING_SYMBOLS)) { // check if it is a nested pattern such as | ||
// "data-prepper-%{%{yyyy.MM.dd}}" | ||
throw new IllegalArgumentException("An index doesn't allow nested date-time patterns."); | ||
} | ||
validateTimePatternIsAtTheEnd(indexAlias, timePattern); | ||
validateNoSpecialCharsInTimePattern(timePattern); | ||
validateTimePatternGranularity(timePattern); | ||
return DateTimeFormatter.ofPattern(timePattern); | ||
} | ||
return null; | ||
} | ||
|
||
/** | ||
* Data Prepper only allows time pattern as a suffix. | ||
*/ | ||
private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) { | ||
if (!indexAlias.endsWith(timePattern + "}")) { | ||
throw new IllegalArgumentException("Time pattern can only be a suffix of an index."); | ||
} | ||
} | ||
|
||
/** | ||
* Special characters can cause failures in creating indexes. | ||
*/ | ||
private static final Set<Character> INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':'); | ||
|
||
public static void validateNoSpecialCharsInTimePattern(String timePattern) { | ||
boolean containsInvalidCharacter = timePattern.chars().mapToObj(c -> (char) c) | ||
.anyMatch(character -> INVALID_CHARS.contains(character)); | ||
if (containsInvalidCharacter) { | ||
throw new IllegalArgumentException( | ||
"Index time pattern contains one or multiple special characters: " + INVALID_CHARS); | ||
} | ||
} | ||
|
||
/** | ||
* Validates the time pattern, support creating indexes with time patterns that are too granular | ||
* hour, minute and second | ||
*/ | ||
private static final Set<Character> UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N'); | ||
|
||
public static void validateTimePatternGranularity(String timePattern) { | ||
boolean containsUnsupportedTimeSymbol = timePattern.chars().mapToObj(c -> (char) c) | ||
.anyMatch(character -> UNSUPPORTED_TIME_GRANULARITY_CHARS.contains(character)); | ||
if (containsUnsupportedTimeSymbol) { | ||
throw new IllegalArgumentException("Index time pattern contains time patterns that are less than one hour: " | ||
+ UNSUPPORTED_TIME_GRANULARITY_CHARS); | ||
} | ||
} | ||
|
||
/** | ||
* Returns the current UTC Date and Time | ||
*/ | ||
public static ZonedDateTime getCurrentUtcTime() { | ||
return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID); | ||
} | ||
} |
116 changes: 116 additions & 0 deletions
116
...common/src/test/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndexTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.s3keyindex; | ||
|
||
import static org.junit.Assert.assertNotEquals; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertFalse; | ||
import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
import static org.junit.jupiter.api.Assertions.assertNull; | ||
import static org.junit.jupiter.api.Assertions.assertThrows; | ||
import java.time.ZonedDateTime; | ||
import java.time.format.DateTimeFormatter; | ||
import org.junit.jupiter.api.Test; | ||
|
||
class S3ObjectIndexTest { | ||
|
||
@Test | ||
void testObjectDateTimePatterns_not_equal() throws IllegalArgumentException { | ||
|
||
String expectedIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}"); | ||
String actualIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}"); | ||
assertFalse(actualIndex.contains(expectedIndex)); | ||
} | ||
|
||
@Test | ||
void testgetObjectPathPrefix_not_equal() throws IllegalArgumentException { | ||
|
||
String expectedIndex = S3ObjectIndex.getObjectPathPrefix("events-%{yyyy}"); | ||
String actualIndex = S3ObjectIndex.getObjectPathPrefix("events-%{yyyy}"); | ||
assertTrue(actualIndex.contains(expectedIndex)); | ||
} | ||
|
||
@Test | ||
void testObjectTimePattern_Exceptional_time_TooGranular() throws IllegalArgumentException { | ||
assertThrows(IllegalArgumentException.class, () -> { | ||
S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-AA-dd}"); | ||
}); | ||
} | ||
|
||
@Test | ||
void testObjectTimePatterns_equal() throws IllegalArgumentException { | ||
|
||
DateTimeFormatter expectedIndex = S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-MM-dd}"); | ||
DateTimeFormatter actualIndex = S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-MM-dd}"); | ||
assertEquals(actualIndex.toString(), expectedIndex.toString()); | ||
} | ||
|
||
@Test | ||
void test_utc_current_time() throws IllegalArgumentException { | ||
|
||
ZonedDateTime expectedIndex = S3ObjectIndex.getCurrentUtcTime(); | ||
ZonedDateTime actualIndex = S3ObjectIndex.getCurrentUtcTime(); | ||
|
||
assertEquals(expectedIndex.getDayOfYear(), actualIndex.getDayOfYear()); | ||
assertEquals(expectedIndex.getDayOfMonth(), actualIndex.getDayOfMonth()); | ||
assertEquals(expectedIndex.getDayOfWeek(), actualIndex.getDayOfWeek()); | ||
} | ||
|
||
@Test | ||
void testObjectTimePattern_Exceptional_TooGranular() { | ||
assertThrows(IllegalArgumentException.class, () -> { | ||
S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-AA-ddThh:mm}"); | ||
}); | ||
} | ||
|
||
@Test | ||
void testObjectTimePattern_Exceptional_at_theEnd() { | ||
assertThrows(IllegalArgumentException.class, () -> { | ||
S3ObjectIndex.getDatePatternFormatter("events-%{yyy{MM}dd}"); | ||
}); | ||
} | ||
|
||
@Test | ||
void testObject_allows_one_date_time_pattern_Exceptional() { | ||
assertThrows(IllegalArgumentException.class, () -> { | ||
S3ObjectIndex.getDatePatternFormatter("events-%{yyyy-MM-dd}-%{yyyy-MM-dd}"); | ||
}); | ||
} | ||
|
||
@Test | ||
void testObject_nested_pattern_Exceptional() { | ||
assertThrows(IllegalArgumentException.class, () -> { | ||
S3ObjectIndex.getDatePatternFormatter("bucket-name-\\%{\\%{yyyy.MM.dd}}"); | ||
}); | ||
} | ||
|
||
@Test | ||
void testObject_null_time_pattern() throws NullPointerException { | ||
assertNull(S3ObjectIndex.getDatePatternFormatter("bucket-name")); | ||
} | ||
|
||
@Test | ||
void testObjectAliasWithDatePrefix_Exceptional_time_TooGranular() throws IllegalArgumentException { | ||
assertThrows(IllegalArgumentException.class, () -> { | ||
S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-AA-dd}"); | ||
}); | ||
} | ||
|
||
@Test | ||
void testObjectAliasWithDatePrefix_equal() throws IllegalArgumentException { | ||
|
||
String expectedIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}"); | ||
String actualIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}"); | ||
assertNotEquals(actualIndex.toString(), expectedIndex.toString()); | ||
} | ||
|
||
@Test | ||
void test_default_constructor() { | ||
S3ObjectIndex object = new S3ObjectIndex(); | ||
assertNotNull(object); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
# S3 Sink | ||
|
||
This is the Data Prepper S3 sink plugin that sends records to an S3 bucket via S3Client. | ||
|
||
The S3 sink plugin supports OpenSearch 2.0.0 and greater. | ||
|
||
## Usages | ||
|
||
The s3 sink should be configured as part of Data Prepper pipeline yaml file. | ||
|
||
## Configuration Options | ||
|
||
``` | ||
pipeline: | ||
... | ||
sink: | ||
- s3: | ||
aws: | ||
region: us-east-1 | ||
sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper | ||
sts_header_overrides: | ||
max_retries: 5 | ||
bucket: | ||
name: bucket_name | ||
object_key: | ||
path_prefix: my-elb/%{yyyy}/%{MM}/%{dd}/ | ||
threshold: | ||
event_count: 2000 | ||
maximum_size: 50mb | ||
event_collect: 15s | ||
codec: | ||
ndjson: | ||
``` | ||
|
||
## Configuration | ||
|
||
- `aws_region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). Defaults to `none`. | ||
|
||
- `aws_sts_role_arn` (Optional) : The AWS STS role to assume for requests to SQS and S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). Defaults to `none`. | ||
|
||
- `aws_sts_header_overrides` (Optional) : An optional map of header overrides to make when assuming the IAM role for the sink plugin. Defaults to `none`. | ||
|
||
- `max_retries` (Optional) : An integer value indicates the maximum number of times that single request should be retired in-order to ingest data to amazon s3. Defaults to `5`. | ||
|
||
- `bucket` (Required) : Object storage built to store and retrieve any amount of data from anywhere, User must provide bucket name. | ||
|
||
- `object_key` (Optional) : It contains `path_prefix` and `file_pattern`. Defaults to s3 object `events-%{yyyy-MM-dd'T'hh-mm-ss}` inside bucket root directory. | ||
|
||
- `path_prefix` (Optional) : path_prefix nothing but directory structure inside bucket in-order to store objects. Defaults to `none`. | ||
|
||
- `event_count` (Required) : An integer value indicates the maximum number of events required to ingest into s3-bucket as part of threshold. | ||
|
||
- `maximum_size` (Optional) : A String representing the count or size of bytes required to ingest into s3-bucket as part of threshold. Defaults to `50mb`. | ||
|
||
- `event_collect` (Required) : A String representing how long events should be collected before ingest into s3-bucket as part of threshold. All Duration values are a string that represents a duration. They support ISO_8601 notation string ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms"). | ||
|
||
- `buffer_type` (Optional) : Records stored temporary before flushing into s3 bucket. Possible values are `local_file` and `in_memory`. Defaults to `in_memory`. | ||
|
||
|
||
## Developer Guide | ||
|
||
This plugin is compatible with Java 8. See | ||
|
||
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) | ||
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
plugins { | ||
id 'java' | ||
} | ||
|
||
repositories { | ||
mavenCentral() | ||
} | ||
|
||
dependencies { | ||
implementation project(':data-prepper-api') | ||
implementation project(path: ':data-prepper-plugins:common') | ||
implementation 'io.micrometer:micrometer-core' | ||
implementation 'com.fasterxml.jackson.core:jackson-core' | ||
implementation 'com.fasterxml.jackson.core:jackson-databind' | ||
implementation 'org.apache.commons:commons-compress:1.21' | ||
implementation 'joda-time:joda-time:2.11.1' | ||
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final' | ||
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' | ||
implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148' | ||
implementation 'org.mapdb:mapdb:3.0.8' | ||
implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.7.10' | ||
implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.7.10' | ||
implementation 'org.apache.commons:commons-lang3:3.12.0' | ||
testImplementation project(':data-prepper-test-common') | ||
} | ||
|
||
test { | ||
useJUnitPlatform() | ||
} | ||
|
||
sourceSets { | ||
integrationTest { | ||
java { | ||
compileClasspath += main.output + test.output | ||
runtimeClasspath += main.output + test.output | ||
srcDir file('src/integrationTest/java') | ||
} | ||
resources.srcDir file('src/integrationTest/resources') | ||
} | ||
} | ||
|
||
configurations { | ||
integrationTestImplementation.extendsFrom testImplementation | ||
integrationTestRuntime.extendsFrom testRuntime | ||
} | ||
|
||
task integrationTest(type: Test) { | ||
group = 'verification' | ||
testClassesDirs = sourceSets.integrationTest.output.classesDirs | ||
|
||
useJUnitPlatform() | ||
|
||
classpath = sourceSets.integrationTest.runtimeClasspath | ||
systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket') | ||
systemProperty 'tests.s3source.region', System.getProperty('tests.s3source.region') | ||
systemProperty 'tests.s3source.queue.url', System.getProperty('tests.s3source.queue.url') | ||
|
||
filter { | ||
includeTestsMatching '*IT' | ||
} | ||
} |
Oops, something went wrong.