Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add https to druid-influxdb-emitter extension #9938

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/development/extensions-contrib/influxdb-emitter.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ All the configuration parameters for the influxdb emitter are under `druid.emitt
|--------|-----------|---------|-------|
|`druid.emitter.influxdb.hostname`|The hostname of the InfluxDB server.|Yes|N/A|
|`druid.emitter.influxdb.port`|The port of the InfluxDB server.|No|8086|
|`druid.emitter.influxdb.protocol`|The protocol used to send metrics to InfluxDB. One of http/https|No|http|
|`druid.emitter.influxdb.trustStorePath`|The path to the trustStore to be used for https|No|none|
|`druid.emitter.influxdb.trustStoreType`|The trustStore type to be used for https|No|`jks`|
|`druid.emitter.influxdb.trustStorePassword`|The trustStore password to be used for https|No|none|
|`druid.emitter.influxdb.databaseName`|The name of the database in InfluxDB.|Yes|N/A|
|`druid.emitter.influxdb.maxQueueSize`|The size of the queue that holds events.|No|Integer.MAX_VALUE(=2^31-1)|
|`druid.emitter.influxdb.flushPeriod`|How often (in milliseconds) the events queue is parsed into Line Protocol and POSTed to InfluxDB.|No|60000|
Expand Down
5 changes: 5 additions & 0 deletions extensions-contrib/influxdb-emitter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,10 @@
<artifactId>JUnitParams</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,18 @@
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -56,7 +63,7 @@ public class InfluxdbEmitter implements Emitter
public InfluxdbEmitter(InfluxdbEmitterConfig influxdbEmitterConfig)
{
this.influxdbEmitterConfig = influxdbEmitterConfig;
this.influxdbClient = HttpClientBuilder.create().build();
this.influxdbClient = buildInfluxdbClient();
this.eventsQueue = new LinkedBlockingQueue<>(influxdbEmitterConfig.getMaxQueueSize());
this.dimensionWhiteList = influxdbEmitterConfig.getDimensionWhitelist();
log.info("constructed influxdb emitter");
Expand Down Expand Up @@ -96,7 +103,8 @@ public void emit(Event event)
public void postToInflux(String payload)
{
HttpPost post = new HttpPost(
"http://" + influxdbEmitterConfig.getHostname()
influxdbEmitterConfig.getProtocol() + "://"
+ influxdbEmitterConfig.getHostname()
+ ":" + influxdbEmitterConfig.getPort()
+ "/write?db=" + influxdbEmitterConfig.getDatabaseName()
+ "&u=" + influxdbEmitterConfig.getInfluxdbUserName()
Expand Down Expand Up @@ -211,4 +219,33 @@ public void transformAndSendToInfluxdb(LinkedBlockingQueue<ServiceMetricEvent> e
postToInflux(payload.toString());
}

private HttpClient buildInfluxdbClient()
{
if ("https".equals(influxdbEmitterConfig.getProtocol())) {
SSLContext sslContext;
if (influxdbEmitterConfig.getTrustStorePath() == null || influxdbEmitterConfig.getTrustStorePassword() == null) {
String msg = "Can't load TrustStore. Truststore path or password is not set.";
log.error(msg);
throw new IllegalStateException(msg);
}

try (FileInputStream in = new FileInputStream(new File(influxdbEmitterConfig.getTrustStorePath()))) {
KeyStore store = KeyStore.getInstance(influxdbEmitterConfig.getTrustStoreType());
store.load(in, influxdbEmitterConfig.getTrustStorePassword().toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(store);
sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, tmf.getTrustManagers(), null);
}
catch (Exception ex) {
String msg = "Unable to load TrustStore";
log.error(msg);
throw new IllegalStateException(msg);
}
return HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE).build();
} else {
return HttpClientBuilder.create().build();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.logger.Logger;

import java.security.KeyStore;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class InfluxdbEmitterConfig
Expand All @@ -36,12 +37,21 @@ public class InfluxdbEmitterConfig
private static final int DEFAULT_QUEUE_SIZE = Integer.MAX_VALUE;
private static final int DEFAULT_FLUSH_PERIOD = 60000; // milliseconds
private static final List<String> DEFAULT_DIMENSION_WHITELIST = Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier");
private static final String DEFAULT_PROTOCOL = "http";

@JsonProperty
private final String hostname;
@JsonProperty
private final Integer port;
@JsonProperty
private final String protocol;
@JsonProperty
private final String trustStorePath;
@JsonProperty
private final String trustStoreType;
@JsonProperty
private final String trustStorePassword;
@JsonProperty
private final String databaseName;
@JsonProperty
private final Integer maxQueueSize;
Expand All @@ -56,12 +66,15 @@ public class InfluxdbEmitterConfig
@JsonProperty
private final ImmutableSet<String> dimensionWhitelist;

private static Logger log = new Logger(InfluxdbEmitterConfig.class);

@JsonCreator
public InfluxdbEmitterConfig(
@JsonProperty("hostname") String hostname,
@JsonProperty("port") Integer port,
@JsonProperty("protocol") String protocol,
@JsonProperty("trustStorePath") String trustStorePath,
@JsonProperty("trustStoreType") String trustStoreType,
@JsonProperty("trustStorePassword") String trustStorePassword,
@JsonProperty("databaseName") String databaseName,
@JsonProperty("maxQueueSize") Integer maxQueueSize,
@JsonProperty("flushPeriod") Integer flushPeriod,
Expand All @@ -73,6 +86,10 @@ public InfluxdbEmitterConfig(
{
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
this.port = port == null ? DEFAULT_PORT : port;
this.protocol = protocol == null ? DEFAULT_PROTOCOL : protocol;
this.trustStorePath = trustStorePath;
this.trustStoreType = trustStoreType == null ? KeyStore.getDefaultType() : trustStoreType;
this.trustStorePassword = trustStorePassword;
this.databaseName = Preconditions.checkNotNull(databaseName, "databaseName can not be null");
this.maxQueueSize = maxQueueSize == null ? DEFAULT_QUEUE_SIZE : maxQueueSize;
this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
Expand All @@ -88,7 +105,7 @@ public boolean equals(Object o)
if (this == o) {
return true;
}
if (!(o instanceof InfluxdbEmitterConfig)) {
if (o == null || getClass() != o.getClass()) {
return false;
}

Expand Down Expand Up @@ -121,23 +138,28 @@ public boolean equals(Object o)
if (!getDimensionWhitelist().equals(that.getDimensionWhitelist())) {
return false;
}
if (!getProtocol().equals(that.getProtocol())) {
return false;
}
if (getTrustStorePath() != null ? !getTrustStorePath().equals(that.getTrustStorePath()) : that.getTrustStorePath() != null) {
return false;
}
if (!getTrustStoreType().equals(that.getTrustStoreType())) {
return false;
}
if (getTrustStorePassword() != null ? !getTrustStorePassword().equals(that.getTrustStorePassword()) : that.getTrustStorePassword() != null) {
return false;
}
return true;

}

@Override
public int hashCode()
{
int result = getHostname().hashCode();
result = 31 * result + getPort();
result = 31 * result + getDatabaseName().hashCode();
result = 31 * result + getFlushPeriod();
result = 31 * result + getMaxQueueSize();
result = 31 * result + getFlushDelay();
result = 31 * result + getInfluxdbUserName().hashCode();
result = 31 * result + getInfluxdbPassword().hashCode();
result = 31 * result + getDimensionWhitelist().hashCode();
return result;
return Objects.hash(hostname, port, protocol, trustStorePath, trustStoreType,
trustStorePassword, databaseName, flushPeriod, maxQueueSize,
flushDelay, influxdbUserName, influxdbPassword, dimensionWhitelist);
}

@JsonProperty
Expand All @@ -152,6 +174,30 @@ public int getPort()
return port;
}

@JsonProperty
public String getProtocol()
{
return protocol;
}

@JsonProperty
public String getTrustStorePath()
{
return trustStorePath;
}

@JsonProperty
public String getTrustStoreType()
{
return trustStoreType;
}

@JsonProperty
public String getTrustStorePassword()
{
return trustStorePassword;
}

@JsonProperty
public String getDatabaseName()
{
Expand Down
Loading