Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix emit wait time #2869

Merged
merged 1 commit into from
Apr 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/content/development/extensions-contrib/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ All the configuration parameters for graphite emitter are under `druid.emitter.g
|`druid.emitter.graphite.flushPeriod` | Queue flushing period in milliseconds. |no|1 minute|
|`druid.emitter.graphite.maxQueueSize`| Maximum size of the queue used to buffer events. |no|`MAX_INT`|
|`druid.emitter.graphite.alertEmitters`| List of emitters where alerts will be forwarded to. |no| empty list (no forwarding)|

|`druid.emitter.graphite.emitWaitTime` | wait time in milliseconds to try to send the event otherwise emitter will throwing event. |no|0|
|`druid.emitter.graphite.waitForEventTime` | waiting time in milliseconds if necessary for an event to become available. |no|1000 (1 sec)|

### Druid to Graphite Event Converter

Graphite Event Converter defines a mapping between druid metrics name plus dimensions to a Graphite metric path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package io.druid.emitter.graphite;

import com.codahale.metrics.graphite.PickledGraphite;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
Expand All @@ -30,7 +28,6 @@
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceMetricEvent;

import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand All @@ -41,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;


Expand All @@ -52,22 +50,20 @@ public class GraphiteEmitter implements Emitter
private final GraphiteEmitterConfig graphiteEmitterConfig;
private final List<Emitter> emitterList;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ObjectMapper mapper;
private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
private final static long DEFAULT_PUT_GET_TIMEOUT = 1000; // default wait for put/get operations on the queue 1 sec
private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("GraphiteEmitter-%s")
.build()); // Thread pool of two in order to schedule flush runnable
private AtomicLong countLostEvents = new AtomicLong(0);

public GraphiteEmitter(
@NotNull GraphiteEmitterConfig graphiteEmitterConfig,
List<Emitter> emitterList, @NotNull ObjectMapper mapper
GraphiteEmitterConfig graphiteEmitterConfig,
List<Emitter> emitterList
)
{
this.emitterList = emitterList;
this.mapper = mapper;
this.graphiteEmitterConfig = graphiteEmitterConfig;
this.graphiteEventConverter = graphiteEmitterConfig.getDruidToGraphiteEventConverter();
this.eventsQueue = new LinkedBlockingQueue(graphiteEmitterConfig.getMaxQueueSize());
Expand Down Expand Up @@ -103,22 +99,25 @@ public void emit(Event event)
return;
}
try {
final boolean isSuccessful = eventsQueue.offer(graphiteEvent, DEFAULT_PUT_GET_TIMEOUT, TimeUnit.MILLISECONDS);
final boolean isSuccessful = eventsQueue.offer(
graphiteEvent,
graphiteEmitterConfig.getEmitWaitTime(),
TimeUnit.MILLISECONDS
);
if (!isSuccessful) {
log.error(
"Throwing event [%s] on the floor Graphite queue is full please increase the capacity or/and the consumer frequency",
mapper.writeValueAsString(event)
);
if (countLostEvents.getAndIncrement() % 1000 == 0) {
log.error(
"Lost total of [%s] events because of emitter queue is full. Please increase the capacity or/and the consumer frequency",
countLostEvents.get()
);
}
}
}
catch (InterruptedException e) {
log.error(e, "got interrupted with message [%s]", e.getMessage());
Thread.currentThread().interrupt();

}
catch (JsonProcessingException e) {
log.error(e, e.getMessage());
}
} else if (!emitterList.isEmpty() && event instanceof AlertEvent) {
for (Emitter emitter : emitterList) {
emitter.emit(event);
Expand Down Expand Up @@ -147,7 +146,10 @@ public void run()
}
while (eventsQueue.size() > 0 && !exec.isShutdown()) {
try {
final GraphiteEvent graphiteEvent = eventsQueue.poll(DEFAULT_PUT_GET_TIMEOUT, TimeUnit.MILLISECONDS);
final GraphiteEvent graphiteEvent = eventsQueue.poll(
graphiteEmitterConfig.getWaitForEventTime(),
TimeUnit.MILLISECONDS
);
if (graphiteEvent != null) {
log.debug(
"sent [%s] with value [%s] and time [%s]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class GraphiteEmitterConfig
{
private final static int DEFAULT_BATCH_SIZE = 100;
private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute
private final static long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec

@JsonProperty
final private String hostname;
Expand All @@ -48,6 +49,13 @@ public class GraphiteEmitterConfig
@JsonProperty
final private List<String> alertEmitters;

@JsonProperty
final private Long emitWaitTime;

//waiting up to the specified wait time if necessary for an event to become available.
@JsonProperty
final private Long waitForEventTime;

@Override
public boolean equals(Object o)
{
Expand All @@ -66,40 +74,42 @@ public boolean equals(Object o)
if (getBatchSize() != that.getBatchSize()) {
return false;
}
if (getHostname() != null ? !getHostname().equals(that.getHostname()) : that.getHostname() != null) {
if (!getHostname().equals(that.getHostname())) {
return false;
}
if (!getFlushPeriod().equals(that.getFlushPeriod())) {
return false;
}
if (!getMaxQueueSize().equals(that.getMaxQueueSize())) {
return false;
}
if (getFlushPeriod() != null ? !getFlushPeriod().equals(that.getFlushPeriod()) : that.getFlushPeriod() != null) {
if (!getDruidToGraphiteEventConverter().equals(that.getDruidToGraphiteEventConverter())) {
return false;
}
if (getMaxQueueSize() != null
? !getMaxQueueSize().equals(that.getMaxQueueSize())
: that.getMaxQueueSize() != null) {
if (getAlertEmitters() != null
? !getAlertEmitters().equals(that.getAlertEmitters())
: that.getAlertEmitters() != null) {
return false;
}
if (getDruidToGraphiteEventConverter() != null
? !getDruidToGraphiteEventConverter().equals(that.getDruidToGraphiteEventConverter())
: that.getDruidToGraphiteEventConverter() != null) {
if (!getEmitWaitTime().equals(that.getEmitWaitTime())) {
return false;
}
return !(getAlertEmitters() != null
? !getAlertEmitters().equals(that.getAlertEmitters())
: that.getAlertEmitters() != null);
return getWaitForEventTime().equals(that.getWaitForEventTime());

}

@Override
public int hashCode()
{
int result = getHostname() != null ? getHostname().hashCode() : 0;
int result = getHostname().hashCode();
result = 31 * result + getPort();
result = 31 * result + getBatchSize();
result = 31 * result + (getFlushPeriod() != null ? getFlushPeriod().hashCode() : 0);
result = 31 * result + (getMaxQueueSize() != null ? getMaxQueueSize().hashCode() : 0);
result = 31 * result + (getDruidToGraphiteEventConverter() != null
? getDruidToGraphiteEventConverter().hashCode()
: 0);
result = 31 * result + getFlushPeriod().hashCode();
result = 31 * result + getMaxQueueSize().hashCode();
result = 31 * result + getDruidToGraphiteEventConverter().hashCode();
result = 31 * result + (getAlertEmitters() != null ? getAlertEmitters().hashCode() : 0);
result = 31 * result + getEmitWaitTime().hashCode();
result = 31 * result + getWaitForEventTime().hashCode();
return result;
}

Expand All @@ -111,9 +121,13 @@ public GraphiteEmitterConfig(
@JsonProperty("flushPeriod") Long flushPeriod,
@JsonProperty("maxQueueSize") Integer maxQueueSize,
@JsonProperty("eventConverter") DruidToGraphiteEventConverter druidToGraphiteEventConverter,
@JsonProperty("alertEmitters") List<String> alertEmitters
@JsonProperty("alertEmitters") List<String> alertEmitters,
@JsonProperty("emitWaitTime") Long emitWaitTime,
@JsonProperty("waitForEventTime") Long waitForEventTime
)
{
this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
this.alertEmitters = alertEmitters == null ? Collections.<String>emptyList() : alertEmitters;
this.druidToGraphiteEventConverter = Preconditions.checkNotNull(
druidToGraphiteEventConverter,
Expand Down Expand Up @@ -167,4 +181,16 @@ public List<String> getAlertEmitters()
{
return alertEmitters;
}

@JsonProperty
public Long getEmitWaitTime()
{
return emitWaitTime;
}

@JsonProperty
public Long getWaitForEventTime()
{
return waitForEventTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ public Emitter apply(String s)
}
}
);
return new GraphiteEmitter(graphiteEmitterConfig, emitters, mapper);
return new GraphiteEmitter(graphiteEmitterConfig, emitters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public void testSerDeserGraphiteEmitterConfig() throws IOException
1000L,
100,
new SendAllGraphiteEventConverter("prefix", true, true),
Collections.EMPTY_LIST
Collections.EMPTY_LIST,
null,
null
);
String graphiteEmitterConfigString = mapper.writeValueAsString(graphiteEmitterConfig);
GraphiteEmitterConfig graphiteEmitterConfigExpected = mapper.reader(GraphiteEmitterConfig.class).readValue(
Expand Down