Separated Metrics Handling for Throughput Violating Topics #930
Conversation
@@ -306,6 +306,9 @@ private void doUpdateDatastreams(Map<String, Datastream> datastreamMap) {
      datastreamMap.get(key)
          .getMetadata()
          .put(DatastreamMetadataConstants.THROUGHPUT_VIOLATING_TOPICS, StringUtils.EMPTY);
      LOG.info(
          "Feature handling throughput violations disabled. Flushed throughput violating topics for datastream {}",
What does "Flushed" mean here?
Discarded.
Will it make more sense to use "Discarded" instead of "Flushed"?
@@ -3613,6 +3613,8 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastream() throws Ex
    String testCluster = "testThroughputViolatingTopicsHandlingForSingleDatastream";
    String connectorType = "connectorType";
    String streamName = "testThroughputViolatingTopicsHandlingForSingleDatastream";
    String numThroughputViolatingTopicsMetric =
You should reference the actual metric rather than redefining it.
Is this just for testing purposes?
Yes, only for testing purposes. I went this route since other tests have similar behavior, as the metrics subclass is private. But instead, I created a visible-for-testing function to retrieve the metric name.
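The accessor approach mentioned above can be sketched as follows. This is a hypothetical illustration, not the actual Brooklin code: the class and metric names are made up, and the real change exposes the name through a package-visible method so tests reference the production constant instead of duplicating the string literal.

```java
// Hypothetical sketch of a visible-for-testing accessor for a private metric name.
public class CoordinatorMetrics {
    // In the real code this constant lives in a private metrics subclass.
    private static final String NUM_THROUGHPUT_VIOLATING_TOPICS = "numThroughputViolatingTopics";

    // Visible for testing: unit tests call this instead of redefining the literal,
    // so a rename of the metric cannot silently break the test.
    static String getNumThroughputViolatingTopicsMetricName() {
        return NUM_THROUGHPUT_VIOLATING_TOPICS;
    }

    public static void main(String[] args) {
        System.out.println(getNumThroughputViolatingTopicsMetricName());
    }
}
```

A test would then assert against `getNumThroughputViolatingTopicsMetricName()` rather than hard-coding the metric name a second time.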
_throughputViolatingTopicsMap.put(datastream.getName(), new HashSet<>(Arrays.asList(violatingTopics)));
}
.getOrDefault(DatastreamMetadataConstants.THROUGHPUT_VIOLATING_TOPICS, StringUtils.EMPTY);
String[] violatingTopics = Arrays.stream(commaSeparatedViolatingTopics.split(","))
What if the parsing is incorrect, the message got trimmed, or the value is malformed? Will it make sense to have a clean try-catch block?
The function populateThroughputViolatingTopicsMap already sits within a try-catch.
Also, I added another UT to test the malformed-metadata scenario, so I don't think we need another try-catch.
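The parsing being discussed can be sketched defensively without an extra try-catch. This is an illustrative standalone version, not the PR's actual code: blank and whitespace-only entries are filtered out, so trimmed, empty, or malformed metadata degrades to an empty set rather than throwing.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: parse a comma-separated list of throughput-violating
// topics tolerantly, dropping empty/whitespace entries from malformed input.
public class ViolatingTopicsParser {
    static Set<String> parse(String commaSeparatedViolatingTopics) {
        if (commaSeparatedViolatingTopics == null) {
            return Set.of();
        }
        return Arrays.stream(commaSeparatedViolatingTopics.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())   // skips "" from trailing/double commas
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // "topicB" has a stray space and there is a double comma; both are handled.
        System.out.println(parse("topicA, topicB,,topicC").size()); // 3
        System.out.println(parse("").size());                       // 0
    }
}
```

With this shape, the surrounding try-catch in populateThroughputViolatingTopicsMap only has to guard genuinely unexpected failures, not routine formatting noise.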
@@ -2462,6 +2470,13 @@ private void registerGauge(String metricName, Supplier<?> valueSupplier) {
    _metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE, metricName)));
  }

  // registers a new gauge or updates the supplier for the gauge if it already exists
This looks like a generic call. Why is this required? We have other instances of `Gauge` and do not have this.
Based on the reasoning, we would have to do this for all the `Gauge` instances and move this method to a generic location.
I added this new method since I wanted to report the change in the number of violations per datastream, and hence needed a gauge for dynamic keys (datastream names).
registerGauge registers a new gauge for a new key, but for any existing key it returns the already-registered gauge; hence this function updates the supplier function for an already-registered gauge metric.
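The register-or-set pattern described in the reply can be sketched in isolation. This is a simplified illustration, assuming a map-backed registry in place of the real DynamicMetricsManager API: the first call for a key registers the gauge, and subsequent calls replace the value supplier so the gauge reflects the latest per-datastream count.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: a gauge registry where re-registering a key swaps in a
// fresh supplier instead of keeping the stale one.
public class GaugeRegistry {
    private final Map<String, Supplier<?>> gauges = new HashMap<>();

    // Registers a new gauge, or updates the supplier if the key already exists.
    <T> void registerOrSetGauge(String metricName, Supplier<T> valueSupplier) {
        gauges.put(metricName, valueSupplier);
    }

    Object read(String metricName) {
        Supplier<?> s = gauges.get(metricName);
        return s == null ? null : s.get();
    }

    public static void main(String[] args) {
        GaugeRegistry registry = new GaugeRegistry();
        // First registration for a datastream, then an update with a new count.
        registry.registerOrSetGauge("numThroughputViolatingTopics.streamA", () -> 2);
        registry.registerOrSetGauge("numThroughputViolatingTopics.streamA", () -> 5);
        System.out.println(registry.read("numThroughputViolatingTopics.streamA")); // 5
    }
}
```

The point of the extra method is exactly this replace-on-reregister behavior; a plain registerGauge would keep returning the first supplier for an existing key.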
@@ -2462,6 +2470,13 @@ private void registerGauge(String metricName, Supplier<?> valueSupplier) {
    _metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE, metricName)));
  }

  // registers a new gauge or updates the supplier for the gauge if it already exists
  private <T> void registerOrSetGauge(String metricName, Supplier<T> valueSupplier) {
    _dynamicMetricsManager.setGauge(_dynamicMetricsManager.registerGauge(MODULE, metricName, valueSupplier),
Will this metric get emitted for a zero value as well?
This can be a concern if enabled for the change-capture cluster with many datastreams. It would make sense to have aggregate-level metrics and selectively enable datastream-level metrics, because X datastreams mean X gauges.
This will only be emitted wherever the feature of handling bad actors is enabled.
Also, I updated the logic to only handle and register/update a gauge when this metadata field exists for that datastream.
reportEventLatencyMetrics(metadata, eventsSourceTimestamp, THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING);
_dynamicMetricsManager.createOrUpdateCounter(MODULE, AGGREGATE, TOTAL_EVENTS_PRODUCED, 1);
_dynamicMetricsManager.createOrUpdateCounter(MODULE, _datastreamTask.getConnectorType(), TOTAL_EVENTS_PRODUCED,
This requires more precise documentation explaining that X events come from throughput-violating topics, Y events from regular topics, and the total record count is X+Y.
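The accounting the reviewer asks to document can be illustrated with a small standalone sketch. This is not the PR's code; it just shows the intended invariant, assuming violating and regular events are counted separately while the aggregate counter receives both, so total = violating + regular.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: per-category event counters with an aggregate total,
// illustrating X (violating) + Y (regular) = total produced.
public class EventCounters {
    final AtomicLong totalEventsProduced = new AtomicLong();
    final AtomicLong throughputViolatingEvents = new AtomicLong();

    void record(boolean fromViolatingTopic) {
        totalEventsProduced.incrementAndGet();          // contributes to X + Y
        if (fromViolatingTopic) {
            throughputViolatingEvents.incrementAndGet(); // contributes to X only
        }
    }

    public static void main(String[] args) {
        EventCounters c = new EventCounters();
        c.record(true);   // event from a throughput-violating topic
        c.record(false);  // regular topic event
        c.record(false);
        // violating = 1, total = 3, so regular = total - violating = 2
        System.out.println(c.throughputViolatingEvents.get() + " " + c.totalEventsProduced.get());
    }
}
```

Documenting this relationship explicitly (the aggregate counter includes the violating events, it is not disjoint from them) is what the comment is asking for.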
@@ -58,6 +58,8 @@ public class EventProducer implements DatastreamEventProducer {

  static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs";
  static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs";
  static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs";
  static final String THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING = "throughputViolatingEventsSendLatencyMs";
What about the other metrics from lines 67-70? Does the reporting need to be split for those as well?
private static final String EVENTS_PRODUCED_WITHIN_SLA = "eventsProducedWithinSla";
private static final String EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA = "eventsProducedWithinAlternateSla";
For these latency violations, we can skip the EVENTS_PRODUCED_WITHIN_SLA metric, but I updated the code to emit the EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA metric, as that would still be applicable.
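The SLA-selection logic described in that reply can be sketched as a small decision function. This is an illustration only: the threshold values and the "outside SLA" bucket name are made up, and the real code reports counters rather than returning metric names. The point is that violating-topic events bypass the regular within-SLA metric but still count against the alternate (looser) SLA.

```java
// Hypothetical sketch: choose which SLA metric an event counts toward,
// given whether it came from a throughput-violating topic.
public class SlaReporter {
    static final long SLA_MS = 5_000;            // illustrative threshold
    static final long ALTERNATE_SLA_MS = 60_000; // illustrative threshold

    static String metricFor(boolean violatingTopic, long latencyMs) {
        // Violating-topic events skip the regular within-SLA metric entirely.
        if (!violatingTopic && latencyMs <= SLA_MS) {
            return "eventsProducedWithinSla";
        }
        if (latencyMs <= ALTERNATE_SLA_MS) {
            return "eventsProducedWithinAlternateSla";
        }
        return "eventsProducedOutsideSla"; // hypothetical bucket
    }

    public static void main(String[] args) {
        System.out.println(metricFor(true, 3_000));  // alternate SLA applies
        System.out.println(metricFor(false, 3_000)); // regular SLA applies
    }
}
```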
@@ -58,6 +58,8 @@ public class EventProducer implements DatastreamEventProducer {

  static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs";
  static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs";
  static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs";
Do you need additional metric emission checks?
…handling updates based on the comments to the PR
The EventProducer of every DatastreamTask reports SLA and latency metrics for every datastream record. But when topics (with at least one partition) have higher throughput than the Brooklin-permissible thresholds, they introduce latency and SLA misses in the mirroring pipeline.
This pull request is the second part of the changes to handle the metrics and reporting of throughput-violating topics separately. It introduces the following changes:
Handling Metrics and SLA Reporting for Throughput Violating Topics via Datastream Update API (Part 1 of this work) is merged and can be referenced here.