Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add https to druid-influxdb-emitter extension #9938

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/development/extensions-contrib/influxdb-emitter.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ All the configuration parameters for the influxdb emitter are under `druid.emitt
|--------|-----------|---------|-------|
|`druid.emitter.influxdb.hostname`|The hostname of the InfluxDB server.|Yes|N/A|
|`druid.emitter.influxdb.port`|The port of the InfluxDB server.|No|8086|
|`druid.emitter.influxdb.protocol`|The protocol used to send metrics to InfluxDB. One of http/https|No|http|
|`druid.emitter.influxdb.trustStorePath`|The path to the trustStore to be used for https|No|none|
|`druid.emitter.influxdb.trustStoreType`|The trustStore type to be used for https|No|`jks`|
|`druid.emitter.influxdb.trustStorePassword`|The trustStore password to be used for https|No|none|
|`druid.emitter.influxdb.databaseName`|The name of the database in InfluxDB.|Yes|N/A|
|`druid.emitter.influxdb.maxQueueSize`|The size of the queue that holds events.|No|Integer.MAX_VALUE(=2^31-1)|
|`druid.emitter.influxdb.flushPeriod`|How often (in milliseconds) the events queue is parsed into Line Protocol and POSTed to InfluxDB.|No|60000|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,18 @@
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -56,7 +63,7 @@ public class InfluxdbEmitter implements Emitter
public InfluxdbEmitter(InfluxdbEmitterConfig influxdbEmitterConfig)
{
this.influxdbEmitterConfig = influxdbEmitterConfig;
this.influxdbClient = HttpClientBuilder.create().build();
this.influxdbClient = buildInfluxdbClient();
this.eventsQueue = new LinkedBlockingQueue<>(influxdbEmitterConfig.getMaxQueueSize());
this.dimensionWhiteList = influxdbEmitterConfig.getDimensionWhitelist();
log.info("constructed influxdb emitter");
Expand Down Expand Up @@ -96,7 +103,8 @@ public void emit(Event event)
public void postToInflux(String payload)
{
HttpPost post = new HttpPost(
"http://" + influxdbEmitterConfig.getHostname()
influxdbEmitterConfig.getProtocol() + "://"
+ influxdbEmitterConfig.getHostname()
+ ":" + influxdbEmitterConfig.getPort()
+ "/write?db=" + influxdbEmitterConfig.getDatabaseName()
+ "&u=" + influxdbEmitterConfig.getInfluxdbUserName()
Expand Down Expand Up @@ -211,4 +219,46 @@ public void transformAndSendToInfluxdb(LinkedBlockingQueue<ServiceMetricEvent> e
postToInflux(payload.toString());
}

private HttpClient buildInfluxdbClient()
{
if ("https".equals(influxdbEmitterConfig.getProtocol())) {
SSLContext sslContext = null;
if (influxdbEmitterConfig.getTrustStorePath() == null || influxdbEmitterConfig.getTrustStorePassword() == null) {
String msg = "Can't load TrustStore. Truststore path or password is not set.";
log.error(msg);
throw new IllegalStateException(msg);
}

FileInputStream in = null;

try {
in = new FileInputStream(new File(influxdbEmitterConfig.getTrustStorePath()));
KeyStore store = KeyStore.getInstance(influxdbEmitterConfig.getTrustStoreType() == null
? KeyStore.getDefaultType()
: influxdbEmitterConfig.getTrustStoreType());
store.load(in, influxdbEmitterConfig.getTrustStorePassword().toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(store);
sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, tmf.getTrustManagers(), null);
}
catch (Exception ex) {
log.error("Unable to load TrustStore");
}
finally {
if (in != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to use try-with-resources. You will not have to handle the IOException thrown in close() by yourself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review - I think I've addressed these comments. Can you take another look please?

try {
in.close();
}
catch (IOException ex) {
log.error("Unable to load TrustStore");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be thrown.

}
}
}
return HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE).build();
} else {
return HttpClientBuilder.create().build();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,21 @@ public class InfluxdbEmitterConfig
private static final int DEFAULT_QUEUE_SIZE = Integer.MAX_VALUE;
private static final int DEFAULT_FLUSH_PERIOD = 60000; // milliseconds
private static final List<String> DEFAULT_DIMENSION_WHITELIST = Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier");
private static final String DEFAULT_PROTOCOL = "http";

@JsonProperty
private final String hostname;
@JsonProperty
private final Integer port;
@JsonProperty
private final String protocol;
@JsonProperty
private final String trustStorePath;
@JsonProperty
private final String trustStoreType;
@JsonProperty
private final String trustStorePassword;
@JsonProperty
private final String databaseName;
@JsonProperty
private final Integer maxQueueSize;
Expand All @@ -62,6 +71,10 @@ public class InfluxdbEmitterConfig
public InfluxdbEmitterConfig(
@JsonProperty("hostname") String hostname,
@JsonProperty("port") Integer port,
@JsonProperty("protocol") String protocol,
@JsonProperty("trustStorePath") String trustStorePath,
@JsonProperty("trustStoreType") String trustStoreType,
@JsonProperty("trustStorePassword") String trustStorePassword,
@JsonProperty("databaseName") String databaseName,
@JsonProperty("maxQueueSize") Integer maxQueueSize,
@JsonProperty("flushPeriod") Integer flushPeriod,
Expand All @@ -73,6 +86,10 @@ public InfluxdbEmitterConfig(
{
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
this.port = port == null ? DEFAULT_PORT : port;
this.protocol = protocol == null ? DEFAULT_PROTOCOL : protocol;
this.trustStorePath = trustStorePath;
this.trustStoreType = trustStoreType;
this.trustStorePassword = trustStorePassword;
this.databaseName = Preconditions.checkNotNull(databaseName, "databaseName can not be null");
this.maxQueueSize = maxQueueSize == null ? DEFAULT_QUEUE_SIZE : maxQueueSize;
this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
Expand Down Expand Up @@ -121,6 +138,18 @@ public boolean equals(Object o)
if (!getDimensionWhitelist().equals(that.getDimensionWhitelist())) {
return false;
}
if (!getProtocol().equals(that.getProtocol())) {
return false;
}
if (getTrustStorePath() != null ? !getTrustStorePath().equals(that.getTrustStorePath()) : that.getTrustStorePath() != null) {
return false;
}
if (getTrustStoreType() != null ? !getTrustStoreType().equals(that.getTrustStoreType()) : that.getTrustStoreType() != null) {
return false;
}
if (getTrustStorePassword() != null ? !getTrustStorePassword().equals(that.getTrustStorePassword()) : that.getTrustStorePassword() != null) {
return false;
}
return true;

}
Expand All @@ -130,6 +159,10 @@ public int hashCode()
{
int result = getHostname().hashCode();
result = 31 * result + getPort();
result = 31 * result + getProtocol().hashCode();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason not to use

    return Objects.hash(
return Objects.hash(
        hostname,
        port,
...

result = 31 * result + (getTrustStorePath() != null ? getTrustStorePath().hashCode() : 0);
result = 31 * result + (getTrustStoreType() != null ? getTrustStoreType().hashCode() : 0);
result = 31 * result + (getTrustStorePassword() != null ? getTrustStorePassword().hashCode() : 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like CI is complaining about a lack of test coverage. You can add an EqualsVerifier test to fix that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does EqualsVerifier work? The test I added is failing on

Non-nullity: equals throws NullPointerException on field port.

but if port is null then it gets set to a default value in the config's constructor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EqualsVerifier doesn't know that because that field is not marked as NotNull. There's more information about that exception here - https://jqno.nl/equalsverifier/errormessages/non-nullity-equals-hashcode-tostring-throws-nullpointerexception/

You could also tell the test that the field is not null with .withNonnullFields("port") in the EqualsVerifier test

result = 31 * result + getDatabaseName().hashCode();
result = 31 * result + getFlushPeriod();
result = 31 * result + getMaxQueueSize();
Expand All @@ -152,6 +185,30 @@ public int getPort()
return port;
}

@JsonProperty
public String getProtocol()
{
return protocol;
}

@JsonProperty
public String getTrustStorePath()
{
return trustStorePath;
}

@JsonProperty
public String getTrustStoreType()
{
return trustStoreType;
}

@JsonProperty
public String getTrustStorePassword()
{
return trustStorePassword;
}

@JsonProperty
public String getDatabaseName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public void setUp()
influxdbEmitterConfig = new InfluxdbEmitterConfig(
"localhost",
8086,
null,
null,
null,
null,
"dbname",
10000,
15000,
Expand All @@ -61,6 +65,10 @@ public void testInfluxdbEmitterConfigObjectsAreDifferent()
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
"localhost",
8080,
null,
null,
null,
null,
"dbname",
10000,
15000,
Expand All @@ -78,6 +86,10 @@ public void testConfigWithNullHostname()
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
null,
8080,
null,
null,
null,
null,
"dbname",
10000,
15000,
Expand All @@ -94,6 +106,10 @@ public void testConfigWithNullPort()
InfluxdbEmitterConfig influxdbEmitterConfigWithNullPort = new InfluxdbEmitterConfig(
"localhost",
null,
null,
null,
null,
null,
"dbname",
10000,
15000,
Expand All @@ -112,6 +128,10 @@ public void testEqualsMethod()
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
"localhost",
8086,
null,
null,
null,
null,
"dbname",
10000,
15000,
Expand All @@ -129,6 +149,10 @@ public void testEqualsMethodWithNotEqualConfigs()
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
"localhost",
8086,
"https",
"/path",
"jks",
"password",
"dbname",
10000,
15000,
Expand All @@ -146,6 +170,10 @@ public void testConfigWithNullInfluxdbUserName()
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
"localhost",
8086,
null,
null,
null,
null,
"dbname",
10000,
15000,
Expand All @@ -162,6 +190,10 @@ public void testConfigWithNullInfluxdbPassword()
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
"localhost",
8086,
null,
null,
null,
null,
"dbname",
10000,
15000,
Expand All @@ -178,6 +210,10 @@ public void testConfigWithNullDimensionWhitelist()
InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
"localhost",
8086,
null,
null,
null,
null,
"dbname",
10000,
15000,
Expand All @@ -196,6 +232,10 @@ public void testConfigWithDimensionWhitelist()
InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
"localhost",
8086,
null,
null,
null,
null,
"dbname",
10000,
15000,
Expand All @@ -208,4 +248,26 @@ public void testConfigWithDimensionWhitelist()
Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist());
}

@Test
public void testConfigWithNullProtocol()
{
InfluxdbEmitterConfig influxdbEmitterConfigWithNullProtocol = new InfluxdbEmitterConfig(
"localhost",
8086,
null,
"path",
"jks",
"pass",
"dbname",
10000,
15000,
30000,
"adam",
"password",
null
);
String expectedProtocol = "http";
Assert.assertEquals(expectedProtocol, influxdbEmitterConfig.getProtocol());
}

}
Loading