Separated Metrics Handling for Throughput Violating Topics #930

shrinandthakkar
Collaborator

The EventProducer of every DatastreamTask reports SLA and latency metrics for every datastream record. However, when a topic has at least one partition whose throughput exceeds Brooklin's permissible threshold, it introduces latency and SLA misses in the mirroring pipeline.

This pull request is the second part of the changes to handle metrics and reporting for throughput-violating topics separately. It introduces the following changes:

  1. Reports latency and SLA metrics for throughput-violating topics separately within EventProducer.
  2. Adds a per-datastream gauge to give insight into how often these throughput violations occur.
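
As a rough illustration of change 1 (not the code in this PR), the split can be thought of as choosing a metric name per record based on whether the record's topic is in the violating set. The class `LatencyMetricRouter` and its method are hypothetical; only the two metric-name constants come from the diff.

```java
import java.util.Set;

// Hypothetical helper for illustration; not part of Brooklin.
final class LatencyMetricRouter {
  // Metric names as they appear in the EventProducer diff.
  static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs";
  static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs";

  private LatencyMetricRouter() {
  }

  // Returns the latency metric name to report under for a record from the given topic.
  static String latencyMetricFor(String topic, Set<String> violatingTopics) {
    return violatingTopics.contains(topic)
        ? THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING
        : EVENTS_LATENCY_MS_STRING;
  }
}
```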

Handling Metrics and SLA Reporting for Throughput Violating Topics via Datastream Update API (Part 1 of this work) is merged and can be referenced here.

@@ -306,6 +306,9 @@ private void doUpdateDatastreams(Map<String, Datastream> datastreamMap) {
datastreamMap.get(key)
.getMetadata()
.put(DatastreamMetadataConstants.THROUGHPUT_VIOLATING_TOPICS, StringUtils.EMPTY);
LOG.info(
"Feature handling throughput violations disabled. Flushed throughput violating topics for datastream {}",
Collaborator

What does "Flushed" mean here?

Collaborator Author

Discarded.

Collaborator

Would it make more sense to use "Discarded" instead of "Flushed"?

@@ -3613,6 +3613,8 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastream() throws Ex
String testCluster = "testThroughputViolatingTopicsHandlingForSingleDatastream";
String connectorType = "connectorType";
String streamName = "testThroughputViolatingTopicsHandlingForSingleDatastream";
String numThroughputViolatingTopicsMetric =
Collaborator

You should reference the actual metric rather than redefining it.

Is this just for testing purposes?

Collaborator Author

Yes, only for testing purposes. I originally went this route because other tests do the same, since the metrics subclass is private. I have now replaced it with a visible-for-testing function that retrieves the metric name.

_throughputViolatingTopicsMap.put(datastream.getName(), new HashSet<>(Arrays.asList(violatingTopics)));
}
.getOrDefault(DatastreamMetadataConstants.THROUGHPUT_VIOLATING_TOPICS, StringUtils.EMPTY);
String[] violatingTopics = Arrays.stream(commaSeparatedViolatingTopics.split(","))
Collaborator

What if parsing fails because the value was truncated or is malformed?

Would it make sense to wrap this in a clean try-catch block?

Collaborator Author

The function populateThroughputViolatingTopicsMap is already called within a try-catch.
I also added another unit test covering the malformed-metadata scenario, so I don't think we need an additional try-catch.
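
For reference, a minimal sketch of tolerant parsing for the comma-separated metadata value, assuming that blank or whitespace-only entries should simply be dropped. `ViolatingTopicsParser` is a hypothetical name and this is not the code in the PR, which keeps the parsing inside populateThroughputViolatingTopicsMap.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of Brooklin.
final class ViolatingTopicsParser {
  private ViolatingTopicsParser() {
  }

  // Splits on commas, trims each entry, and drops empty entries.
  // A null value (metadata key absent) yields an empty set.
  static Set<String> parse(String commaSeparatedViolatingTopics) {
    if (commaSeparatedViolatingTopics == null) {
      return Collections.emptySet();
    }
    return Arrays.stream(commaSeparatedViolatingTopics.split(","))
        .map(String::trim)
        .filter(topic -> !topic.isEmpty())
        .collect(Collectors.toSet());
  }
}
```

With this shape, inputs such as an empty string, stray commas, or padded names degrade to a smaller (possibly empty) set rather than throwing.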

@@ -2462,6 +2470,13 @@ private void registerGauge(String metricName, Supplier<?> valueSupplier) {
_metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE, metricName)));
}

// registers a new gauge or updates the supplier for the gauge if it already exists
Collaborator

This looks like a generic call. Why is it required? We have other `Gauge` instances that do not have this.

If this reasoning holds, we would have to do the same for all the Gauge instances and move this method to a generic location.

Collaborator Author

I added this new method because I wanted to report changes in the number of violations per datastream, which requires a gauge with dynamic keys (datastream names).

registerGauge registers a new gauge for a new key, but for an existing key it returns the already registered gauge. This function therefore also updates the supplier of an already registered gauge metric.
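
To make the register-or-update behaviour concrete, here is a small standalone sketch. It does not use Brooklin's DynamicMetricsManager; `GaugeRegistrySketch`, `ResettableGauge`, and the metric key used in the usage note are all hypothetical stand-ins.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical registry for illustration; not Brooklin's DynamicMetricsManager.
final class GaugeRegistrySketch {
  // A gauge whose value supplier can be swapped after registration.
  static final class ResettableGauge<T> {
    private volatile Supplier<T> supplier;

    ResettableGauge(Supplier<T> supplier) {
      this.supplier = supplier;
    }

    void setSupplier(Supplier<T> supplier) {
      this.supplier = supplier;
    }

    T getValue() {
      return supplier.get();
    }
  }

  private final Map<String, ResettableGauge<?>> gauges = new ConcurrentHashMap<>();

  // Registers a gauge for a new key; for an existing key, returns the same
  // gauge instance after pointing it at the new supplier.
  @SuppressWarnings("unchecked")
  <T> ResettableGauge<T> registerOrSetGauge(String metricName, Supplier<T> valueSupplier) {
    ResettableGauge<T> gauge = (ResettableGauge<T>) gauges.computeIfAbsent(
        metricName, name -> new ResettableGauge<T>(valueSupplier));
    gauge.setSupplier(valueSupplier);
    return gauge;
  }

  int size() {
    return gauges.size();
  }
}
```

Calling `registerOrSetGauge` twice with the same key (for example a per-datastream key such as `"ds1.numThroughputViolatingTopics"`, an assumed name) leaves one gauge in the registry, and the gauge reports the value from the most recent supplier.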

@@ -2462,6 +2470,13 @@ private void registerGauge(String metricName, Supplier<?> valueSupplier) {
_metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE, metricName)));
}

// registers a new gauge or updates the supplier for the gauge if it already exists
private <T> void registerOrSetGauge(String metricName, Supplier<T> valueSupplier) {
_dynamicMetricsManager.setGauge(_dynamicMetricsManager.registerGauge(MODULE, metricName, valueSupplier),
Collaborator

Will this metric get emitted for a zero value as well?

This can be a concern if it is enabled for the Change capture cluster with many datastreams. It would make sense to have aggregate-level metrics and selectively enable datastream-level metrics, because X datastreams means X gauges.

Collaborator Author

This will only be emitted where the feature for handling bad actors is enabled.
I also updated the logic to handle and register/update a gauge only when this metadata field exists for that datastream.

Comment on lines 412 to 414
reportEventLatencyMetrics(metadata, eventsSourceTimestamp, THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING);
_dynamicMetricsManager.createOrUpdateCounter(MODULE, AGGREGATE, TOTAL_EVENTS_PRODUCED, 1);
_dynamicMetricsManager.createOrUpdateCounter(MODULE, _datastreamTask.getConnectorType(), TOTAL_EVENTS_PRODUCED,
Collaborator

This requires more precise documentation explaining that X events come from throughput-violating topics, Y events come from regular topics, and the total record count is X+Y.
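
One way to picture the accounting described above is the sketch below, where every event bumps the total and exactly one of two buckets. The class and the two per-bucket counter names are hypothetical (the PR separates latency metrics, not necessarily counters), and the string value assigned to `TOTAL_EVENTS_PRODUCED` is assumed.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Illustrative accounting only; not the EventProducer implementation.
final class EventCountSketch {
  static final String TOTAL_EVENTS_PRODUCED = "totalEventsProduced"; // assumed string value
  static final String VIOLATING_EVENTS = "violatingEvents";          // hypothetical bucket (X)
  static final String REGULAR_EVENTS = "regularEvents";              // hypothetical bucket (Y)

  private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();
  private final Set<String> violatingTopics;

  EventCountSketch(Set<String> violatingTopics) {
    this.violatingTopics = violatingTopics;
  }

  // Every event counts toward the total and toward exactly one bucket,
  // so total == violating (X) + regular (Y) at all times.
  void onEventProduced(String topic) {
    increment(violatingTopics.contains(topic) ? VIOLATING_EVENTS : REGULAR_EVENTS);
    increment(TOTAL_EVENTS_PRODUCED);
  }

  long count(String name) {
    LongAdder adder = counters.get(name);
    return adder == null ? 0L : adder.sum();
  }

  private void increment(String name) {
    counters.computeIfAbsent(name, key -> new LongAdder()).increment();
  }
}
```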

@@ -58,6 +58,8 @@ public class EventProducer implements DatastreamEventProducer {

static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs";
static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING = "throughputViolatingEventsSendLatencyMs";
Collaborator

What about the other metrics from lines 67-70? Does the reporting need to be split for those as well?

  private static final String EVENTS_PRODUCED_WITHIN_SLA = "eventsProducedWithinSla";
  private static final String EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA = "eventsProducedWithinAlternateSla";
  

Collaborator Author

For these violations we can skip the EVENTS_PRODUCED_WITHIN_SLA metric, but I updated the code to emit the EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA metric, as it still applies.
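
A minimal sketch of that rule, under the assumption that regular events are checked against both SLAs while events from throughput-violating topics are checked only against the alternate SLA. `SlaMetricSketch`, its method, and the threshold parameters are hypothetical; only the two metric names come from the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not the EventProducer implementation.
final class SlaMetricSketch {
  static final String EVENTS_PRODUCED_WITHIN_SLA = "eventsProducedWithinSla";
  static final String EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA = "eventsProducedWithinAlternateSla";

  private SlaMetricSketch() {
  }

  // Returns the SLA counters that one event should increment.
  static List<String> slaCountersToIncrement(long latencyMs, long slaMs, long alternateSlaMs,
      boolean fromViolatingTopic) {
    List<String> counters = new ArrayList<>();
    // Assumption: the regular SLA counter is skipped for throughput-violating topics.
    if (!fromViolatingTopic && latencyMs <= slaMs) {
      counters.add(EVENTS_PRODUCED_WITHIN_SLA);
    }
    // Assumption: the alternate SLA counter applies to both kinds of events.
    if (latencyMs <= alternateSlaMs) {
      counters.add(EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA);
    }
    return counters;
  }
}
```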

@@ -58,6 +58,8 @@ public class EventProducer implements DatastreamEventProducer {

static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs";
static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs";
Collaborator

Do you need additional metric emission checks?

…handling updates based on the comments to the PR
@shrinandthakkar shrinandthakkar merged commit 892b740 into linkedin:master May 2, 2023