diff --git a/docs/development/extensions-contrib/influxdb-emitter.md b/docs/development/extensions-contrib/influxdb-emitter.md index 3b1c84c30ca0..076b8fc70b8d 100644 --- a/docs/development/extensions-contrib/influxdb-emitter.md +++ b/docs/development/extensions-contrib/influxdb-emitter.md @@ -41,6 +41,10 @@ All the configuration parameters for the influxdb emitter are under `druid.emitt |--------|-----------|---------|-------| |`druid.emitter.influxdb.hostname`|The hostname of the InfluxDB server.|Yes|N/A| |`druid.emitter.influxdb.port`|The port of the InfluxDB server.|No|8086| +|`druid.emitter.influxdb.protocol`|The protocol used to send metrics to InfluxDB. One of http/https|No|http| +|`druid.emitter.influxdb.trustStorePath`|The path to the trustStore to be used for https|No|none| +|`druid.emitter.influxdb.trustStoreType`|The trustStore type to be used for https|No|`jks`| +|`druid.emitter.influxdb.trustStorePassword`|The trustStore password to be used for https|No|none| |`druid.emitter.influxdb.databaseName`|The name of the database in InfluxDB.|Yes|N/A| |`druid.emitter.influxdb.maxQueueSize`|The size of the queue that holds events.|No|Integer.MAX_VALUE(=2^31-1)| |`druid.emitter.influxdb.flushPeriod`|How often (in milliseconds) the events queue is parsed into Line Protocol and POSTed to InfluxDB.|No|60000| diff --git a/extensions-contrib/influxdb-emitter/pom.xml b/extensions-contrib/influxdb-emitter/pom.xml index ee85f0d44c4a..75a1cb6997bd 100644 --- a/extensions-contrib/influxdb-emitter/pom.xml +++ b/extensions-contrib/influxdb-emitter/pom.xml @@ -93,5 +93,10 @@ JUnitParams test + + nl.jqno.equalsverifier + equalsverifier + test + diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java index ee22917f8e3f..bb1c72b03141 100644 --- a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java +++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java @@ -28,11 +28,18 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.security.KeyStore; import java.util.Arrays; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -56,7 +63,7 @@ public class InfluxdbEmitter implements Emitter public InfluxdbEmitter(InfluxdbEmitterConfig influxdbEmitterConfig) { this.influxdbEmitterConfig = influxdbEmitterConfig; - this.influxdbClient = HttpClientBuilder.create().build(); + this.influxdbClient = buildInfluxdbClient(); this.eventsQueue = new LinkedBlockingQueue<>(influxdbEmitterConfig.getMaxQueueSize()); this.dimensionWhiteList = influxdbEmitterConfig.getDimensionWhitelist(); log.info("constructed influxdb emitter"); @@ -96,7 +103,8 @@ public void emit(Event event) public void postToInflux(String payload) { HttpPost post = new HttpPost( - "http://" + influxdbEmitterConfig.getHostname() + influxdbEmitterConfig.getProtocol() + "://" + + influxdbEmitterConfig.getHostname() + ":" + influxdbEmitterConfig.getPort() + "/write?db=" + influxdbEmitterConfig.getDatabaseName() + "&u=" + influxdbEmitterConfig.getInfluxdbUserName() @@ -211,4 +219,33 @@ public void transformAndSendToInfluxdb(LinkedBlockingQueue e postToInflux(payload.toString()); } + private HttpClient buildInfluxdbClient() + { + if ("https".equals(influxdbEmitterConfig.getProtocol())) { + SSLContext sslContext; + if (influxdbEmitterConfig.getTrustStorePath() == null || influxdbEmitterConfig.getTrustStorePassword() == null) { + String msg = "Can't load TrustStore. Truststore path or password is not set."; + log.error(msg); + throw new IllegalStateException(msg); + } + + try (FileInputStream in = new FileInputStream(new File(influxdbEmitterConfig.getTrustStorePath()))) { + KeyStore store = KeyStore.getInstance(influxdbEmitterConfig.getTrustStoreType()); + store.load(in, influxdbEmitterConfig.getTrustStorePassword().toCharArray()); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(store); + sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, tmf.getTrustManagers(), null); + } + catch (Exception ex) { + String msg = "Unable to load TrustStore"; + log.error(msg); + throw new IllegalStateException(msg); + } + return HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE).build(); + } else { + return HttpClientBuilder.create().build(); + } + } + } diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java index d96b07083e2e..87b1a6922ce7 100644 --- a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java +++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java @@ -23,10 +23,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import org.apache.druid.java.util.common.logger.Logger; +import java.security.KeyStore; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Set; public class InfluxdbEmitterConfig @@ -36,12 +37,21 @@ public class InfluxdbEmitterConfig private static final int DEFAULT_QUEUE_SIZE = Integer.MAX_VALUE; private static final int DEFAULT_FLUSH_PERIOD = 60000; // milliseconds private static final List DEFAULT_DIMENSION_WHITELIST = Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier"); + private static final String DEFAULT_PROTOCOL = "http"; @JsonProperty private final String hostname; @JsonProperty private final Integer port; @JsonProperty + private final String protocol; + @JsonProperty + private final String trustStorePath; + @JsonProperty + private final String trustStoreType; + @JsonProperty + private final String trustStorePassword; + @JsonProperty private final String databaseName; @JsonProperty private final Integer maxQueueSize; @@ -56,12 +66,15 @@ public class InfluxdbEmitterConfig @JsonProperty private final ImmutableSet dimensionWhitelist; - private static Logger log = new Logger(InfluxdbEmitterConfig.class); @JsonCreator public InfluxdbEmitterConfig( @JsonProperty("hostname") String hostname, @JsonProperty("port") Integer port, + @JsonProperty("protocol") String protocol, + @JsonProperty("trustStorePath") String trustStorePath, + @JsonProperty("trustStoreType") String trustStoreType, + @JsonProperty("trustStorePassword") String trustStorePassword, @JsonProperty("databaseName") String databaseName, @JsonProperty("maxQueueSize") Integer maxQueueSize, @JsonProperty("flushPeriod") Integer flushPeriod, @@ -73,6 +86,10 @@ public InfluxdbEmitterConfig( { this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null"); this.port = port == null ? DEFAULT_PORT : port; + this.protocol = protocol == null ? DEFAULT_PROTOCOL : protocol; + this.trustStorePath = trustStorePath; + this.trustStoreType = trustStoreType == null ? KeyStore.getDefaultType() : trustStoreType; + this.trustStorePassword = trustStorePassword; this.databaseName = Preconditions.checkNotNull(databaseName, "databaseName can not be null"); this.maxQueueSize = maxQueueSize == null ? DEFAULT_QUEUE_SIZE : maxQueueSize; this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod; @@ -88,7 +105,7 @@ public boolean equals(Object o) if (this == o) { return true; } - if (!(o instanceof InfluxdbEmitterConfig)) { + if (o == null || getClass() != o.getClass()) { return false; } @@ -121,6 +138,18 @@ public boolean equals(Object o) if (!getDimensionWhitelist().equals(that.getDimensionWhitelist())) { return false; } + if (!getProtocol().equals(that.getProtocol())) { + return false; + } + if (getTrustStorePath() != null ? !getTrustStorePath().equals(that.getTrustStorePath()) : that.getTrustStorePath() != null) { + return false; + } + if (!getTrustStoreType().equals(that.getTrustStoreType())) { + return false; + } + if (getTrustStorePassword() != null ? !getTrustStorePassword().equals(that.getTrustStorePassword()) : that.getTrustStorePassword() != null) { + return false; + } return true; } @@ -128,16 +157,9 @@ public boolean equals(Object o) @Override public int hashCode() { - int result = getHostname().hashCode(); - result = 31 * result + getPort(); - result = 31 * result + getDatabaseName().hashCode(); - result = 31 * result + getFlushPeriod(); - result = 31 * result + getMaxQueueSize(); - result = 31 * result + getFlushDelay(); - result = 31 * result + getInfluxdbUserName().hashCode(); - result = 31 * result + getInfluxdbPassword().hashCode(); - result = 31 * result + getDimensionWhitelist().hashCode(); - return result; + return Objects.hash(hostname, port, protocol, trustStorePath, trustStoreType, + trustStorePassword, databaseName, flushPeriod, maxQueueSize, + flushDelay, influxdbUserName, influxdbPassword, dimensionWhitelist); } @JsonProperty @@ -152,6 +174,30 @@ public int getPort() return port; } + @JsonProperty + public String getProtocol() + { + return protocol; + } + + @JsonProperty + public String getTrustStorePath() + { + return trustStorePath; + } + + @JsonProperty + public String getTrustStoreType() + { + return trustStoreType; + } + + @JsonProperty + public String getTrustStorePassword() + { + return trustStorePassword; + } + @JsonProperty public String getDatabaseName() { diff --git a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java index 09e6e55c2f38..d6312115a0ed 100644 --- a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java +++ b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java @@ -22,11 +22,13 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.jackson.DefaultObjectMapper; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.security.KeyStore; import java.util.Arrays; public class InfluxdbEmitterConfigTest @@ -45,6 +47,10 @@ public void setUp() influxdbEmitterConfig = new InfluxdbEmitterConfig( "localhost", 8086, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -61,6 +67,10 @@ public void testInfluxdbEmitterConfigObjectsAreDifferent() InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig( "localhost", 8080, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -78,6 +88,10 @@ public void testConfigWithNullHostname() InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig( null, 8080, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -94,6 +108,10 @@ public void testConfigWithNullPort() InfluxdbEmitterConfig influxdbEmitterConfigWithNullPort = new InfluxdbEmitterConfig( "localhost", null, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -112,6 +130,10 @@ public void testEqualsMethod() InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig( "localhost", 8086, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -129,6 +151,10 @@ public void testEqualsMethodWithNotEqualConfigs() InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig( "localhost", 8086, + "https", + "/path", + "jks", + "password", "dbname", 10000, 15000, @@ -146,6 +172,10 @@ public void testConfigWithNullInfluxdbUserName() InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig( "localhost", 8086, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -162,6 +192,10 @@ public void testConfigWithNullInfluxdbPassword() InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig( "localhost", 8086, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -178,6 +212,10 @@ public void testConfigWithNullDimensionWhitelist() InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig( "localhost", 8086, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -196,6 +234,10 @@ public void testConfigWithDimensionWhitelist() InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig( "localhost", 8086, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -208,4 +250,58 @@ public void testConfigWithDimensionWhitelist() Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist()); } + @Test + public void testConfigWithNullProtocol() + { + InfluxdbEmitterConfig influxdbEmitterConfigWithNullProtocol = new InfluxdbEmitterConfig( + "localhost", + 8086, + null, + "path", + "jks", + "pass", + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + String expectedProtocol = "http"; + Assert.assertEquals(expectedProtocol, influxdbEmitterConfigWithNullProtocol.getProtocol()); + } + + @Test + public void testConfigEquals() + { + EqualsVerifier.forClass(InfluxdbEmitterConfig.class).withNonnullFields( + "hostname", "port", "protocol", "trustStoreType", "databaseName", + "maxQueueSize", "flushPeriod", "flushDelay", "influxdbUserName", + "influxdbPassword", "dimensionWhitelist" + ).usingGetClass().verify(); + } + + @Test + public void testConfigWithNullTrustStoreType() + { + InfluxdbEmitterConfig influxdbEmitterConfigWithNullTrustStoreType = new InfluxdbEmitterConfig( + "localhost", + 8086, + null, + "path", + null, + "pass", + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + String expectedTrustStoreType = KeyStore.getDefaultType(); + Assert.assertEquals(expectedTrustStoreType, influxdbEmitterConfigWithNullTrustStoreType.getTrustStoreType()); + } + } diff --git a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java index 318f38f65c33..f39549250f47 100644 --- a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java +++ b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java @@ -66,6 +66,10 @@ public void testTransformForInfluxWithLongMetric() InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( "localhost", 8086, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -107,6 +111,10 @@ public void testTransformForInfluxWithShortMetric() InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( "localhost", 8086, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -149,6 +157,10 @@ public void testMetricIsInDimensionWhitelist() InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( "localhost", 8086, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -191,6 +203,10 @@ public void testMetricIsInDefaultDimensionWhitelist() InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( "localhost", 8086, + null, + null, + null, + null, "dbname", 10000, 15000, @@ -211,4 +227,89 @@ public void testJacksonModules() { Assert.assertTrue(new InfluxdbEmitterModule().getJacksonModules().isEmpty()); } + + @Test(expected = IllegalStateException.class) + public void testBuildInfluxdbClientWithHttpsProtocolAndNoTrustStore() + { + InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( + "localhost", + 8086, + "https", + null, + null, + null, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config); + } + + @Test(expected = IllegalStateException.class) + public void testBuildInfluxdbClientWithHttpsProtocolAndNullTrustStorePath() + { + InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( + "localhost", + 8086, + "https", + null, + null, + "pass", + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config); + } + + @Test(expected = IllegalStateException.class) + public void testBuildInfluxdbClientWithHttpsProtocolAndNullTrustStorePassword() + { + InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( + "localhost", + 8086, + "https", + "path", + null, + null, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config); + } + + @Test(expected = IllegalStateException.class) + public void testUnableToLoadTrustStore() + { + InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( + "localhost", + 8086, + "https", + "path", + null, + "pass", + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config); + } + }