From 4ff6026d30e4da53dc0e37bc2279d9e030773787 Mon Sep 17 00:00:00 2001 From: Harini Rajendran Date: Fri, 2 Jun 2023 10:58:26 -0500 Subject: [PATCH] Adding SegmentMetadataEvent and publishing them via KafkaEmitter (#14281) In this PR, we are enhancing KafkaEmitter, to emit metadata about published segments (SegmentMetadataEvent) into a Kafka topic. This segment metadata information that gets published into Kafka, can be used by any other downstream services to query Druid intelligently based on the segments published. The segment metadata gets published into kafka topic in json string format similar to other events. --- .../extensions-contrib/kafka-emitter.md | 22 ++-- .../ambari/metrics/AmbariMetricsEmitter.java | 3 + .../emitter/dropwizard/DropwizardEmitter.java | 3 + .../emitter/graphite/GraphiteEmitter.java | 3 + .../druid/emitter/kafka/KafkaEmitter.java | 48 +++++++-- .../emitter/kafka/KafkaEmitterConfig.java | 101 ++++++++++++++--- .../emitter/kafka/KafkaEmitterConfigTest.java | 41 +++++-- .../druid/emitter/kafka/KafkaEmitterTest.java | 41 +++++-- .../SegmentTransactionalInsertAction.java | 21 ++++ .../emitter/service/SegmentMetadataEvent.java | 102 ++++++++++++++++++ .../service/SegmentMetadataEventTest.java | 54 ++++++++++ 11 files changed, 394 insertions(+), 45 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java create mode 100644 processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java diff --git a/docs/development/extensions-contrib/kafka-emitter.md b/docs/development/extensions-contrib/kafka-emitter.md index 3457c249c718..40b63ca73afd 100644 --- a/docs/development/extensions-contrib/kafka-emitter.md +++ b/docs/development/extensions-contrib/kafka-emitter.md @@ -36,20 +36,26 @@ to monitor the status of your Druid cluster with this extension. All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`. -|property|description|required?|default| -|--------|-----------|---------|-------| -|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none| -|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none| -|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none| -|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none| -|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none| -|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none| +| Property | Description | Required | Default | +|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------| +| `druid.emitter.kafka.bootstrap.servers` | Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`) | yes | none | +| `druid.emitter.kafka.event.types` | Comma-separated event types.
Supported types are `alerts`, `metrics`, `requests`, and `segment_metadata`. | no | `["metrics", "alerts"]` | +| `druid.emitter.kafka.metric.topic` | Kafka topic name for emitter's target to emit service metrics. If `event.types` contains `metrics`, this field cannot be empty. | no | none | +| `druid.emitter.kafka.alert.topic` | Kafka topic name for emitter's target to emit alerts. If `event.types` contains `alerts`, this field cannot empty. | no | none | +| `druid.emitter.kafka.request.topic` | Kafka topic name for emitter's target to emit request logs. If `event.types` contains `requests`, this field cannot be empty. | no | none | +| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for emitter's target to emit segment metadata. If `event.types` contains `segment_metadata`, this field cannot be empty. | no | none | +| `druid.emitter.kafka.producer.config` | JSON configuration to set additional properties to Kafka producer. | no | none | +| `druid.emitter.kafka.clusterName` | Optional value to specify the name of your Druid cluster. It can help make groups in your monitoring environment. | no | none | ### Example ``` druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092 +druid.emitter.kafka.event.types=["metrics", alerts", "requests", "segment_metadata"] druid.emitter.kafka.metric.topic=druid-metric druid.emitter.kafka.alert.topic=druid-alert +druid.emitter.kafka.request.topic=druid-request-logs +druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata druid.emitter.kafka.producer.config={"max.block.ms":10000} ``` + diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java index 905b6cffc013..11dea07585db 100644 --- a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java +++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -137,6 +138,8 @@ public void emit(Event event) for (Emitter emitter : emitterList) { emitter.emit(event); } + } else if (event instanceof SegmentMetadataEvent) { + // do nothing. Ignore this event type } else { throw new ISE("unknown event type [%s]", event.getClass()); } diff --git a/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java b/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java index 5baa1b5da245..e22c373f89fd 100644 --- a/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java +++ b/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import java.util.LinkedHashMap; @@ -127,6 +128,8 @@ public void emit(Event event) for (Emitter emitter : alertEmitters) { emitter.emit(event); } + } else if (event instanceof SegmentMetadataEvent) { + // do nothing. Ignore this event type } else { throw new ISE("unknown event type [%s]", event.getClass()); } diff --git a/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java b/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java index b3739ab9d15f..10bfe1e869fc 100644 --- a/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java +++ b/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.log.RequestLogEvent; @@ -139,6 +140,8 @@ public void emit(Event event) "The following alert is dropped, description is [%s], severity is [%s]", alertEvent.getDescription(), alertEvent.getSeverity() ); + } else if (event instanceof SegmentMetadataEvent) { + // do nothing. Ignore this event type } else { log.error("unknown event type [%s]", event.getClass()); } diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 129a374b5849..dd8f3665f537 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType; import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.core.EventMap; import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.log.RequestLogEvent; import org.apache.kafka.clients.producer.Callback; @@ -40,6 +42,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -55,6 +58,7 @@ public class KafkaEmitter implements Emitter private final AtomicLong metricLost; private final AtomicLong alertLost; private final AtomicLong requestLost; + private final AtomicLong segmentMetadataLost; private final AtomicLong invalidLost; private final KafkaEmitterConfig config; @@ -63,6 +67,7 @@ public class KafkaEmitter implements Emitter private final MemoryBoundLinkedBlockingQueue metricQueue; private final MemoryBoundLinkedBlockingQueue alertQueue; private final MemoryBoundLinkedBlockingQueue requestQueue; + private final MemoryBoundLinkedBlockingQueue segmentMetadataQueue; private final ScheduledExecutorService scheduler; protected int sendInterval = DEFAULT_SEND_INTERVAL_SECONDS; @@ -78,10 +83,12 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); + this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.scheduler = Executors.newScheduledThreadPool(4); this.metricLost = new AtomicLong(0L); this.alertLost = new AtomicLong(0L); this.requestLost = new AtomicLong(0L); + this.segmentMetadataLost = new AtomicLong(0L); this.invalidLost = new AtomicLong(0L); } @@ -119,17 +126,25 @@ protected Producer setKafkaProducer() @Override public void start() { - scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS); - scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS); - if (config.getRequestTopic() != null) { + Set eventTypes = config.getEventTypes(); + if (eventTypes.contains(EventType.METRICS)) { + scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS); + } + if (eventTypes.contains(EventType.ALERTS)) { + scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS); + } + if (eventTypes.contains(EventType.REQUESTS)) { scheduler.schedule(this::sendRequestToKafka, sendInterval, TimeUnit.SECONDS); } + if (eventTypes.contains(EventType.SEGMENT_METADATA)) { + scheduler.schedule(this::sendSegmentMetadataToKafka, sendInterval, TimeUnit.SECONDS); + } scheduler.scheduleWithFixedDelay(() -> { - log.info( - "Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]", + log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], segmentMetadataLost=[%d], invalidLost=[%d]", metricLost.get(), alertLost.get(), requestLost.get(), + segmentMetadataLost.get(), invalidLost.get() ); }, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES, TimeUnit.MINUTES); @@ -151,6 +166,11 @@ private void sendRequestToKafka() sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost)); } + private void sendSegmentMetadataToKafka() + { + sendToKafka(config.getSegmentMetadataTopic(), segmentMetadataQueue, setProducerCallback(segmentMetadataLost)); + } + private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue, Callback callback) { ObjectContainer objectToSend; @@ -183,24 +203,31 @@ public void emit(final Event event) resultJson, StringUtils.toUtf8(resultJson).length ); + + Set eventTypes = config.getEventTypes(); if (event instanceof ServiceMetricEvent) { - if (!metricQueue.offer(objectContainer)) { + if (!eventTypes.contains(EventType.METRICS) || !metricQueue.offer(objectContainer)) { metricLost.incrementAndGet(); } } else if (event instanceof AlertEvent) { - if (!alertQueue.offer(objectContainer)) { + if (!eventTypes.contains(EventType.ALERTS) || !alertQueue.offer(objectContainer)) { alertLost.incrementAndGet(); } } else if (event instanceof RequestLogEvent) { - if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) { + if (!eventTypes.contains(EventType.REQUESTS) || !requestQueue.offer(objectContainer)) { requestLost.incrementAndGet(); } + } else if (event instanceof SegmentMetadataEvent) { + if (!eventTypes.contains(EventType.SEGMENT_METADATA) || !segmentMetadataQueue.offer(objectContainer)) { + segmentMetadataLost.incrementAndGet(); + } } else { invalidLost.incrementAndGet(); } } catch (JsonProcessingException e) { invalidLost.incrementAndGet(); + log.warn(e, "Exception while serializing event"); } } } @@ -238,4 +265,9 @@ public long getInvalidLostCount() { return invalidLost.get(); } + + public long getSegmentMetadataLostCount() + { + return segmentMetadataLost.get(); + } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index ed7b9ea0e9d1..019edd095ea4 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -21,53 +21,108 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; import javax.annotation.Nullable; +import java.util.HashSet; import java.util.Map; +import java.util.Set; public class KafkaEmitterConfig { + public enum EventType + { + METRICS, + ALERTS, + REQUESTS, + SEGMENT_METADATA; + + @JsonValue + @Override + public String toString() + { + return StringUtils.toLowerCase(this.name()); + } + @JsonCreator + public static EventType fromString(String name) + { + return valueOf(StringUtils.toUpperCase(name)); + } + } + + public static final Set DEFAULT_EVENT_TYPES = ImmutableSet.of(EventType.ALERTS, EventType.METRICS); @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) private final String bootstrapServers; - @JsonProperty("metric.topic") + @Nullable @JsonProperty("event.types") + private final Set eventTypes; + @Nullable @JsonProperty("metric.topic") private final String metricTopic; - @JsonProperty("alert.topic") + @Nullable @JsonProperty("alert.topic") private final String alertTopic; @Nullable @JsonProperty("request.topic") private final String requestTopic; + @Nullable @JsonProperty("segmentMetadata.topic") + private final String segmentMetadataTopic; @JsonProperty private final String clusterName; @JsonProperty("producer.config") - private Map kafkaProducerConfig; + private final Map kafkaProducerConfig; @JsonCreator public KafkaEmitterConfig( @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, - @JsonProperty("metric.topic") String metricTopic, - @JsonProperty("alert.topic") String alertTopic, + @Nullable @JsonProperty("event.types") Set eventTypes, + @Nullable @JsonProperty("metric.topic") String metricTopic, + @Nullable @JsonProperty("alert.topic") String alertTopic, @Nullable @JsonProperty("request.topic") String requestTopic, + @Nullable @JsonProperty("segmentMetadata.topic") String segmentMetadataTopic, @JsonProperty("clusterName") String clusterName, @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig ) { - this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); - this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null"); - this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null"); - this.requestTopic = requestTopic; + this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "druid.emitter.kafka.bootstrap.servers can not be null"); + this.eventTypes = maybeUpdateEventTypes(eventTypes, requestTopic); + this.metricTopic = this.eventTypes.contains(EventType.METRICS) ? Preconditions.checkNotNull(metricTopic, "druid.emitter.kafka.metric.topic can not be null") : null; + this.alertTopic = this.eventTypes.contains(EventType.ALERTS) ? Preconditions.checkNotNull(alertTopic, "druid.emitter.kafka.alert.topic can not be null") : null; + this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can not be null") : null; + this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null; this.clusterName = clusterName; this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig; } + private Set maybeUpdateEventTypes(Set eventTypes, String requestTopic) + { + // Unless explicitly overridden, kafka emitter will always emit metrics and alerts + if (eventTypes == null) { + Set defaultEventTypes = new HashSet<>(DEFAULT_EVENT_TYPES); + // To maintain backwards compatibility, if eventTypes is not set, then requests are sent out or not + // based on the `request.topic` config + if (requestTopic != null) { + defaultEventTypes.add(EventType.REQUESTS); + } + return defaultEventTypes; + } + return eventTypes; + } + @JsonProperty public String getBootstrapServers() { return bootstrapServers; } + @JsonProperty + public Set getEventTypes() + { + return eventTypes; + } + @JsonProperty public String getMetricTopic() { @@ -92,6 +147,12 @@ public String getRequestTopic() return requestTopic; } + @Nullable + public String getSegmentMetadataTopic() + { + return segmentMetadataTopic; + } + @JsonProperty public Map getKafkaProducerConfig() { @@ -113,10 +174,16 @@ public boolean equals(Object o) if (!getBootstrapServers().equals(that.getBootstrapServers())) { return false; } - if (!getMetricTopic().equals(that.getMetricTopic())) { + + if (getEventTypes() != null ? !getEventTypes().equals(that.getEventTypes()) : that.getEventTypes() != null) { + return false; + } + + if (getMetricTopic() != null ? !getMetricTopic().equals(that.getMetricTopic()) : that.getMetricTopic() != null) { return false; } - if (!getAlertTopic().equals(that.getAlertTopic())) { + + if (getAlertTopic() != null ? !getAlertTopic().equals(that.getAlertTopic()) : that.getAlertTopic() != null) { return false; } @@ -124,6 +191,10 @@ public boolean equals(Object o) return false; } + if (getSegmentMetadataTopic() != null ? !getSegmentMetadataTopic().equals(that.getSegmentMetadataTopic()) : that.getSegmentMetadataTopic() != null) { + return false; + } + if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { return false; } @@ -134,9 +205,11 @@ public boolean equals(Object o) public int hashCode() { int result = getBootstrapServers().hashCode(); - result = 31 * result + getMetricTopic().hashCode(); - result = 31 * result + getAlertTopic().hashCode(); + result = 31 * result + (getEventTypes() != null ? getEventTypes().hashCode() : 0); + result = 31 * result + (getMetricTopic() != null ? getMetricTopic().hashCode() : 0); + result = 31 * result + (getAlertTopic() != null ? getAlertTopic().hashCode() : 0); result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0); + result = 31 * result + (getSegmentMetadataTopic() != null ? getSegmentMetadataTopic().hashCode() : 0); result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); result = 31 * result + getKafkaProducerConfig().hashCode(); return result; @@ -147,9 +220,11 @@ public String toString() { return "KafkaEmitterConfig{" + "bootstrap.servers='" + bootstrapServers + '\'' + + ", event.types='" + eventTypes + '\'' + ", metric.topic='" + metricTopic + '\'' + ", alert.topic='" + alertTopic + '\'' + ", request.topic='" + requestTopic + '\'' + + ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' + ", clusterName='" + clusterName + '\'' + ", Producer.config=" + kafkaProducerConfig + '}'; diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 55ecdbaeb8a9..c4d5811bcb53 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -19,15 +19,18 @@ package org.apache.druid.emitter.kafka; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.ValueInstantiationException; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; import org.junit.Assert; import org.junit.Before; import org.junit.Test; - import java.io.IOException; +import java.util.HashSet; +import java.util.Set; public class KafkaEmitterConfigTest { @@ -42,8 +45,8 @@ public void setUp() @Test public void testSerDeserKafkaEmitterConfig() throws IOException { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", - "alertTest", "requestTest", + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest", + "alertTest", "requestTest", "metadataTest", "clusterNameTest", ImmutableMap.builder() .put("testKey", "testValue").build() ); @@ -56,8 +59,24 @@ public void testSerDeserKafkaEmitterConfig() throws IOException @Test public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", - "alertTest", null, + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest", + "alertTest", null, "metadataTest", + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").build() + ); + String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); + Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); + } + + @Test + public void testSerDeserKafkaEmitterConfigNullMetricsTopic() throws IOException + { + Set eventTypeSet = new HashSet(); + eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA); + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", eventTypeSet, null, + null, null, "metadataTest", "clusterNameTest", ImmutableMap.builder() .put("testKey", "testValue").build() ); @@ -70,8 +89,8 @@ public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException @Test public void testSerDeNotRequiredKafkaProducerConfig() { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest", - "alertTest", null, + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest", + "alertTest", null, "metadataTest", "clusterNameTest", null ); try { @@ -83,6 +102,14 @@ public void testSerDeNotRequiredKafkaProducerConfig() } } + @Test + public void testDeserializeEventTypesWithDifferentCase() throws JsonProcessingException + { + Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENT_METADATA, mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class)); + Assert.assertEquals(KafkaEmitterConfig.EventType.ALERTS, mapper.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class)); + Assert.assertThrows(ValueInstantiationException.class, () -> mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class)); + } + @Test public void testJacksonModules() { diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java index 422d18a7f153..b40da9bd9e08 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.QueryStats; import org.apache.druid.server.RequestLogLine; @@ -37,7 +38,10 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import static org.mockito.ArgumentMatchers.any; @@ -47,20 +51,23 @@ @RunWith(Parameterized.class) public class KafkaEmitterTest { - @Parameterized.Parameter + @Parameterized.Parameter(0) + public Set eventsType; + + @Parameterized.Parameter(1) public String requestTopic; - @Parameterized.Parameters(name = "{index}: requestTopic - {0}") + @Parameterized.Parameters(name = "{index}: eventTypes - {0}, requestTopic - {1}") public static Object[] data() { - return new Object[] { - "requests", - null + return new Object[][] { + {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENT_METADATA)), "requests"}, + {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENT_METADATA)), null} }; } - // there is 1 seconds wait in kafka emitter before it starts sending events to broker, set a timeout for 5 seconds - @Test(timeout = 5_000) + // there is 1 seconds wait in kafka emitter before it starts sending events to broker, set a timeout for 10 seconds + @Test(timeout = 10_000) public void testKafkaEmitter() throws InterruptedException { final List serviceMetricEvents = ImmutableList.of( @@ -77,14 +84,26 @@ public void testKafkaEmitter() throws InterruptedException ).build("service", "host") ); - int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size(); + final List segmentMetadataEvents = ImmutableList.of( + new SegmentMetadataEvent( + "dummy_datasource", + DateTimes.of("2001-01-01T00:00:00.000Z"), + DateTimes.of("2001-01-02T00:00:00.000Z"), + DateTimes.of("2001-01-03T00:00:00.000Z"), + "dummy_version", + true + ) + ); + + int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size() + segmentMetadataEvents.size(); int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size(); final CountDownLatch countDownSentEvents = new CountDownLatch( requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents); + final KafkaProducer producer = mock(KafkaProducer.class); final KafkaEmitter kafkaEmitter = new KafkaEmitter( - new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null), + new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null), new ObjectMapper() ) { @@ -113,10 +132,14 @@ protected Producer setKafkaProducer() for (Event event : requestLogEvents) { kafkaEmitter.emit(event); } + for (Event event : segmentMetadataEvents) { + kafkaEmitter.emit(event); + } countDownSentEvents.await(); Assert.assertEquals(0, kafkaEmitter.getMetricLostCount()); Assert.assertEquals(0, kafkaEmitter.getAlertLostCount()); + Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount()); Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getRequestLostCount()); Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 233739eb7b77..a0567dce04bf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -33,10 +33,13 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -257,11 +260,29 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) segment.getShardSpec() == null ? null : segment.getShardSpec().getType() ); toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize())); + // Emit the segment related metadata using the configured emitters. + // There is a possibility that some segments' metadata event might get missed if the + // server crashes after commiting segment but before emitting the event. + this.emitSegmentMetadata(segment, toolbox); } return retVal; } + private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox toolbox) + { + SegmentMetadataEvent event = new SegmentMetadataEvent( + segment.getDataSource(), + DateTime.now(DateTimeZone.UTC), + segment.getInterval().getStart(), + segment.getInterval().getEnd(), + segment.getVersion(), + segment.getLastCompactionState() != null + ); + + toolbox.getEmitter().emit(event); + } + private void checkWithSegmentLock() { final Map> oldSegmentsMap = groupSegmentsByIntervalAndSort(segmentsToBeOverwritten); diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java new file mode 100644 index 000000000000..bc3769b62361 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.service; + +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; +import org.joda.time.DateTime; + +/** + * The event that gets generated whenever a segment is committed + */ +public class SegmentMetadataEvent implements Event +{ + public static final String FEED = "feed"; + public static final String DATASOURCE = "dataSource"; + public static final String CREATED_TIME = "createdTime"; + public static final String START_TIME = "startTime"; + public static final String END_TIME = "endTime"; + public static final String VERSION = "version"; + public static final String IS_COMPACTED = "isCompacted"; + + /** + * Time at which the segment metadata event is created + */ + private final DateTime createdTime; + /** + * Datasource for which the segment is committed + */ + private final String dataSource; + /** + * Start interval of the committed segment + */ + private final DateTime startTime; + /** + * End interval of the committed segment + */ + private final DateTime endTime; + /** + * Version of the committed segment + */ + private final String version; + /** + * Is the segment, a compacted segment or not + */ + private final boolean isCompacted; + + public SegmentMetadataEvent( + String dataSource, + DateTime createdTime, + DateTime startTime, + DateTime endTime, + String version, + boolean isCompacted + ) + { + this.dataSource = dataSource; + this.createdTime = createdTime; + this.startTime = startTime; + this.endTime = endTime; + this.version = version; + this.isCompacted = isCompacted; + } + + @Override + public String getFeed() + { + return "segment_metadata"; + } + @Override + @JsonValue + public EventMap toMap() + { + + return EventMap.builder() + .put(FEED, getFeed()) + .put(DATASOURCE, dataSource) + .put(CREATED_TIME, createdTime) + .put(START_TIME, startTime) + .put(END_TIME, endTime) + .put(VERSION, version) + .put(IS_COMPACTED, isCompacted) + .build(); + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java new file mode 100644 index 000000000000..83a4fcba7dc5 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.service; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.DateTimes; +import org.junit.Assert; +import org.junit.Test; + +public class SegmentMetadataEventTest +{ + @Test + public void testBasicEvent() + { + SegmentMetadataEvent event = new SegmentMetadataEvent( + "dummy_datasource", + DateTimes.of("2001-01-01T00:00:00.000Z"), + DateTimes.of("2001-01-02T00:00:00.000Z"), + DateTimes.of("2001-01-03T00:00:00.000Z"), + "dummy_version", + true + ); + + Assert.assertEquals( + ImmutableMap.builder() + .put(SegmentMetadataEvent.FEED, "segment_metadata") + .put(SegmentMetadataEvent.DATASOURCE, "dummy_datasource") + .put(SegmentMetadataEvent.CREATED_TIME, DateTimes.of("2001-01-01T00:00:00.000Z")) + .put(SegmentMetadataEvent.START_TIME, DateTimes.of("2001-01-02T00:00:00.000Z")) + .put(SegmentMetadataEvent.END_TIME, DateTimes.of("2001-01-03T00:00:00.000Z")) + .put(SegmentMetadataEvent.VERSION, "dummy_version") + .put(SegmentMetadataEvent.IS_COMPACTED, true) + .build(), + event.toMap() + ); + } +}