Commit

Address the minor review comments
Akshat-Jain committed May 29, 2024
1 parent 26784c1 commit bdac529
Showing 7 changed files with 61 additions and 48 deletions.
S3StorageDruidModule.java
@@ -43,13 +43,9 @@
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.storage.s3.output.S3UploadManager;
-import org.apache.druid.utils.RuntimeInfo;

import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;

/**
*
@@ -186,16 +182,4 @@ public Supplier<ServerSideEncryptingAmazonS3> getAmazonS3ClientSupplier(
{
return Suppliers.memoize(serverSideEncryptingAmazonS3Builder::build);
}

-@Provides
-@LazySingleton
-public S3UploadManager getS3UploadManager(ScheduledExecutorFactory scheduledExecutorFactory, RuntimeInfo runtimeInfo)
-{
-int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors());
-ScheduledExecutorService executorService = scheduledExecutorFactory.create(
-poolSize,
-"UploadThreadPool-%d"
-);
-return new S3UploadManager(executorService);
-}
}
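
Note: the @Provides factory removed above is superseded by the @Inject constructor added to S3UploadManager in this same commit. A minimal sketch of what the remaining wiring can look like once the constructor is injectable; the module class and the explicit bind call below are illustrative assumptions, not part of this diff:

import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.storage.s3.output.S3UploadManager;

// Sketch: with an @Inject constructor on S3UploadManager, a plain scoped
// binding (or even just-in-time resolution) replaces the removed @Provides
// method; Guice supplies the ScheduledExecutorFactory and RuntimeInfo
// constructor arguments itself.
public class S3UploadManagerBindingSketch implements Module
{
  @Override
  public void configure(Binder binder)
  {
    binder.bind(S3UploadManager.class).in(LazySingleton.class);
  }
}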
RetryableS3OutputStream.java
@@ -177,7 +177,7 @@ public void write(byte[] b, int off, int len) throws IOException
}

synchronized (maxChunksLock) {
-while (uploadManager.getCurrentNumChunks() > uploadManager.getMaxConcurrentNumChunks()) {
+while (uploadManager.getCurrentNumChunksOnDisk() > uploadManager.getMaxConcurrentNumChunks()) {
try {
LOG.debug("Waiting for lock for writing further chunks to local disk.");
maxChunksLock.wait();
@@ -222,7 +222,7 @@ private void pushCurrentChunk() throws IOException
currentChunk.close();
final Chunk chunk = currentChunk;
if (chunk.length() > 0) {
-uploadManager.incrementCurrentNumChunks();
+uploadManager.incrementCurrentNumChunksOnDisk();
pendingFiles.incrementAndGet();

uploadManager.submitTask(() -> {
@@ -236,7 +236,7 @@
}
finally {
synchronized (maxChunksLock) {
-uploadManager.decrementCurrentNumChunks();
+uploadManager.decrementCurrentNumChunksOnDisk();
maxChunksLock.notifyAll();
}
if (pendingFiles.decrementAndGet() == 0) {
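The write path above is a monitor-based backpressure scheme: a writer blocks while the number of chunks on local disk is above the limit, and every finished upload decrements the counter and signals the monitor. A self-contained sketch of the same pattern, using illustrative names rather than the actual Druid classes:

import java.util.concurrent.atomic.AtomicInteger;

// Reduction of the pattern in the diff above: acquire() blocks the producer
// while too many chunks sit on disk; release() is called once an upload
// completes, and wakes every waiter so each can re-check the condition.
public class ChunkBackpressureSketch
{
  private final Object maxChunksLock = new Object();
  private final AtomicInteger chunksOnDisk = new AtomicInteger(0);
  private final int maxChunksOnDisk;

  public ChunkBackpressureSketch(int maxChunksOnDisk)
  {
    this.maxChunksOnDisk = maxChunksOnDisk;
  }

  // Called before spilling another chunk to local disk.
  public void acquire() throws InterruptedException
  {
    synchronized (maxChunksLock) {
      while (chunksOnDisk.get() >= maxChunksOnDisk) {
        maxChunksLock.wait(); // lock is released while waiting
      }
      chunksOnDisk.incrementAndGet();
    }
  }

  // Called after the chunk has been uploaded and deleted from disk.
  public void release()
  {
    synchronized (maxChunksLock) {
      chunksOnDisk.decrementAndGet();
      maxChunksLock.notifyAll(); // let blocked writers re-check the limit
    }
  }
}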
S3UploadManager.java
@@ -19,16 +19,19 @@

package org.apache.druid.storage.s3.output;

+import com.google.inject.Inject;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.RuntimeInfo;

-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

/**
* This class manages the configuration for uploading files to S3 in chunks.
-* It tracks the current number of chunks written to local disk concurrently and ensures that the
-* maximum number of chunks on disk does not exceed a specified limit at any given point in time.
+* It tracks the number of chunks currently present on local disk and ensures that
+* it does not exceed a specified limit.
*/
public class S3UploadManager
{
@@ -38,53 +41,52 @@ public class S3UploadManager
*/
private long chunkSize = new HumanReadableBytes("5MiB").getBytes();

/**
* An atomic counter to track the current number of chunks saved to local disk.
*/
-private final AtomicInteger currentNumChunks = new AtomicInteger(0);
+private final AtomicInteger currentNumChunksOnDisk = new AtomicInteger(0);

/**
* The maximum number of chunks that can be saved to local disk concurrently.
* This value is recalculated when the chunk size is updated in {@link #updateChunkSizeIfGreater(long)}.
*/
private int maxConcurrentNumChunks = 100;

/**
* Threadpool used for uploading the chunks asynchronously.
*/
-private final ExecutorService uploadExecutor;
+private final ScheduledExecutorService uploadExecutor;

private static final Logger log = new Logger(S3UploadManager.class);

-public S3UploadManager(ExecutorService executorService)
+@Inject
+public S3UploadManager(ScheduledExecutorFactory scheduledExecutorFactory, RuntimeInfo runtimeInfo)
{
-this.uploadExecutor = executorService;
+int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors());
+this.uploadExecutor = scheduledExecutorFactory.create(
+poolSize,
+"UploadThreadPool-%d"
+);
}

/**
* Increments the counter for the current number of chunks saved on disk.
*/
-public void incrementCurrentNumChunks()
+public void incrementCurrentNumChunksOnDisk()
{
-currentNumChunks.incrementAndGet();
+currentNumChunksOnDisk.incrementAndGet();
}

/**
* Decrements the counter for the current number of chunks saved on disk.
*/
-public void decrementCurrentNumChunks()
+public void decrementCurrentNumChunksOnDisk()
{
-currentNumChunks.decrementAndGet();
+currentNumChunksOnDisk.decrementAndGet();
}

/**
* Gets the current number of chunks saved to local disk.
*
* @return the current number of chunks saved to local disk.
*/
-public int getCurrentNumChunks()
+public int getCurrentNumChunksOnDisk()
{
-return currentNumChunks.get();
+return currentNumChunksOnDisk.get();
}

/**
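The class javadoc notes that maxConcurrentNumChunks is recalculated in updateChunkSizeIfGreater(long), whose body is not part of this diff. The test expectations later in this commit (5 MiB chunks keep the limit at 100, 5 GiB chunks drop it to 1) are consistent with a fixed local-disk budget of 100 * 5 MiB = 500 MiB and a floor of one chunk. The sketch below is an assumed reconstruction on that basis, not the actual Druid method:

import org.apache.druid.java.util.common.HumanReadableBytes;

// Assumed recalculation: keep the total bytes that may sit on disk constant,
// so larger chunks mean fewer of them may accumulate, but never fewer than one.
public class ChunkLimitSketch
{
  private long chunkSize = new HumanReadableBytes("5MiB").getBytes();
  private int maxConcurrentNumChunks = 100;
  private final long diskBudget = chunkSize * maxConcurrentNumChunks; // ~500 MiB

  public synchronized void updateChunkSizeIfGreater(long newChunkSize)
  {
    if (newChunkSize > chunkSize) {
      chunkSize = newChunkSize;
      maxConcurrentNumChunks = (int) Math.max(1, diskBudget / newChunkSize);
    }
  }
}

// With these numbers: updateChunkSizeIfGreater(5 MiB) is a no-op (not greater),
// leaving the limit at 100; updateChunkSizeIfGreater(5 GiB) shrinks it to 1,
// matching the assertions in S3UploadManagerTest below.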
S3StorageConnectorProviderTest.java
@@ -31,14 +31,16 @@
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.StartupInjectorBuilder;
-import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorModule;
import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.s3.output.S3StorageConnector;
import org.apache.druid.storage.s3.output.S3StorageConnectorModule;
import org.apache.druid.storage.s3.output.S3StorageConnectorProvider;
import org.apache.druid.storage.s3.output.S3UploadManager;
+import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

@@ -151,8 +153,9 @@ public void configure(Binder binder)
)
.addValue(
S3UploadManager.class,
-new S3UploadManager(Execs.singleThreaded("UploadThreadPool-%d"))
-));
+new S3UploadManager(EasyMock.mock(ScheduledExecutorFactory.class), new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0))
+)
+);


StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(
RetryableS3OutputStreamTest.java
@@ -34,6 +34,8 @@
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.easymock.EasyMock;
@@ -67,7 +69,7 @@ public class RetryableS3OutputStreamTest
private S3OutputConfig config;
private long chunkSize;

-private final S3UploadManager s3UploadManager = new S3UploadManager(Execs.singleThreaded("UploadThreadPool-%d"));
+private S3UploadManager s3UploadManager;

@Before
public void setup() throws IOException
@@ -101,6 +103,12 @@ public int getMaxRetry()
return 2;
}
};

+ScheduledExecutorFactory executorFactory = EasyMock.mock(ScheduledExecutorFactory.class);
+EasyMock.expect(executorFactory.create(EasyMock.anyInt(), EasyMock.anyString()))
+.andReturn(Execs.scheduledSingleThreaded("UploadThreadPool-%d"));
+EasyMock.replay(executorFactory);
+s3UploadManager = new S3UploadManager(executorFactory, new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0));
}

@Test
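Two details in this setup are worth noting: the mocked ScheduledExecutorFactory hands back a real single-threaded scheduled executor, which keeps uploads deterministic under test, and the first MockRuntimeInfo argument appears to be the available-processor count fed into the new constructor's pool sizing. A tiny illustration of that sizing rule; only the Math.max(4, ...) expression is taken from the diff, the rest is scaffolding:

// Pool sizing from the new S3UploadManager constructor: one thread per
// available processor, with a floor of four.
public class PoolSizeSketch
{
  static int poolSize(int availableProcessors)
  {
    return Math.max(4, availableProcessors);
  }

  public static void main(String[] args)
  {
    System.out.println(poolSize(10)); // 10 -- the MockRuntimeInfo(10, 0, 0) case
    System.out.println(poolSize(2));  // 4  -- small machines still get 4 threads
  }
}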
S3StorageConnectorTest.java
@@ -31,7 +31,8 @@
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -85,7 +86,7 @@ public void setup()
null,
null,
true
-), service, new S3UploadManager(Execs.singleThreaded("UploadThreadPool-%d")));
+), service, new S3UploadManager(EasyMock.mock(ScheduledExecutorFactory.class), new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)));
}
catch (IOException e) {
throw new RuntimeException(e);
S3UploadManagerTest.java
@@ -20,19 +20,34 @@
package org.apache.druid.storage.s3.output;

import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.query.DruidProcessingConfigTest;
+import org.easymock.EasyMock;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;

+import java.io.IOException;

public class S3UploadManagerTest
{
+private S3UploadManager s3UploadManager;

-S3UploadManager s3UploadManager = new S3UploadManager(Execs.singleThreaded("UploadThreadPool-%d"));
+@Before
+public void setup() throws IOException
+{
+ScheduledExecutorFactory executorFactory = EasyMock.mock(ScheduledExecutorFactory.class);
+EasyMock.expect(executorFactory.create(EasyMock.anyInt(), EasyMock.anyString()))
+.andReturn(Execs.scheduledSingleThreaded("UploadThreadPool-%d"));
+EasyMock.replay(executorFactory);
+s3UploadManager = new S3UploadManager(executorFactory, new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0));
+}

@Test
public void testDefault()
{
Assert.assertEquals(100, s3UploadManager.getMaxConcurrentNumChunks());
-Assert.assertEquals(0, s3UploadManager.getCurrentNumChunks());
+Assert.assertEquals(0, s3UploadManager.getCurrentNumChunksOnDisk());
}

@Test
@@ -41,11 +56,11 @@ public void testUpdateChunkSize()
// Verify that updating chunk size to a smaller or equal value doesn't change the maxConcurrentNumChunks.
s3UploadManager.updateChunkSizeIfGreater(5L * 1024 * 1024L);
Assert.assertEquals(100, s3UploadManager.getMaxConcurrentNumChunks());
-Assert.assertEquals(0, s3UploadManager.getCurrentNumChunks());
+Assert.assertEquals(0, s3UploadManager.getCurrentNumChunksOnDisk());

// Verify that updating chunk size to a greater value changes the maxConcurrentNumChunks.
s3UploadManager.updateChunkSizeIfGreater(5L * 1024 * 1024 * 1024L);
Assert.assertEquals(1, s3UploadManager.getMaxConcurrentNumChunks());
-Assert.assertEquals(0, s3UploadManager.getCurrentNumChunks());
+Assert.assertEquals(0, s3UploadManager.getCurrentNumChunksOnDisk());
}
}
