From db2454ae4082e1e31c90351c6e593887c9ec2d26 Mon Sep 17 00:00:00 2001 From: David Adams Date: Mon, 16 Sep 2024 17:31:44 -0700 Subject: [PATCH] Add support for compressed kinesis data (#17062) --- docs/ingestion/kinesis-ingestion.md | 7 + .../indexing/kinesis/KinesisIndexTask.java | 3 +- .../kinesis/KinesisIndexTaskIOConfig.java | 19 +- .../kinesis/KinesisRecordSupplier.java | 12 +- .../indexing/kinesis/KinesisSamplerSpec.java | 3 +- .../kinesis/supervisor/KinesisSupervisor.java | 6 +- .../supervisor/KinesisSupervisorIOConfig.java | 16 +- .../indexing/kinesis/KinesisIOConfigTest.java | 3 +- .../kinesis/KinesisIndexTaskSerdeTest.java | 1 + .../kinesis/KinesisIndexTaskTest.java | 5 + .../kinesis/KinesisRecordSupplierTest.java | 245 +++++++++++++++++- .../kinesis/KinesisSamplerSpecTest.java | 9 +- .../supervisor/KinesisSupervisorTest.java | 25 +- processing/pom.xml | 5 + .../apache/druid/utils/CompressionUtils.java | 82 ++++++ .../util/common/CompressionUtilsTest.java | 98 +++++++ 16 files changed, 503 insertions(+), 36 deletions(-) diff --git a/docs/ingestion/kinesis-ingestion.md b/docs/ingestion/kinesis-ingestion.md index 2a855b7b7ff40..df17245384166 100644 --- a/docs/ingestion/kinesis-ingestion.md +++ b/docs/ingestion/kinesis-ingestion.md @@ -55,6 +55,7 @@ The following example shows a supervisor spec for a stream with the name `Kinesi "inputFormat": { "type": "json" }, + "compressionFormat": "zstd", "useEarliestSequenceNumber": true }, "tuningConfig": { @@ -148,6 +149,12 @@ The Kinesis indexing service supports the following values for `inputFormat`: You can use `parser` to read [`thrift`](../development/extensions-contrib/thrift.md) formats. +### Compression + +Unlike Kafka, Kinesis does not offer built-in compression. Because Kinesis streams can be costly to operate at scale, you may find it advantageous to compress incoming data. + +Druid supports `bz2`, `gz`, `snappy`, `xz`, `zip`, and `zstd` compressed Kinesis data. If your incoming data is compressed, include the compression type in the `compressionFormat` field. A producer-side sketch appears after this file's diff. + ### Tuning configuration The following table outlines the `tuningConfig` configuration properties specific to Kinesis. 
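For illustration only (not part of the patch): `compressionFormat` tells Druid how to decompress records, so producers must compress each record individually before putting it on the stream. A minimal producer-side sketch, assuming the AWS SDK v1 Kinesis client and Apache Commons Compress with `zstd-jni` on the classpath; the stream name, payload, and partition key are placeholders:

```java
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CompressedKinesisProducer
{
  public static void main(String[] args) throws IOException
  {
    final AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
    final String json = "{\"timestamp\":\"2024-09-16T00:00:00Z\",\"value\":1}";

    // Compress the record body with zstd. This must match the
    // "compressionFormat": "zstd" setting in the supervisor ioConfig,
    // so that Druid can decompress each record on ingestion.
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (ZstdCompressorOutputStream zstd = new ZstdCompressorOutputStream(baos)) {
      zstd.write(json.getBytes(StandardCharsets.UTF_8));
    }

    kinesis.putRecord("KinesisStream", ByteBuffer.wrap(baos.toByteArray()), "partition-key-1");
  }
}
```

Compressing per record mirrors how `KinesisRecordSupplier` below calls `CompressionUtils.decompress` on each `UserRecord` as it is fetched.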
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index bea69d96c3d23..1f3ee6b983dac 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -145,7 +145,8 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) tuningConfig.getRecordBufferFullWait(), maxBytesPerPoll, false, - useListShards + useListShards, + ioConfig.getCompressionFormat() ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index 881d68ba89681..bf2fbc6bd51ba 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.utils.CompressionUtils; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -53,6 +54,7 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig data; if (deaggregateHandle == null || getDataHandle == null) { @@ -262,6 +265,9 @@ private Runnable fetchRecords() int recordSize = 0; for (UserRecord userRecord : userRecords) { + if (compressionFormat != null) { + userRecord.setData(CompressionUtils.decompress(userRecord.getData(), compressionFormat)); + } KinesisRecordEntity kinesisRecordEntity = new KinesisRecordEntity(userRecord); recordSize += kinesisRecordEntity.getBuffer().array().length; data.add(kinesisRecordEntity); @@ -403,6 +409,8 @@ private long getPartitionTimeLag() private final int recordBufferSizeBytes; private final boolean useEarliestSequenceNumber; private final boolean useListShards; + @Nullable + private final CompressionUtils.Format compressionFormat; private ScheduledExecutorService scheduledExec; @@ -423,7 +431,8 @@ public KinesisRecordSupplier( int recordBufferFullWait, int maxBytesPerPoll, boolean useEarliestSequenceNumber, - boolean useListShards + boolean useListShards, + CompressionUtils.Format compressionFormat ) { Preconditions.checkNotNull(amazonKinesis); @@ -437,6 +446,7 @@ public KinesisRecordSupplier( this.useEarliestSequenceNumber = useEarliestSequenceNumber; this.useListShards = useListShards; this.backgroundFetchEnabled = fetchThreads > 0; + this.compressionFormat = compressionFormat; // The deaggregate function is implemented by the amazon-kinesis-client, whose license was formerly not compatible // with Apache. 
The code here avoids the license issue by using reflection, but is no longer necessary since diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java index 81f8b774f0425..39f540a6dc2c7 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java @@ -80,7 +80,8 @@ protected KinesisRecordSupplier createRecordSupplier() tuningConfig.getRecordBufferFullWait(), tuningConfig.getMaxBytesPerPollOrDefault(), ioConfig.isUseEarliestSequenceNumber(), - tuningConfig.isUseListShards() + tuningConfig.isUseListShards(), + ioConfig.getCompressionFormat() ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 2f00c8c16cc96..24ddf1f112064 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -145,7 +145,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( ioConfig.getEndpoint(), ioConfig.getFetchDelayMillis(), ioConfig.getAwsAssumedRoleArn(), - ioConfig.getAwsExternalId() + ioConfig.getAwsExternalId(), + ioConfig.getCompressionFormat() ); } @@ -202,7 +203,8 @@ protected RecordSupplier setupRecordSupplie taskTuningConfig.getRecordBufferFullWait(), taskTuningConfig.getMaxBytesPerPollOrDefault(), ioConfig.isUseEarliestSequenceNumber(), - spec.getSpec().getTuningConfig().isUseListShards() + spec.getSpec().getTuningConfig().isUseListShards(), + spec.getSpec().getIOConfig().getCompressionFormat() ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java index 9910f22a349bc..f53f13349887d 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java @@ -29,6 +29,7 @@ import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; +import org.apache.druid.utils.CompressionUtils; import org.joda.time.DateTime; import org.joda.time.Period; @@ -54,6 +55,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig private final String awsAssumedRoleArn; private final String awsExternalId; private final boolean deaggregate; + private final CompressionUtils.Format compressionFormat; @JsonCreator public KinesisSupervisorIOConfig( @@ -76,7 +78,8 @@ public KinesisSupervisorIOConfig( @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, @JsonProperty("awsExternalId") String 
awsExternalId, @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig, - @JsonProperty("deaggregate") @Deprecated boolean deaggregate + @JsonProperty("deaggregate") @Deprecated boolean deaggregate, + @JsonProperty("compressionFormat") CompressionUtils.Format compressionFormat ) { super( @@ -107,6 +110,7 @@ public KinesisSupervisorIOConfig( this.awsAssumedRoleArn = awsAssumedRoleArn; this.awsExternalId = awsExternalId; this.deaggregate = deaggregate; + this.compressionFormat = compressionFormat; } @JsonProperty @@ -151,6 +155,13 @@ public boolean isDeaggregate() return deaggregate; } + @Nullable + @JsonProperty + public CompressionUtils.Format getCompressionFormat() + { + return compressionFormat; + } + @Override public String toString() { @@ -172,7 +183,8 @@ public String toString() ", fetchDelayMillis=" + fetchDelayMillis + ", awsAssumedRoleArn='" + awsAssumedRoleArn + '\'' + ", awsExternalId='" + awsExternalId + '\'' + ", deaggregate=" + deaggregate + + ", compressionFormat=" + compressionFormat + '}'; } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java index 3162b2ea0eee8..cb51f98cccd64 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java @@ -267,7 +267,8 @@ public void testDeserializeToOldIoConfig() throws IOException "endpoint", 2000, "awsAssumedRoleArn", - "awsExternalId" + "awsExternalId", + null ); final byte[] json = mapper.writeValueAsBytes(currentConfig); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index e84581af60133..dd5c916662786 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -94,6 +94,7 @@ public class KinesisIndexTaskSerdeTest "endpoint", null, null, + null, null ); private static final String ACCESS_KEY = "test-access-key"; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 2ef3914840082..ecf991209a38d 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -785,6 +785,7 @@ public void testRunWithMinimumMessageTime() throws Exception "awsEndpoint", null, null, + null, null ) ); @@ -847,6 +848,7 @@ public void testRunWithMaximumMessageTime() throws Exception "awsEndpoint", null, null, + null, null ) ); @@ -1946,6 +1948,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception "awsEndpoint", null, null, + null, null ), context @@ -2108,6 +2111,7 @@ public void testSequencesFromContext() throws
IOException "awsEndpoint", null, null, + null, null ), context @@ -2309,6 +2313,7 @@ private KinesisIndexTask createTask( "awsEndpoint", null, null, + null, null ), null diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 7c59ad61ac099..b38ea43174f5e 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -44,6 +44,8 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.utils.CompressionUtils; +import org.apache.druid.utils.CompressionUtils.Format; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; @@ -54,6 +56,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -69,6 +72,22 @@ public class KinesisRecordSupplierTest extends EasyMockSupport { + private static List compressRecords(List baseRecords, CompressionUtils.Format format) + { + return baseRecords.stream() + .map(record -> { + try { + return new Record().withData( + CompressionUtils.compress(record.getData(), format) + ).withSequenceNumber(record.getSequenceNumber()); + } + catch (Exception e) { + throw new RuntimeException("Error compressing record data", e); + } + }) + .collect(Collectors.toList()); + } + private static final String STREAM = "stream"; private static final long POLL_TIMEOUT_MILLIS = 2000; private static final String SHARD_ID1 = "1"; @@ -121,7 +140,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport .collect(Collectors.toList())) .build(); - private static ByteBuffer jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1) { try { @@ -216,7 +234,8 @@ public void testSupplierSetup_withoutListShards() 5000, 1_000_000, true, - false + false, + null ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -273,7 +292,8 @@ public void testSupplierSetup_withListShards() 5000, 1_000_000, true, - true + true, + null ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -380,7 +400,8 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException 5000, 1_000_000, true, - false + false, + null ); recordSupplier.assign(partitions); @@ -437,7 +458,8 @@ public void testPollWithKinesisNonRetryableFailure() throws InterruptedException 5000, 1_000_000, true, - false + false, + null ); recordSupplier.assign(partitions); @@ -512,7 +534,8 @@ public void testSeek() 5000, 1_000_000, true, - false + false, + null ); recordSupplier.assign(partitions); @@ -576,7 +599,8 @@ public void testSeekToLatest() 5000, 1_000_000, true, - false + false, + null ); recordSupplier.assign(partitions); @@ -609,7 +633,8 @@ public void testSeekUnassigned() throws InterruptedException 5000, 1_000_000, true, - false + false, + null ); recordSupplier.assign(partitions); @@ -669,7 +694,8 @@ public void testPollAfterSeek() 5000, 1_000_000, true, - false + false, + null ); recordSupplier.assign(partitions); @@ -758,7 +784,8 @@ public void testPollDeaggregate() 
throws InterruptedException 5000, 1_000_000, true, - false + false, + null ); recordSupplier.assign(partitions); @@ -832,7 +859,8 @@ public void getLatestSequenceNumberWhenKinesisRetryableException() 5000, 1_000_000, true, - false + false, + null ); Assert.assertEquals("0", recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0))); @@ -867,7 +895,8 @@ private KinesisRecordSupplier getSequenceNumberWhenNoRecordsHelperForOpenShard() 5000, 1_000_000, true, - false + false, + null ); return recordSupplier; } @@ -954,7 +983,8 @@ public void getPartitionTimeLag() throws InterruptedException 5000, 1_000_000, true, - false + false, + null ); recordSupplier.assign(partitions); @@ -1009,7 +1039,8 @@ public void testIsOffsetAvailable() 5000, 1_000_000, true, - false + false, + null ); StreamPartition<String> partition = new StreamPartition<>(STREAM, SHARD_ID0); @@ -1040,6 +1071,192 @@ public void testIsOffsetAvailable() Assert.assertTrue(target.isOffsetAvailable(partition, KinesisSequenceNumber.of("10"))); } + + @Test + public void testPollDeaggregateWithCompressedData() throws InterruptedException + { + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID0), + EasyMock.anyString(), + EasyMock.anyString() + )).andReturn( + getShardIteratorResult0).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID1), + EasyMock.anyString(), + EasyMock.anyString() + )).andReturn( + getShardIteratorResult1).anyTimes(); + + EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); + EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR))) + .andReturn(getRecordsResult0) + .anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR))) + .andReturn(getRecordsResult1) + .anyTimes(); + EasyMock.expect(getRecordsResult0.getRecords()).andReturn(compressRecords(SHARD0_RECORDS, Format.ZSTD)).once(); + EasyMock.expect(getRecordsResult1.getRecords()).andReturn(compressRecords(SHARD1_RECORDS, Format.ZSTD)).once(); + EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); + + replayAll(); + + Set<StreamPartition<String>> partitions = ImmutableSet.of( + StreamPartition.of(STREAM, SHARD_ID0), + StreamPartition.of(STREAM, SHARD_ID1) + ); + + + recordSupplier = new KinesisRecordSupplier( + kinesis, + 0, + 2, + 10_000, + 5000, + 5000, + 1_000_000, + true, + false, + Format.ZSTD + ); + + recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); + recordSupplier.start(); + + for (int i = 0; i < 10 && recordSupplier.bufferSize() < 12; i++) { + Thread.sleep(100); + } + + List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> polledRecords = cleanRecords(recordSupplier.poll( + POLL_TIMEOUT_MILLIS)); + + verifyAll(); + + Assert.assertEquals(partitions, recordSupplier.getAssignment()); + Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); + } + + @Test + public void testPollWithCompressedData() throws Exception + { + // Step 1: Create original data + List<ByteBuffer> originalData = Arrays.asList( +
jb("2021-01-01T00:00:00Z", "a", "x", "1", "1.0", "1.0"), + jb("2021-01-01T00:01:00Z", "b", "y", "2", "2.0", "2.0") + ); + + // Step 2: Compress the data using the specified compression format + CompressionUtils.Format compressionFormat = CompressionUtils.Format.ZSTD; + + List compressedData = new ArrayList<>(); + for (ByteBuffer data : originalData) { + ByteBuffer compressed = CompressionUtils.compress(data, compressionFormat); + compressedData.add(compressed); + } + + // Step 3: Create Kinesis Records with the compressed data + List compressedRecords = new ArrayList<>(); + int sequenceNumber = 0; + for (ByteBuffer compressed : compressedData) { + Record record = new Record().withData(compressed).withSequenceNumber(Integer.toString(sequenceNumber++)); + compressedRecords.add(record); + } + + // Step 4: Mock Kinesis client methods to return our compressed records + EasyMock.expect(kinesis.getShardIterator( + EasyMock.eq(STREAM), + EasyMock.eq(SHARD_ID0), + EasyMock.eq(ShardIteratorType.TRIM_HORIZON.toString()), + EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull()) + )).andReturn( + getShardIteratorResult0).anyTimes(); + + EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); + + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR))) + .andReturn(getRecordsResult0) + .anyTimes(); + + EasyMock.expect(getRecordsResult0.getRecords()).andReturn(compressedRecords).once(); + + EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(0L).once(); + + replayAll(); + + // Step 5: Create KinesisRecordSupplier with the compression format set + recordSupplier = new KinesisRecordSupplier( + kinesis, + 0, + 1, + 100, + 5000, + 5000, + 1_000_000, + true, + false, + compressionFormat // Set the compression format + ); + + // Step 6: Assign partition, seek to earliest, and start the supplier + Set> partitions = ImmutableSet.of( + StreamPartition.of(STREAM, SHARD_ID0) + ); + + recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); + recordSupplier.start(); + + // Step 7: Poll the supplier and wait for data to be available + for (int i = 0; i < 10 && recordSupplier.bufferSize() < compressedRecords.size(); i++) { + Thread.sleep(100); + } + Map timeLag = recordSupplier.getPartitionResourcesTimeLag(); + + Assert.assertEquals(partitions, recordSupplier.getAssignment()); + Assert.assertEquals(SHARDS_LAG_MILLIS_EMPTY, timeLag); + + List> polledRecords = recordSupplier.poll(POLL_TIMEOUT_MILLIS); + + // Step 8: Verify that the data retrieved matches the original data after decompression + Assert.assertEquals(compressedRecords.size(), polledRecords.size()); + + ObjectMapper objectMapper = new ObjectMapper(); + + for (int i = 0; i < polledRecords.size(); i++) { + OrderedPartitionableRecord record = polledRecords.get(i); + ByteBuffer dataBuffer = record.getData().get(0).getBuffer(); + + // Extract bytes from the ByteBuffer + byte[] dataBytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(dataBytes); + + // Deserialize the data to Map + Map dataMap = objectMapper.readValue(dataBytes, Map.class); + + // Similarly, get the expected data from the original uncompressed data + ByteBuffer expectedDataBuffer = originalData.get(i).asReadOnlyBuffer(); + byte[] expectedDataBytes = new byte[expectedDataBuffer.remaining()]; + expectedDataBuffer.get(expectedDataBytes); + Map expectedDataMap = objectMapper.readValue(expectedDataBytes, 
Map.class); + + // Assert that the decompressed data matches the original data + Assert.assertEquals("Data maps should match after decompression", expectedDataMap, dataMap); + } + + verifyAll(); + } + + private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId, List records, String nextIterator) { diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index 102e2d8929f21..208a6490e6468 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -149,7 +149,8 @@ public void testSample() throws InterruptedException null, null, null, - false + false, + null ), null, null, @@ -228,7 +229,8 @@ public void testSampleWithInputRowParser() throws IOException, InterruptedExcept null, null, null, - false + false, + null ), null, null, @@ -281,7 +283,8 @@ public void testGetInputSourceResources() null, null, null, - false + false, + null ), null, null, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 24d919918f475..83894f7d770c7 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -460,7 +460,8 @@ public void testRecordSupplier() null, null, null, - false + false, + null ); KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER); KinesisSupervisor supervisor = new KinesisSupervisor( @@ -524,7 +525,8 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation() null, null, null, - false + false, + null ); AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoScalerConfig(); @@ -551,7 +553,8 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation() null, null, OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), - false + false, + null ); AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoScalerConfig(); @@ -4178,7 +4181,8 @@ public void testCorrectInputSources() null, null, null, - false + false, + null ), null, null, @@ -5103,7 +5107,8 @@ private TestableKinesisSupervisor getTestableSupervisor( null, null, null, - false + false, + null ); KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( @@ -5245,7 +5250,8 @@ private TestableKinesisSupervisor getTestableSupervisor( null, null, autoScalerConfig, - false + false, + null ); KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( @@ -5334,7 +5340,8 @@ private TestableKinesisSupervisor getTestableSupervisorCustomIsTaskCurrent( null, null, null, - false + false, + null ); KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( @@ -5422,7 +5429,8 @@ private KinesisSupervisor createSupervisor( null, null, null, - false + false, + null ); 
KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( @@ -5567,6 +5575,7 @@ private KinesisIndexTask createKinesisIndexTask( "awsEndpoint", null, null, + null, null ), Collections.emptyMap(), diff --git a/processing/pom.xml b/processing/pom.xml index 7ae4cc075a207..8756dee29aea3 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -471,6 +471,11 @@ system-rules test + + org.junit.jupiter + junit-jupiter-params + test + diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index 7ab05b9071966..eb283038f3064 100644 --- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -19,6 +19,8 @@ package org.apache.druid.utils; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; @@ -28,10 +30,17 @@ import com.google.common.io.ByteStreams; import com.google.common.io.Files; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorInputStream; +import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorOutputStream; import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; +import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream; import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream; +import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream; import org.apache.commons.compress.utils.FileNameUtils; +import org.apache.commons.io.IOUtils; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IAE; @@ -39,17 +48,20 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StreamUtils; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.NativeIO; import org.apache.druid.java.util.common.logger.Logger; import javax.annotation.Nullable; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; @@ -109,6 +121,19 @@ public String getExtension() return extension; } + @JsonValue + @Override + public String toString() + { + return StringUtils.toLowerCase(this.name()); + } + + @JsonCreator + public static Format fromString(String name) + { + return valueOf(StringUtils.toUpperCase(name)); + } + @Nullable public static Format fromFileName(@Nullable String fileName) { @@ -614,6 +639,63 @@ public static String getGzBaseName(String fname) throw new IAE("[%s] is not a valid gz file name", fname); } + /** + * Compress an 
output stream based on a given format specification. + * This uses default settings for each algorithm. + */ + public static OutputStream compress(final OutputStream out, final Format format) throws IOException + { + switch (format) { + case GZ: return new GzipCompressorOutputStream(out); + case BZ2: return new BZip2CompressorOutputStream(out); + case XZ: return new XZCompressorOutputStream(out); + case SNAPPY: return new FramedSnappyCompressorOutputStream(out); + case ZSTD: return new ZstdCompressorOutputStream(out); + case ZIP: return new ZipOutputStream(out, StandardCharsets.UTF_8); + default: return out; + } + } + + /** + * Compress a ByteBuffer based on a given format specification. + * This uses default settings for each algorithm. + */ + public static ByteBuffer compress(ByteBuffer in, Format format) throws IOException + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (OutputStream compressedStream = compress(baos, format)) { + compressedStream.write(in.array()); + } + return ByteBuffer.wrap(baos.toByteArray()); + } + + /** + * Decompress an input stream based on a given format specification. + */ + public static InputStream decompress(final InputStream in, final Format format) throws IOException + { + switch (format) { + case GZ: return gzipInputStream(in); + case BZ2: return new BZip2CompressorInputStream(in, true); + case XZ: return new XZCompressorInputStream(in, true); + case SNAPPY: return new FramedSnappyCompressorInputStream(in); + case ZSTD: return new ZstdCompressorInputStream(in); + case ZIP: return new ZipInputStream(in, StandardCharsets.UTF_8); + default: return in; + } + } + + /** + * Decompress a ByteBuffer based on a given format specification. + */ + public static ByteBuffer decompress(ByteBuffer in, Format format) throws IOException + { + ByteArrayInputStream bais = new ByteArrayInputStream(in.array()); + try (InputStream decompressedStream = decompress(bais, format)) { + return ByteBuffer.wrap(IOUtils.toByteArray(decompressedStream)); + } + } + /** * Decompress an input stream from a file, based on the filename. 
*/ diff --git a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java index f02e441573a34..0d72bd4e557c2 100644 --- a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java @@ -31,10 +31,13 @@ import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream; import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream; import org.apache.druid.utils.CompressionUtils; +import org.apache.druid.utils.CompressionUtils.Format; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.rules.TemporaryFolder; import java.io.ByteArrayInputStream; @@ -892,4 +895,99 @@ public static void makeEvilZip(File outputFile) throws IOException zipOutputStream.closeEntry(); zipOutputStream.close(); } + + + // Helper method for copying an InputStream to an OutputStream + private void copyStream(InputStream input, OutputStream output) throws IOException + { + byte[] buffer = new byte[8192]; + int length; + while ((length = input.read(buffer)) != -1) { + output.write(buffer, 0, length); + } + } + + // Helper method for reading an InputStream into a String + private String readStream(InputStream input) throws IOException + { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + copyStream(input, output); + return output.toString(StandardCharsets.UTF_8.name()); + } + + // Helper method for compressing a string + private byte[] compressString(String input, Format format) throws IOException + { + InputStream inputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); + // Compress + ByteArrayOutputStream compressedOutput = new ByteArrayOutputStream(); + try (OutputStream compressedStream = CompressionUtils.compress(compressedOutput, format)) { + copyStream(inputStream, compressedStream); + } + return compressedOutput.toByteArray(); + } + + // Helper method for decompressing a string + private String decompressString(byte[] input, Format format) throws IOException + { + InputStream compressedInput = new ByteArrayInputStream(input); + InputStream decompressedStream = CompressionUtils.decompress(compressedInput, format); + return readStream(decompressedStream); + } + + /** + * Dynamic round-trip compression/decompression test for each format. + */ + @ParameterizedTest + @EnumSource(CompressionUtils.Format.class) + void testCompressAndDecompressForAllFormats(CompressionUtils.Format format) throws IOException + { + String originalString = "This is a test string for compression in format: " + format; + + // Compress and then decompress + byte[] compressedData = compressString(originalString, format); + String decompressedString = decompressString(compressedData, format); + + // Verify the original and decompressed strings match + Assert.assertEquals("Decompressed string does not match the original for format: " + format, originalString, decompressedString); + } + + /** + * Dynamic chaos test for mismatched compression and decompression formats. + * This attempts to decompress data compressed in one format using a different format.
+ */ + @ParameterizedTest + @EnumSource(CompressionUtils.Format.class) + void testChaosCompressionDecompression(CompressionUtils.Format format) throws IOException + { + String originalString = "This is a chaos test string for compression in format: " + format; + + // Compress with the selected format + byte[] compressedData = compressString(originalString, format); + + // Try decompressing with a different format + for (CompressionUtils.Format wrongFormat : CompressionUtils.Format.values()) { + if (wrongFormat != format) { + try { + decompressString(compressedData, wrongFormat); + Assert.fail("Expected decompression failure for mismatched formats. Compressed with: " + format + ", decompressed with: " + wrongFormat); + } + catch (IOException e) { + // Expected exception, continue + Assert.assertNotNull("Exception message should exist on decompression failure.", e.getMessage()); + } + } + } + } + + /** + * Test for compressing with an unsupported or invalid input. + */ + @Test + public void testCompressWithInvalidInput() + { + CompressionUtils.Format format = CompressionUtils.Format.GZ; + // Attempting to compress a null input should throw an exception + Assert.assertThrows("Expected NullPointerException for null input", NullPointerException.class, () -> compressString(null, format)); + } }
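A usage sketch (not part of the patch) of the new `ByteBuffer` helpers in `CompressionUtils`: compression and decompression compose through the same `Format` value, and the `@JsonValue`/`@JsonCreator` pair added to `Format` is what lets supervisor specs spell the format in lower case, as in `"compressionFormat": "zstd"`. Assumes `zstd-jni` is on the classpath for the ZSTD codec:

```java
import org.apache.druid.utils.CompressionUtils;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CompressionRoundTrip
{
  public static void main(String[] args) throws IOException
  {
    final ByteBuffer original = ByteBuffer.wrap("hello kinesis".getBytes(StandardCharsets.UTF_8));

    // Round-trip through the ByteBuffer helpers added in this patch.
    final ByteBuffer compressed = CompressionUtils.compress(original, CompressionUtils.Format.ZSTD);
    final ByteBuffer restored = CompressionUtils.decompress(compressed, CompressionUtils.Format.ZSTD);

    System.out.println(StandardCharsets.UTF_8.decode(restored)); // prints "hello kinesis"
  }
}
```

The `decompress` call here is the same one `KinesisRecordSupplier` makes per record at ingestion time, so a producer compressing with a given `Format` and a supervisor configured with the matching `compressionFormat` round-trip cleanly.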