
Optimize S3 storage writing for MSQ durable storage #16481

Merged · 34 commits · Jun 7, 2024

Commits (the diff below reflects changes from 30 of the 34 commits)
38f81eb
Optimise S3 storage writing for MSQ durable storage
Akshat-Jain May 17, 2024
bf518d8
Get rid of static ConcurrentHashMap
Akshat-Jain May 22, 2024
a920f74
Fix static checks
Akshat-Jain May 22, 2024
38dc51c
Fix tests
Akshat-Jain May 22, 2024
934ecd1
Remove unused constructor parameter chunkValidation + relevant cleanup
Akshat-Jain May 22, 2024
82fa878
Assert etags as String instead of Integer
Akshat-Jain May 22, 2024
bc55f0a
Fix flaky test
Akshat-Jain May 22, 2024
438008e
Inject executor service
Akshat-Jain May 27, 2024
f5a1514
Make threadpool size dynamic based on number of cores
Akshat-Jain May 27, 2024
c266a9e
Fix S3StorageDruidModuleTest
Akshat-Jain May 27, 2024
ce60f7e
Fix S3StorageConnectorProviderTest
Akshat-Jain May 27, 2024
c382daf
Fix injection issues
Akshat-Jain May 27, 2024
2224d11
Add S3UploadConfig to manage maximum number of concurrent chunks dyna…
Akshat-Jain May 28, 2024
262182c
Address the minor review comments
Akshat-Jain May 28, 2024
6d62bf9
Refactor S3UploadConfig + ExecutorService into S3UploadManager
Akshat-Jain May 28, 2024
a705294
Address review comments
Akshat-Jain May 28, 2024
1c0bfe5
Make updateChunkSizeIfGreater() synchronized instead of recomputeMaxC…
Akshat-Jain May 28, 2024
7418790
Address the minor review comments
Akshat-Jain May 29, 2024
fbe71e3
Fix intellij-inspections check
Akshat-Jain May 29, 2024
5c83514
Refactor code to use futures for maxNumConcurrentChunks. Also use exe…
Akshat-Jain May 31, 2024
8d6442b
Update javadoc
Akshat-Jain May 31, 2024
295689f
Get rid of cyclic dependency injection between S3UploadManager and S3…
Akshat-Jain Jun 3, 2024
6ec023f
Fix RetryableS3OutputStreamTest
Akshat-Jain Jun 3, 2024
f43a0c9
Remove unnecessary synchronization parts from RetryableS3OutputStream
Akshat-Jain Jun 3, 2024
79a22ed
Update javadoc
Akshat-Jain Jun 3, 2024
da5e57a
Add S3UploadManagerTest
Akshat-Jain Jun 4, 2024
059a5e9
Revert back to S3StorageConnectorProvider extends S3OutputConfig
Akshat-Jain Jun 4, 2024
70c7352
Address Karan's review comments
Akshat-Jain Jun 5, 2024
749b20d
Address Kashif's review comments
Akshat-Jain Jun 6, 2024
258376e
Change a log message to debug
Akshat-Jain Jun 6, 2024
8e7882e
Address review comments
Akshat-Jain Jun 6, 2024
f3df9c7
Fix intellij-inspections check
Akshat-Jain Jun 6, 2024
a474008
Fix checkstyle
Akshat-Jain Jun 6, 2024
8ef6086
Merge branch 'master' into s3-output-stream
asdf2014 Jun 7, 2024
Files changed

File: Azure storage connector provider test

@@ -56,23 +56,22 @@ public void createAzureStorageFactoryWithRequiredProperties()
     properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
     properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
     properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
-    StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider(properties);
+    StorageConnectorProvider storageConnectorProvider = getStorageConnectorProvider(properties);

-    assertInstanceOf(AzureStorageConnectorProvider.class, s3StorageConnectorProvider);
-    assertInstanceOf(AzureStorageConnector.class, s3StorageConnectorProvider.get());
-    assertEquals("container", ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getContainer());
-    assertEquals("prefix", ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getPrefix());
+    assertInstanceOf(AzureStorageConnectorProvider.class, storageConnectorProvider);
+    assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.get());
+    assertEquals("container", ((AzureStorageConnectorProvider) storageConnectorProvider).getContainer());
+    assertEquals("prefix", ((AzureStorageConnectorProvider) storageConnectorProvider).getPrefix());
     assertEquals(new File("/tmp"),
-                 ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getTempDir());
-
+                 ((AzureStorageConnectorProvider) storageConnectorProvider).getTempDir());
   }

   @Test
   public void createAzureStorageFactoryWithMissingPrefix()
   {
-
     final Properties properties = new Properties();
-    properties.setProperty(CUSTOM_NAMESPACE + ".type", "s3");
+    properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
     properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
     properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
     assertThrows(
File: RetryableS3OutputStream.java

@@ -19,16 +19,12 @@
 package org.apache.druid.storage.s3.output;

-import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
-import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.io.CountingOutputStream;
 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;

@@ -49,6 +45,7 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;

@@ -81,20 +78,13 @@ public class RetryableS3OutputStream extends OutputStream
   private final File chunkStorePath;
   private final long chunkSize;

-  private final List<PartETag> pushResults = new ArrayList<>();
   private final byte[] singularBuffer = new byte[1];

   // metric
   private final Stopwatch pushStopwatch;

   private Chunk currentChunk;
   private int nextChunkId = 1; // multipart upload requires partNumber to be in the range between 1 and 10000
-  private int numChunksPushed;
-  /**
-   * Total size of all chunks. This size is updated whenever the chunk is ready for push,
-   * not when {@link #write(byte[], int, int)} is called.
-   */
-  private long resultsSize;

   /**
    * A flag indicating whether there was an upload error.
    */

@@ -103,27 +93,28 @@
   private boolean error;
   private boolean closed;

-  public RetryableS3OutputStream(
-      S3OutputConfig config,
-      ServerSideEncryptingAmazonS3 s3,
-      String s3Key
-  ) throws IOException
-  {
-    this(config, s3, s3Key, true);
-  }
+  /**
+   * Helper class for calculating maximum number of simultaneous chunks allowed on local disk.
+   */
+  private final S3UploadManager uploadManager;
+
+  /**
+   * A list of futures to allow us to wait for completion of all uploadPart() calls
+   * before hitting {@link ServerSideEncryptingAmazonS3#completeMultipartUpload(CompleteMultipartUploadRequest)}.
+   */
+  private final List<Future<UploadPartResult>> futures = new ArrayList<>();

-  @VisibleForTesting
-  protected RetryableS3OutputStream(
+  public RetryableS3OutputStream(
       S3OutputConfig config,
       ServerSideEncryptingAmazonS3 s3,
       String s3Key,
-      boolean chunkValidation
+      S3UploadManager uploadManager
   ) throws IOException
   {
     this.config = config;
     this.s3 = s3;
     this.s3Key = s3Key;
+    this.uploadManager = uploadManager;

     final InitiateMultipartUploadResult result;
     try {

@@ -138,9 +129,7 @@
     this.chunkStorePath = new File(config.getTempDir(), uploadId + UUID.randomUUID());
     FileUtils.mkdirp(this.chunkStorePath);
     this.chunkSize = config.getChunkSize();
-    this.pushStopwatch = Stopwatch.createUnstarted();
-    this.pushStopwatch.reset();
-
+    this.pushStopwatch = Stopwatch.createStarted();
     this.currentChunk = new Chunk(nextChunkId, new File(chunkStorePath, String.valueOf(nextChunkId++)));
   }

@@ -172,7 +161,6 @@ public void write(byte[] b, int off, int len) throws IOException
     while (remainingBytesToWrite > 0) {
       final int writtenBytes = writeToCurrentChunk(b, offsetToWrite, remainingBytesToWrite);
-
       if (currentChunk.length() >= chunkSize) {
         pushCurrentChunk();
         currentChunk = new Chunk(nextChunkId, new File(chunkStorePath, String.valueOf(nextChunkId++)));

@@ -199,62 +187,11 @@ private void pushCurrentChunk() throws IOException
   {
     currentChunk.close();
     final Chunk chunk = currentChunk;
-    try {
-      if (chunk.length() > 0) {
-        resultsSize += chunk.length();
-
-        pushStopwatch.start();
-        pushResults.add(push(chunk));
-        pushStopwatch.stop();
-        numChunksPushed++;
-      }
-    }
-    finally {
-      if (!chunk.delete()) {
-        LOG.warn("Failed to delete chunk [%s]", chunk.getAbsolutePath());
-      }
-    }
-  }
-
-  private PartETag push(Chunk chunk) throws IOException
-  {
-    try {
-      return RetryUtils.retry(
-          () -> uploadPartIfPossible(uploadId, config.getBucket(), s3Key, chunk),
-          S3Utils.S3RETRY,
-          config.getMaxRetry()
-      );
-    }
-    catch (AmazonServiceException e) {
-      throw new IOException(e);
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private PartETag uploadPartIfPossible(
-      String uploadId,
-      String bucket,
-      String key,
-      Chunk chunk
-  )
-  {
-    final ObjectMetadata objectMetadata = new ObjectMetadata();
-    objectMetadata.setContentLength(resultsSize);
-    final UploadPartRequest uploadPartRequest = new UploadPartRequest()
-        .withUploadId(uploadId)
-        .withBucketName(bucket)
-        .withKey(key)
-        .withFile(chunk.file)
-        .withPartNumber(chunk.id)
-        .withPartSize(chunk.length());
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Pushing chunk [%s] to bucket[%s] and key[%s].", chunk, bucket, key);
-    }
-    UploadPartResult uploadResult = s3.uploadPart(uploadPartRequest);
-    return uploadResult.getPartETag();
+    if (chunk.length() > 0) {
+      futures.add(
+          uploadManager.queueChunkForUpload(s3, s3Key, chunk.id, chunk.file, uploadId, config)
+      );
+    }
   }

   @Override

@@ -268,53 +205,68 @@ public void close() throws IOException
     // Closeables are closed in LIFO order
     closer.register(() -> {
-      org.apache.commons.io.FileUtils.forceDelete(chunkStorePath);
-      LOG.info("Deleted chunkStorePath[%s]", chunkStorePath);
-
       // This should be emitted as a metric
+      long totalChunkSize = (currentChunk.id - 1) * chunkSize + currentChunk.length();
       LOG.info(
           "Pushed total [%d] parts containing [%d] bytes in [%d]ms.",
-          numChunksPushed,
-          resultsSize,
+          futures.size(),
+          totalChunkSize,
           pushStopwatch.elapsed(TimeUnit.MILLISECONDS)
       );
     });

-    closer.register(() -> {
-      try {
-        if (resultsSize > 0 && isAllPushSucceeded()) {
-          RetryUtils.retry(
-              () -> s3.completeMultipartUpload(
-                  new CompleteMultipartUploadRequest(config.getBucket(), s3Key, uploadId, pushResults)
-              ),
-              S3Utils.S3RETRY,
-              config.getMaxRetry()
-          );
-        } else {
-          RetryUtils.retry(
-              () -> {
-                s3.cancelMultiPartUpload(new AbortMultipartUploadRequest(config.getBucket(), s3Key, uploadId));
-                return null;
-              },
-              S3Utils.S3RETRY,
-              config.getMaxRetry()
-          );
-        }
-      }
-      catch (Exception e) {
-        throw new IOException(e);
-      }
-    });
+    closer.register(() -> org.apache.commons.io.FileUtils.forceDelete(chunkStorePath));

     try (Closer ignored = closer) {
       if (!error) {
         pushCurrentChunk();
+        completeMultipartUpload();
       }
     }
   }

-  private boolean isAllPushSucceeded()
+  private void completeMultipartUpload()
   {
-    return !error && !pushResults.isEmpty() && numChunksPushed == pushResults.size();
+    final List<PartETag> pushResults = new ArrayList<>();
+    for (Future<UploadPartResult> future : futures) {
+      if (error) {
+        future.cancel(true);
+      }
+      try {
+        UploadPartResult result = future.get(1, TimeUnit.HOURS);
+        pushResults.add(result.getPartETag());
+      }
+      catch (Exception e) {
+        error = true;
+        LOG.error(e, "Error in uploading part for upload ID [%s]", uploadId);
+      }
+    }
+
+    try {
+      boolean isAllPushSucceeded = !error && !pushResults.isEmpty() && futures.size() == pushResults.size();
+      if (isAllPushSucceeded) {
+        RetryUtils.retry(
+            () -> s3.completeMultipartUpload(
+                new CompleteMultipartUploadRequest(config.getBucket(), s3Key, uploadId, pushResults)
+            ),
+            S3Utils.S3RETRY,
+            config.getMaxRetry()
+        );
+      } else {
+        RetryUtils.retry(
+            () -> {
+              s3.cancelMultiPartUpload(new AbortMultipartUploadRequest(config.getBucket(), s3Key, uploadId));
+              return null;
+            },
+            S3Utils.S3RETRY,
+            config.getMaxRetry()
+        );
+      }
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }

   private static class Chunk implements Closeable

@@ -336,16 +288,6 @@ private long length()
       return outputStream.getCount();
     }

-    private boolean delete()
-    {
-      return file.delete();
-    }
-
-    private String getAbsolutePath()
-    {
-      return file.getAbsolutePath();
-    }
-
     @Override
     public boolean equals(Object o)
     {
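Taken together, the changes in RetryableS3OutputStream turn pushCurrentChunk() from a blocking upload into a queueing step: each completed chunk is handed to the injected S3UploadManager, which returns a Future<UploadPartResult>, and close() later drains those futures in completeMultipartUpload() before completing or aborting the multipart upload. S3UploadManager itself is introduced elsewhere in this PR and is not visible in this view, so the sketch below only illustrates the queue-and-return-a-Future pattern; the names, pool sizing, and cleanup placement are all assumptions (the commit log says only that the pool size is derived from the number of cores).

import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Minimal sketch of the upload-manager pattern; NOT the PR's actual
// S3UploadManager. Each finished chunk is submitted to a shared bounded
// pool, and the caller gets back a Future so it can keep writing the
// next chunk while earlier ones upload.
public class ChunkUploadManagerSketch
{
  private final ExecutorService executor;

  public ChunkUploadManagerSketch()
  {
    // Assumed sizing rule, loosely after the commit "Make threadpool size
    // dynamic based on number of cores".
    this.executor = Executors.newFixedThreadPool(
        Math.max(1, Runtime.getRuntime().availableProcessors() / 2)
    );
  }

  // Queue one chunk for upload and return immediately. The chunk file is
  // deleted once the upload attempt finishes, which matches the removal of
  // Chunk.delete() from the stream above.
  public Future<String> queueChunkForUpload(File chunkFile, Callable<String> uploadPartCall)
  {
    return executor.submit(() -> {
      try {
        // In the real code this would wrap s3.uploadPart(...) with retries
        // and return the resulting PartETag.
        return uploadPartCall.call();
      }
      finally {
        if (!chunkFile.delete()) {
          System.err.println("Failed to delete chunk " + chunkFile.getAbsolutePath());
        }
      }
    });
  }
}

With this shape the write path never blocks on S3: uploads proceed in parallel on the pool, while completeMultipartUpload() still collects every PartETag (via future.get) before the multipart upload is finalized.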
File: S3ExportStorageProvider.java

@@ -56,6 +56,9 @@ public class S3ExportStorageProvider implements ExportStorageProvider
   @JacksonInject
   ServerSideEncryptingAmazonS3 s3;

+  @JacksonInject
+  S3UploadManager s3UploadManager;
+
   @JsonCreator
   public S3ExportStorageProvider(
       @JsonProperty(value = "bucket", required = true) String bucket,

@@ -90,7 +93,7 @@ public StorageConnector get()
         s3ExportConfig.getChunkSize(),
         s3ExportConfig.getMaxRetry()
     );
-    return new S3StorageConnector(s3OutputConfig, s3);
+    return new S3StorageConnector(s3OutputConfig, s3, s3UploadManager);
   }

   @VisibleForTesting
File: S3StorageConnector.java

@@ -55,15 +55,17 @@ public class S3StorageConnector extends ChunkingStorageConnector<GetObjectRequest>
   private final S3OutputConfig config;
   private final ServerSideEncryptingAmazonS3 s3Client;
+  private final S3UploadManager s3UploadManager;

   private static final String DELIM = "/";
   private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
   private static final int MAX_NUMBER_OF_LISTINGS = 1000;

-  public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
+  public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3, S3UploadManager s3UploadManager)
   {
     this.config = config;
     this.s3Client = serverSideEncryptingAmazonS3;
+    this.s3UploadManager = s3UploadManager;
     Preconditions.checkNotNull(config, "config is null");
     Preconditions.checkNotNull(config.getTempDir(), "tempDir is null in s3 config");
     try {

@@ -153,7 +155,7 @@ public InputStream open(GetObjectRequest object, long offset)
   @Override
   public OutputStream write(String path) throws IOException
   {
-    return new RetryableS3OutputStream(config, s3Client, objectPath(path));
+    return new RetryableS3OutputStream(config, s3Client, objectPath(path), s3UploadManager);
   }

   @Override
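For callers, write() keeps its old contract: they receive a plain OutputStream, and the parallel part uploads stay hidden behind it. A minimal caller-side sketch, assuming the StorageConnector interface lives at org.apache.druid.storage as elsewhere in Druid (the object path and payload below are made up):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.druid.storage.StorageConnector;

public class ConnectorWriteExample
{
  // Writes one object through any StorageConnector. With the S3 connector,
  // close() now waits for all queued part uploads and then completes the
  // multipart upload; the caller never sees the futures.
  public static void writeGreeting(StorageConnector connector) throws IOException
  {
    try (OutputStream out = connector.write("worker0/partition0/output")) {
      out.write("hello".getBytes(StandardCharsets.UTF_8));
    }
  }
}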
File: S3 storage module (Guice bindings)

@@ -44,5 +44,6 @@ public List<? extends Module> getJacksonModules()
   public void configure(Binder binder)
   {
     JsonConfigProvider.bind(binder, "druid.export.storage.s3", S3ExportConfig.class);
+    JsonConfigProvider.bind(binder, "druid.msq.intermediate.storage", S3OutputConfig.class);
   }
 }
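This new bind exposes S3OutputConfig under the druid.msq.intermediate.storage.* property space, so MSQ durable-storage output settings can be injected directly rather than traveling only through the connector provider. A hypothetical runtime.properties fragment follows; the key names are assumed to mirror S3OutputConfig's JSON fields (bucket, prefix, tempDir, plus optional chunkSize and maxRetry) and should be verified against the Druid docs:

# Hypothetical example; verify key names against the Druid documentation.
druid.msq.intermediate.storage.bucket=my-bucket
druid.msq.intermediate.storage.prefix=msq-durable-storage
druid.msq.intermediate.storage.tempDir=/tmp/msq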
File: S3StorageConnectorProvider.java

@@ -38,6 +38,9 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements StorageConnectorProvider
   @JacksonInject
   ServerSideEncryptingAmazonS3 s3;

+  @JacksonInject
+  S3UploadManager s3UploadManager;
+
   @JsonCreator
   public S3StorageConnectorProvider(
       @JsonProperty(value = "bucket", required = true) String bucket,

@@ -53,6 +56,6 @@ public S3StorageConnectorProvider(
   @Override
   public StorageConnector get()
   {
-    return new S3StorageConnector(this, s3);
+    return new S3StorageConnector(this, s3, s3UploadManager);
   }
 }