-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add https to druid-influxdb-emitter extension #9938
Changes from 2 commits
c015f76
81260f3
bd00a00
9214edd
43f1554
b79350c
3bc316e
219e596
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,11 +28,18 @@ | |
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; | ||
import org.apache.http.client.HttpClient; | ||
import org.apache.http.client.methods.HttpPost; | ||
import org.apache.http.conn.ssl.NoopHostnameVerifier; | ||
import org.apache.http.entity.ContentType; | ||
import org.apache.http.entity.StringEntity; | ||
import org.apache.http.impl.client.HttpClientBuilder; | ||
import org.apache.http.impl.client.HttpClients; | ||
|
||
import javax.net.ssl.SSLContext; | ||
import javax.net.ssl.TrustManagerFactory; | ||
import java.io.File; | ||
import java.io.FileInputStream; | ||
import java.io.IOException; | ||
import java.security.KeyStore; | ||
import java.util.Arrays; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
|
@@ -56,7 +63,7 @@ public class InfluxdbEmitter implements Emitter | |
public InfluxdbEmitter(InfluxdbEmitterConfig influxdbEmitterConfig) | ||
{ | ||
this.influxdbEmitterConfig = influxdbEmitterConfig; | ||
this.influxdbClient = HttpClientBuilder.create().build(); | ||
this.influxdbClient = buildInfluxdbClient(); | ||
this.eventsQueue = new LinkedBlockingQueue<>(influxdbEmitterConfig.getMaxQueueSize()); | ||
this.dimensionWhiteList = influxdbEmitterConfig.getDimensionWhitelist(); | ||
log.info("constructed influxdb emitter"); | ||
|
@@ -96,7 +103,8 @@ public void emit(Event event) | |
public void postToInflux(String payload) | ||
{ | ||
HttpPost post = new HttpPost( | ||
"http://" + influxdbEmitterConfig.getHostname() | ||
influxdbEmitterConfig.getProtocol() + "://" | ||
+ influxdbEmitterConfig.getHostname() | ||
+ ":" + influxdbEmitterConfig.getPort() | ||
+ "/write?db=" + influxdbEmitterConfig.getDatabaseName() | ||
+ "&u=" + influxdbEmitterConfig.getInfluxdbUserName() | ||
|
@@ -211,4 +219,46 @@ public void transformAndSendToInfluxdb(LinkedBlockingQueue<ServiceMetricEvent> e | |
postToInflux(payload.toString()); | ||
} | ||
|
||
private HttpClient buildInfluxdbClient() | ||
{ | ||
if ("https".equals(influxdbEmitterConfig.getProtocol())) { | ||
SSLContext sslContext = null; | ||
if (influxdbEmitterConfig.getTrustStorePath() == null || influxdbEmitterConfig.getTrustStorePassword() == null) { | ||
String msg = "Can't load TrustStore. Truststore path or password is not set."; | ||
log.error(msg); | ||
throw new IllegalStateException(msg); | ||
} | ||
|
||
FileInputStream in = null; | ||
|
||
try { | ||
in = new FileInputStream(new File(influxdbEmitterConfig.getTrustStorePath())); | ||
KeyStore store = KeyStore.getInstance(influxdbEmitterConfig.getTrustStoreType() == null | ||
? KeyStore.getDefaultType() | ||
: influxdbEmitterConfig.getTrustStoreType()); | ||
store.load(in, influxdbEmitterConfig.getTrustStorePassword().toCharArray()); | ||
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); | ||
tmf.init(store); | ||
sslContext = SSLContext.getInstance("TLS"); | ||
sslContext.init(null, tmf.getTrustManagers(), null); | ||
} | ||
catch (Exception ex) { | ||
log.error("Unable to load TrustStore"); | ||
} | ||
finally { | ||
if (in != null) { | ||
try { | ||
in.close(); | ||
} | ||
catch (IOException ex) { | ||
log.error("Unable to load TrustStore"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be thrown. |
||
} | ||
} | ||
} | ||
return HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE).build(); | ||
} else { | ||
return HttpClientBuilder.create().build(); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,12 +36,21 @@ public class InfluxdbEmitterConfig | |
private static final int DEFAULT_QUEUE_SIZE = Integer.MAX_VALUE; | ||
private static final int DEFAULT_FLUSH_PERIOD = 60000; // milliseconds | ||
private static final List<String> DEFAULT_DIMENSION_WHITELIST = Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier"); | ||
private static final String DEFAULT_PROTOCOL = "http"; | ||
|
||
@JsonProperty | ||
private final String hostname; | ||
@JsonProperty | ||
private final Integer port; | ||
@JsonProperty | ||
private final String protocol; | ||
@JsonProperty | ||
private final String trustStorePath; | ||
@JsonProperty | ||
private final String trustStoreType; | ||
@JsonProperty | ||
private final String trustStorePassword; | ||
@JsonProperty | ||
private final String databaseName; | ||
@JsonProperty | ||
private final Integer maxQueueSize; | ||
|
@@ -62,6 +71,10 @@ public class InfluxdbEmitterConfig | |
public InfluxdbEmitterConfig( | ||
@JsonProperty("hostname") String hostname, | ||
@JsonProperty("port") Integer port, | ||
@JsonProperty("protocol") String protocol, | ||
@JsonProperty("trustStorePath") String trustStorePath, | ||
@JsonProperty("trustStoreType") String trustStoreType, | ||
@JsonProperty("trustStorePassword") String trustStorePassword, | ||
@JsonProperty("databaseName") String databaseName, | ||
@JsonProperty("maxQueueSize") Integer maxQueueSize, | ||
@JsonProperty("flushPeriod") Integer flushPeriod, | ||
|
@@ -73,6 +86,10 @@ public InfluxdbEmitterConfig( | |
{ | ||
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null"); | ||
this.port = port == null ? DEFAULT_PORT : port; | ||
this.protocol = protocol == null ? DEFAULT_PROTOCOL : protocol; | ||
this.trustStorePath = trustStorePath; | ||
this.trustStoreType = trustStoreType; | ||
this.trustStorePassword = trustStorePassword; | ||
this.databaseName = Preconditions.checkNotNull(databaseName, "databaseName can not be null"); | ||
this.maxQueueSize = maxQueueSize == null ? DEFAULT_QUEUE_SIZE : maxQueueSize; | ||
this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod; | ||
|
@@ -121,6 +138,18 @@ public boolean equals(Object o) | |
if (!getDimensionWhitelist().equals(that.getDimensionWhitelist())) { | ||
return false; | ||
} | ||
if (!getProtocol().equals(that.getProtocol())) { | ||
return false; | ||
} | ||
if (getTrustStorePath() != null ? !getTrustStorePath().equals(that.getTrustStorePath()) : that.getTrustStorePath() != null) { | ||
return false; | ||
} | ||
if (getTrustStoreType() != null ? !getTrustStoreType().equals(that.getTrustStoreType()) : that.getTrustStoreType() != null) { | ||
return false; | ||
} | ||
if (getTrustStorePassword() != null ? !getTrustStorePassword().equals(that.getTrustStorePassword()) : that.getTrustStorePassword() != null) { | ||
return false; | ||
} | ||
return true; | ||
|
||
} | ||
|
@@ -130,6 +159,10 @@ public int hashCode() | |
{ | ||
int result = getHostname().hashCode(); | ||
result = 31 * result + getPort(); | ||
result = 31 * result + getProtocol().hashCode(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any reason not to use
|
||
result = 31 * result + (getTrustStorePath() != null ? getTrustStorePath().hashCode() : 0); | ||
result = 31 * result + (getTrustStoreType() != null ? getTrustStoreType().hashCode() : 0); | ||
result = 31 * result + (getTrustStorePassword() != null ? getTrustStorePassword().hashCode() : 0); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like CI is complaining about a lack of test coverage. You can add an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does
but if port is null then it gets set to a default value in the config's constructor. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. EqualsVerifier doesn't know that because that field is not marked as You could also tell the test that the field is not null with |
||
result = 31 * result + getDatabaseName().hashCode(); | ||
result = 31 * result + getFlushPeriod(); | ||
result = 31 * result + getMaxQueueSize(); | ||
|
@@ -152,6 +185,30 @@ public int getPort() | |
return port; | ||
} | ||
|
||
@JsonProperty | ||
public String getProtocol() | ||
{ | ||
return protocol; | ||
} | ||
|
||
@JsonProperty | ||
public String getTrustStorePath() | ||
{ | ||
return trustStorePath; | ||
} | ||
|
||
@JsonProperty | ||
public String getTrustStoreType() | ||
{ | ||
return trustStoreType; | ||
} | ||
|
||
@JsonProperty | ||
public String getTrustStorePassword() | ||
{ | ||
return trustStorePassword; | ||
} | ||
|
||
@JsonProperty | ||
public String getDatabaseName() | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to use
try-with-resources
. You will not have to handle the IOException thrown inclose()
by yourself.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your review - I think I've addressed these comments. Can you take another look please?