Skip to content

Commit

Permalink
Revert back to S3StorageConnectorProvider extends S3OutputConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshat-Jain committed Jun 4, 2024
1 parent da5e57a commit 059a5e9
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,41 @@


import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;

import java.io.File;

@JsonTypeName(S3StorageDruidModule.SCHEME)
public class S3StorageConnectorProvider implements StorageConnectorProvider
public class S3StorageConnectorProvider extends S3OutputConfig implements StorageConnectorProvider
{
@JacksonInject
ServerSideEncryptingAmazonS3 s3;

@JacksonInject
S3OutputConfig s3OutputConfig;

@JacksonInject
S3UploadManager s3UploadManager;

@JsonCreator
public S3StorageConnectorProvider(
@JsonProperty(value = "bucket", required = true) String bucket,
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty("chunkSize") HumanReadableBytes chunkSize,
@JsonProperty("maxRetry") Integer maxRetry
)
{
super(bucket, prefix, tempDir, chunkSize, maxRetry);
}

@Override
public StorageConnector get()
{
return new S3StorageConnector(s3OutputConfig, s3, s3UploadManager);
return new S3StorageConnector(this, s3, s3UploadManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.ProvisionException;
import com.google.inject.name.Names;
import org.apache.druid.common.aws.AWSModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorModule;
import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.s3.output.S3ExportConfig;
import org.apache.druid.storage.s3.output.S3OutputConfig;
import org.apache.druid.storage.s3.output.S3StorageConnector;
import org.apache.druid.storage.s3.output.S3StorageConnectorModule;
Expand All @@ -52,16 +57,71 @@ public class S3StorageConnectorProviderTest
@Test
public void createS3StorageFactoryWithRequiredProperties()
{
StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider();

final Properties properties = new Properties();
properties.setProperty(CUSTOM_NAMESPACE + ".type", "s3");
properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket");
properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider(properties);

Assert.assertTrue(s3StorageConnectorProvider instanceof S3StorageConnectorProvider);
Assert.assertTrue(s3StorageConnectorProvider.get() instanceof S3StorageConnector);
Assert.assertEquals("bucket", ((S3StorageConnectorProvider) s3StorageConnectorProvider).getBucket());
Assert.assertEquals("prefix", ((S3StorageConnectorProvider) s3StorageConnectorProvider).getPrefix());
Assert.assertEquals(new File("/tmp"), ((S3StorageConnectorProvider) s3StorageConnectorProvider).getTempDir());

}

@Test
public void createS3StorageFactoryWithMissingPrefix()
{

final Properties properties = new Properties();
properties.setProperty(CUSTOM_NAMESPACE + ".type", "s3");
properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket");
properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
Assert.assertThrows(
"Missing required creator property 'prefix'",
ProvisionException.class,
() -> getStorageConnectorProvider(properties)
);
}

private StorageConnectorProvider getStorageConnectorProvider()

@Test
public void createS3StorageFactoryWithMissingBucket()
{

final Properties properties = new Properties();
properties.setProperty(CUSTOM_NAMESPACE + ".type", "s3");
properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
Assert.assertThrows(
"Missing required creator property 'bucket'",
ProvisionException.class,
() -> getStorageConnectorProvider(properties)
);
}

@Test
public void createS3StorageFactoryWithMissingTempDir()
{

final Properties properties = new Properties();
properties.setProperty(CUSTOM_NAMESPACE + ".type", "s3");
properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket");
properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");

Assert.assertThrows(
"Missing required creator property 'tempDir'",
ProvisionException.class,
() -> getStorageConnectorProvider(properties)
);
}

private StorageConnectorProvider getStorageConnectorProvider(Properties properties)
{
StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add(
new AWSModule(),
new StorageConnectorModule(),
Expand All @@ -77,23 +137,36 @@ public void configure(Binder binder)
StorageConnectorProvider.class,
Names.named(CUSTOM_NAMESPACE)
);

binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE)))
.toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE)))
.in(LazySingleton.class);
}
}
).withProperties(properties);

Injector injector = startupInjectorBuilder.build();
injector.getInstance(ObjectMapper.class).registerModules(new S3StorageConnectorModule().getJacksonModules());

injector.getInstance(ObjectMapper.class).setInjectableValues(
new InjectableValues.Std()
.addValue(ServerSideEncryptingAmazonS3.class, EasyMock.mock(ServerSideEncryptingAmazonS3.class))
.addValue(S3OutputConfig.class, new S3OutputConfig("bucket", "prefix", new File("/tmp"), new HumanReadableBytes("5MiB"), 1))
.addValue(S3UploadManager.class, EasyMock.mock(S3UploadManager.class))
.addValue(
ServerSideEncryptingAmazonS3.class,
new ServerSideEncryptingAmazonS3(null, new NoopServerSideEncryption())
)
.addValue(
S3UploadManager.class,
new S3UploadManager(
new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1),
new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0))
)
);

return injector.getInstance(Key.get(

StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(
StorageConnectorProvider.class,
Names.named(CUSTOM_NAMESPACE)
));
return storageConnectorProvider;
}
}

0 comments on commit 059a5e9

Please sign in to comment.