Adding SegmentMetadataEvent and publishing them via KafkaEmitter (apache#14281)

In this PR, we enhance the KafkaEmitter to emit metadata about published segments (SegmentMetadataEvent) into a Kafka topic. The segment metadata published to Kafka can then be used by downstream services to query Druid intelligently, based on which segments have been published. Like the other event types, segment metadata is published to its Kafka topic as a JSON string.
harinirajendran authored Jun 2, 2023
1 parent 45014bd commit 4ff6026
Showing 11 changed files with 394 additions and 45 deletions.
22 changes: 14 additions & 8 deletions docs/development/extensions-contrib/kafka-emitter.md
@@ -36,20 +36,26 @@ to monitor the status of your Druid cluster with this extension.

All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`.

|property|description|required?|default|
|--------|-----------|---------|-------|
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none|
|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none|
|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none|
|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none|
| Property | Description | Required | Default |
|----------|-------------|----------|---------|
| `druid.emitter.kafka.bootstrap.servers` | Comma-separated Kafka broker addresses: `[hostname:port],[hostname:port]...` | yes | none |
| `druid.emitter.kafka.event.types` | Comma-separated list of event types to emit. Supported types are `alerts`, `metrics`, `requests`, and `segment_metadata`. | no | `["metrics", "alerts"]` |
| `druid.emitter.kafka.metric.topic` | Kafka topic to which service metrics are emitted. If `event.types` contains `metrics`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.alert.topic` | Kafka topic to which alerts are emitted. If `event.types` contains `alerts`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.request.topic` | Kafka topic to which request logs are emitted. If `event.types` contains `requests`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic to which segment metadata is emitted. If `event.types` contains `segment_metadata`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.producer.config` | JSON-formatted configuration of additional properties for the Kafka producer. | no | none |
| `druid.emitter.kafka.clusterName` | Optional name for your Druid cluster; useful for grouping entries in your monitoring environment. | no | none |

### Example

```
druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
druid.emitter.kafka.event.types=["metrics", "alerts", "requests", "segment_metadata"]
druid.emitter.kafka.metric.topic=druid-metric
druid.emitter.kafka.alert.topic=druid-alert
druid.emitter.kafka.request.topic=druid-request-logs
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
druid.emitter.kafka.producer.config={"max.block.ms":10000}
```
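
Note that the snippet above configures only the emitter itself; it assumes the `kafka-emitter` extension is already loaded and selected via `druid.emitter=kafka`.

Since each event is published as a plain JSON string, any downstream service can pick up segment metadata with a standard Kafka consumer. The following is a minimal, illustrative sketch only: it assumes the `druid-segment-metadata` topic from the example above, a broker reachable at `localhost:9092`, and a hypothetical consumer group id; the payload fields mentioned in the comments are what `SegmentMetadataEvent` is expected to carry and should be verified against the event class.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SegmentMetadataListener
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
    props.put("group.id", "segment-metadata-listener"); // hypothetical group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // Topic name taken from the docs example above.
      consumer.subscribe(Collections.singletonList("druid-segment-metadata"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          // Each value is a JSON string; it is expected to describe the published
          // segment (e.g. data source, interval, version) -- verify the exact
          // field names against SegmentMetadataEvent before relying on them.
          System.out.println(record.value());
        }
      }
    }
  }
}
```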

@@ -26,6 +26,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -137,6 +138,8 @@ public void emit(Event event)
      for (Emitter emitter : emitterList) {
        emitter.emit(event);
      }
    } else if (event instanceof SegmentMetadataEvent) {
      // do nothing. Ignore this event type
    } else {
      throw new ISE("unknown event type [%s]", event.getClass());
    }
@@ -33,6 +33,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

import java.util.LinkedHashMap;
@@ -127,6 +128,8 @@ public void emit(Event event)
      for (Emitter emitter : alertEmitters) {
        emitter.emit(event);
      }
    } else if (event instanceof SegmentMetadataEvent) {
      // do nothing. Ignore this event type
    } else {
      throw new ISE("unknown event type [%s]", event.getClass());
    }
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;

@@ -139,6 +140,8 @@ public void emit(Event event)
"The following alert is dropped, description is [%s], severity is [%s]",
alertEvent.getDescription(), alertEvent.getSeverity()
);
} else if (event instanceof SegmentMetadataEvent) {
// do nothing. Ignore this event type
} else {
log.error("unknown event type [%s]", event.getClass());
}
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType;
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -30,6 +31,7 @@
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
@@ -40,6 +42,7 @@
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -55,6 +58,7 @@ public class KafkaEmitter implements Emitter
  private final AtomicLong metricLost;
  private final AtomicLong alertLost;
  private final AtomicLong requestLost;
  private final AtomicLong segmentMetadataLost;
  private final AtomicLong invalidLost;

  private final KafkaEmitterConfig config;
@@ -63,6 +67,7 @@ public class KafkaEmitter implements Emitter
  private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
  private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
  private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
  private final MemoryBoundLinkedBlockingQueue<String> segmentMetadataQueue;
  private final ScheduledExecutorService scheduler;

  protected int sendInterval = DEFAULT_SEND_INTERVAL_SECONDS;
@@ -78,10 +83,12 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
    this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
    this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
    this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
    this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
    this.scheduler = Executors.newScheduledThreadPool(4);
    this.metricLost = new AtomicLong(0L);
    this.alertLost = new AtomicLong(0L);
    this.requestLost = new AtomicLong(0L);
    this.segmentMetadataLost = new AtomicLong(0L);
    this.invalidLost = new AtomicLong(0L);
  }

@@ -119,17 +126,25 @@ protected Producer<String, String> setKafkaProducer()
  @Override
  public void start()
  {
    scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS);
    scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS);
    if (config.getRequestTopic() != null) {
    Set<EventType> eventTypes = config.getEventTypes();
    if (eventTypes.contains(EventType.METRICS)) {
      scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS);
    }
    if (eventTypes.contains(EventType.ALERTS)) {
      scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS);
    }
    if (eventTypes.contains(EventType.REQUESTS)) {
      scheduler.schedule(this::sendRequestToKafka, sendInterval, TimeUnit.SECONDS);
    }
    if (eventTypes.contains(EventType.SEGMENT_METADATA)) {
      scheduler.schedule(this::sendSegmentMetadataToKafka, sendInterval, TimeUnit.SECONDS);
    }
    scheduler.scheduleWithFixedDelay(() -> {
      log.info(
          "Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]",
      log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], segmentMetadataLost=[%d], invalidLost=[%d]",
          metricLost.get(),
          alertLost.get(),
          requestLost.get(),
          segmentMetadataLost.get(),
          invalidLost.get()
      );
    }, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES, TimeUnit.MINUTES);
@@ -151,6 +166,11 @@ private void sendRequestToKafka()
    sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost));
  }

  private void sendSegmentMetadataToKafka()
  {
    sendToKafka(config.getSegmentMetadataTopic(), segmentMetadataQueue, setProducerCallback(segmentMetadataLost));
  }
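
(Aside: the `setProducerCallback(...)` helper used above predates this PR and its body is not part of this diff. A minimal sketch, assuming it simply counts failed sends against the supplied counter, might look like this:)

```java
  // Illustrative sketch only -- not the diff's actual helper. Kafka invokes the
  // callback after each send attempt; a non-null exception means the record was lost.
  private Callback setProducerCallback(AtomicLong lostCounter)
  {
    return (recordMetadata, exception) -> {
      if (exception != null) {
        lostCounter.incrementAndGet();
      }
    };
  }
```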

  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
  {
    ObjectContainer<String> objectToSend;
@@ -183,24 +203,31 @@ public void emit(final Event event)
            resultJson,
            StringUtils.toUtf8(resultJson).length
        );

        Set<EventType> eventTypes = config.getEventTypes();
        if (event instanceof ServiceMetricEvent) {
          if (!metricQueue.offer(objectContainer)) {
          if (!eventTypes.contains(EventType.METRICS) || !metricQueue.offer(objectContainer)) {
            metricLost.incrementAndGet();
          }
        } else if (event instanceof AlertEvent) {
          if (!alertQueue.offer(objectContainer)) {
          if (!eventTypes.contains(EventType.ALERTS) || !alertQueue.offer(objectContainer)) {
            alertLost.incrementAndGet();
          }
        } else if (event instanceof RequestLogEvent) {
          if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
          if (!eventTypes.contains(EventType.REQUESTS) || !requestQueue.offer(objectContainer)) {
            requestLost.incrementAndGet();
          }
        } else if (event instanceof SegmentMetadataEvent) {
          if (!eventTypes.contains(EventType.SEGMENT_METADATA) || !segmentMetadataQueue.offer(objectContainer)) {
            segmentMetadataLost.incrementAndGet();
          }
        } else {
          invalidLost.incrementAndGet();
        }
      }
      catch (JsonProcessingException e) {
        invalidLost.incrementAndGet();
        log.warn(e, "Exception while serializing event");
      }
    }
  }
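
(Aside: a failed `offer` above means the queue rejected the element because accepting it would exceed the emitter's memory bound. `MemoryBoundLinkedBlockingQueue` is Druid's own class and is not part of this diff; the simplified sketch below illustrates only the rejection semantics the routing code relies on:)

```java
// Illustrative sketch of a memory-bounded queue, not Druid's actual class.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class MemoryBoundedQueueSketch<T>
{
  private final long memoryBound;
  private final AtomicLong currentMemory = new AtomicLong();
  private final LinkedBlockingQueue<T> delegate = new LinkedBlockingQueue<>();

  MemoryBoundedQueueSketch(long memoryBound)
  {
    this.memoryBound = memoryBound;
  }

  // Rejects (returns false) instead of blocking when the bound would be exceeded,
  // which is why the emitter counts a failed offer as a "lost" event.
  boolean offer(T item, long itemBytes)
  {
    if (currentMemory.addAndGet(itemBytes) <= memoryBound) {
      delegate.offer(item); // unbounded delegate: always succeeds
      return true;
    }
    currentMemory.addAndGet(-itemBytes); // roll back the reservation
    return false;
  }
}
```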
@@ -238,4 +265,9 @@ public long getInvalidLostCount()
  {
    return invalidLost.get();
  }

  public long getSegmentMetadataLostCount()
  {
    return segmentMetadataLost.get();
  }
}
