From bdac52937a46a065f6adc8ca8b156d1615ca5ccf Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 29 May 2024 10:12:56 +0530 Subject: [PATCH] Address the minor review comments --- .../storage/s3/S3StorageDruidModule.java | 16 -------- .../s3/output/RetryableS3OutputStream.java | 6 +-- .../storage/s3/output/S3UploadManager.java | 40 ++++++++++--------- .../s3/S3StorageConnectorProviderTest.java | 9 +++-- .../output/RetryableS3OutputStreamTest.java | 10 ++++- .../s3/output/S3StorageConnectorTest.java | 5 ++- .../s3/output/S3UploadManagerTest.java | 23 +++++++++-- 7 files changed, 61 insertions(+), 48 deletions(-) diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java index 11c16708b3318..3747088aeb6ef 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java @@ -43,13 +43,9 @@ import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.initialization.DruidModule; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.storage.s3.output.S3UploadManager; -import org.apache.druid.utils.RuntimeInfo; import java.util.List; -import java.util.concurrent.ScheduledExecutorService; /** * @@ -186,16 +182,4 @@ public Supplier getAmazonS3ClientSupplier( { return Suppliers.memoize(serverSideEncryptingAmazonS3Builder::build); } - - @Provides - @LazySingleton - public S3UploadManager getS3UploadManager(ScheduledExecutorFactory scheduledExecutorFactory, RuntimeInfo runtimeInfo) - { - int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors()); - ScheduledExecutorService executorService = scheduledExecutorFactory.create( - poolSize, - "UploadThreadPool-%d" - ); - return new S3UploadManager(executorService); - } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java index 13591c3657433..40a3a0858bc52 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java @@ -177,7 +177,7 @@ public void write(byte[] b, int off, int len) throws IOException } synchronized (maxChunksLock) { - while (uploadManager.getCurrentNumChunks() > uploadManager.getMaxConcurrentNumChunks()) { + while (uploadManager.getCurrentNumChunksOnDisk() > uploadManager.getMaxConcurrentNumChunks()) { try { LOG.debug("Waiting for lock for writing further chunks to local disk."); maxChunksLock.wait(); @@ -222,7 +222,7 @@ private void pushCurrentChunk() throws IOException currentChunk.close(); final Chunk chunk = currentChunk; if (chunk.length() > 0) { - uploadManager.incrementCurrentNumChunks(); + uploadManager.incrementCurrentNumChunksOnDisk(); pendingFiles.incrementAndGet(); uploadManager.submitTask(() -> { @@ -236,7 +236,7 @@ private void pushCurrentChunk() throws IOException } finally { synchronized (maxChunksLock) { - uploadManager.decrementCurrentNumChunks(); + uploadManager.decrementCurrentNumChunksOnDisk(); maxChunksLock.notifyAll(); } if (pendingFiles.decrementAndGet() == 0) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java index 1b58a650bc02f..a0f936364cdd8 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java @@ -19,16 +19,19 @@ package org.apache.druid.storage.s3.output; +import com.google.inject.Inject; import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.RuntimeInfo; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; /** * This class manages the configuration for uploading files to S3 in chunks. - * It tracks the current number of chunks written to local disk concurrently and ensures that the - * maximum number of chunks on disk does not exceed a specified limit at any given point in time. + * It tracks the number of chunks currently present on local disk and ensures that + * it does not exceed a specified limit. */ public class S3UploadManager { @@ -38,10 +41,7 @@ public class S3UploadManager */ private long chunkSize = new HumanReadableBytes("5MiB").getBytes(); - /** - * An atomic counter to track the current number of chunks saved to local disk. - */ - private final AtomicInteger currentNumChunks = new AtomicInteger(0); + private final AtomicInteger currentNumChunksOnDisk = new AtomicInteger(0); /** * The maximum number of chunks that can be saved to local disk concurrently. @@ -49,32 +49,34 @@ public class S3UploadManager */ private int maxConcurrentNumChunks = 100; - /** - * Threadpool used for uploading the chunks asynchronously. - */ - private final ExecutorService uploadExecutor; + private final ScheduledExecutorService uploadExecutor; private static final Logger log = new Logger(S3UploadManager.class); - public S3UploadManager(ExecutorService executorService) + @Inject + public S3UploadManager(ScheduledExecutorFactory scheduledExecutorFactory, RuntimeInfo runtimeInfo) { - this.uploadExecutor = executorService; + int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors()); + this.uploadExecutor = scheduledExecutorFactory.create( + poolSize, + "UploadThreadPool-%d" + ); } /** * Increments the counter for the current number of chunks saved on disk. */ - public void incrementCurrentNumChunks() + public void incrementCurrentNumChunksOnDisk() { - currentNumChunks.incrementAndGet(); + currentNumChunksOnDisk.incrementAndGet(); } /** * Decrements the counter for the current number of chunks saved on disk. */ - public void decrementCurrentNumChunks() + public void decrementCurrentNumChunksOnDisk() { - currentNumChunks.decrementAndGet(); + currentNumChunksOnDisk.decrementAndGet(); } /** @@ -82,9 +84,9 @@ public void decrementCurrentNumChunks() * * @return the current number of chunks saved to local disk. */ - public int getCurrentNumChunks() + public int getCurrentNumChunksOnDisk() { - return currentNumChunks.get(); + return currentNumChunksOnDisk.get(); } /** diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java index 95ac547c78ba1..7c5619272afcd 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java @@ -31,7 +31,8 @@ import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; -import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; @@ -39,6 +40,7 @@ import org.apache.druid.storage.s3.output.S3StorageConnectorModule; import org.apache.druid.storage.s3.output.S3StorageConnectorProvider; import org.apache.druid.storage.s3.output.S3UploadManager; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -151,8 +153,9 @@ public void configure(Binder binder) ) .addValue( S3UploadManager.class, - new S3UploadManager(Execs.singleThreaded("UploadThreadPool-%d")) - )); + new S3UploadManager(EasyMock.mock(ScheduledExecutorFactory.class), new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)) + ) + ); StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get( diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index 53fe58ae41241..6f24b46e7308d 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -34,6 +34,8 @@ import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.storage.s3.NoopServerSideEncryption; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import org.easymock.EasyMock; @@ -67,7 +69,7 @@ public class RetryableS3OutputStreamTest private S3OutputConfig config; private long chunkSize; - private final S3UploadManager s3UploadManager = new S3UploadManager(Execs.singleThreaded("UploadThreadPool-%d")); + private S3UploadManager s3UploadManager; @Before public void setup() throws IOException @@ -101,6 +103,12 @@ public int getMaxRetry() return 2; } }; + + ScheduledExecutorFactory executorFactory = EasyMock.mock(ScheduledExecutorFactory.class); + EasyMock.expect(executorFactory.create(EasyMock.anyInt(), EasyMock.anyString())) + .andReturn(Execs.scheduledSingleThreaded("UploadThreadPool-%d")); + EasyMock.replay(executorFactory); + s3UploadManager = new S3UploadManager(executorFactory, new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)); } @Test diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java index 2810494745da3..da6d7702542be 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java @@ -31,7 +31,8 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.s3.NoopServerSideEncryption; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; @@ -85,7 +86,7 @@ public void setup() null, null, true - ), service, new S3UploadManager(Execs.singleThreaded("UploadThreadPool-%d"))); + ), service, new S3UploadManager(EasyMock.mock(ScheduledExecutorFactory.class), new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0))); } catch (IOException e) { throw new RuntimeException(e); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java index 583885235a950..bdaf0f144e5eb 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java @@ -20,19 +20,34 @@ package org.apache.druid.storage.s3.output; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.query.DruidProcessingConfigTest; +import org.easymock.EasyMock; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import java.io.IOException; + public class S3UploadManagerTest { + private S3UploadManager s3UploadManager; - S3UploadManager s3UploadManager = new S3UploadManager(Execs.singleThreaded("UploadThreadPool-%d")); + @Before + public void setup() throws IOException + { + ScheduledExecutorFactory executorFactory = EasyMock.mock(ScheduledExecutorFactory.class); + EasyMock.expect(executorFactory.create(EasyMock.anyInt(), EasyMock.anyString())) + .andReturn(Execs.scheduledSingleThreaded("UploadThreadPool-%d")); + EasyMock.replay(executorFactory); + s3UploadManager = new S3UploadManager(executorFactory, new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)); + } @Test public void testDefault() { Assert.assertEquals(100, s3UploadManager.getMaxConcurrentNumChunks()); - Assert.assertEquals(0, s3UploadManager.getCurrentNumChunks()); + Assert.assertEquals(0, s3UploadManager.getCurrentNumChunksOnDisk()); } @Test @@ -41,11 +56,11 @@ public void testUpdateChunkSize() // Verify that updating chunk size to a smaller or equal value doesn't change the maxConcurrentNumChunks. s3UploadManager.updateChunkSizeIfGreater(5L * 1024 * 1024L); Assert.assertEquals(100, s3UploadManager.getMaxConcurrentNumChunks()); - Assert.assertEquals(0, s3UploadManager.getCurrentNumChunks()); + Assert.assertEquals(0, s3UploadManager.getCurrentNumChunksOnDisk()); // Verify that updating chunk size to a greater value changes the maxConcurrentNumChunks. s3UploadManager.updateChunkSizeIfGreater(5L * 1024 * 1024 * 1024L); Assert.assertEquals(1, s3UploadManager.getMaxConcurrentNumChunks()); - Assert.assertEquals(0, s3UploadManager.getCurrentNumChunks()); + Assert.assertEquals(0, s3UploadManager.getCurrentNumChunksOnDisk()); } }