Handling Metrics and SLA Reporting for Throughput Violating Topics via Datastream Update API Part 1 #928
Conversation
(t) -> getThroughputViolatingTopics(t.getDatastreams()) :
(t) -> new HashSet<>();

EventProducer producer =
From @vmaheshw
Will it make sense to simplify this at Datastream level rather than DatastreamTask level?
For example, in digest, there is only one datastream, and this function will result in 10 copies of
the same datastream.
Also, hashing based on datastream name will be cheaper and more efficient.
Instead of moving this to the Datastream level, I refactored so that this callback is initialized only once in the coordinator and then passed to each EventProducer init.
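A minimal sketch of the shared-callback refactor described above (the class and method names here are illustrative, not the actual Brooklin APIs): the coordinator builds the violating-topics callback once and hands the same instance to every producer, so per-task copies of the computation are avoided.

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of a coordinator-like owner of the callback.
class CoordinatorSketch {
    private volatile Set<String> violatingTopics = Collections.emptySet();

    // Built once; every producer receives this same instance.
    final Supplier<Set<String>> violatingTopicsCallback = () -> violatingTopics;

    void updateViolatingTopics(Set<String> topics) {
        violatingTopics = Collections.unmodifiableSet(topics);
    }
}

// Hypothetical sketch of an EventProducer-like consumer of the callback.
class ProducerSketch {
    private final Supplier<Set<String>> violatingTopicsCallback;

    ProducerSketch(Supplier<Set<String>> callback) {
        this.violatingTopicsCallback = callback;
    }

    boolean isViolating(String topic) {
        return violatingTopicsCallback.get().contains(topic);
    }
}
```

Because every producer reads through the same supplier, a single update in the coordinator is immediately visible to all producers without recomputation per task.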
private final ReadWriteLock _throughputViolatingTopicsMapReadWriteLock = new ReentrantReadWriteLock();
private final Lock _throughputViolatingTopicsMapWriteLock = _throughputViolatingTopicsMapReadWriteLock.writeLock();
private final Lock _throughputViolatingTopicsMapReadLock = _throughputViolatingTopicsMapReadWriteLock.readLock();
From @vmaheshw
Do we need to go to low-level locks for this? Can we not rely on ConcurrentHashMap?
Violation calculation does not have to be precise down to the second.
I would prefer not to use ConcurrentHashMap: while a "replace-all" is being performed on this map, the violating topics' metrics and SLAs could still be reported against a partially updated view. To keep things consistent, I went with this approach.
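A hedged sketch of the locking pattern being discussed (the lock field names follow the snippet above; the map shape and method names are assumptions): the "replace-all" runs under the write lock, so readers observe either the complete old map or the complete new map, never a half-replaced one.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ViolatingTopicsStore {
    private final ReadWriteLock _throughputViolatingTopicsMapReadWriteLock = new ReentrantReadWriteLock();
    private final Lock _throughputViolatingTopicsMapWriteLock = _throughputViolatingTopicsMapReadWriteLock.writeLock();
    private final Lock _throughputViolatingTopicsMapReadLock = _throughputViolatingTopicsMapReadWriteLock.readLock();

    // Assumed shape: datastream name -> set of throughput-violating topics.
    private final Map<String, Set<String>> _violatingTopics = new HashMap<>();

    // "Replace-all" under the write lock: readers never see a partially updated map.
    void replaceAll(Map<String, Set<String>> fresh) {
        _throughputViolatingTopicsMapWriteLock.lock();
        try {
            _violatingTopics.clear();
            _violatingTopics.putAll(fresh);
        } finally {
            _throughputViolatingTopicsMapWriteLock.unlock();
        }
    }

    boolean isViolating(String datastream, String topic) {
        _throughputViolatingTopicsMapReadLock.lock();
        try {
            return _violatingTopics.getOrDefault(datastream, Collections.emptySet()).contains(topic);
        } finally {
            _throughputViolatingTopicsMapReadLock.unlock();
        }
    }
}
```

The simpler alternative suggested in the thread would be swapping a single immutable map reference, which gives the same all-or-nothing visibility without explicit locks.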
I recommend simplicity, especially since this is not on a critical path.
I am okay with the READ-WRITE lock if the other approver is also fine.
Please address the validation-check comment.
private static final Double ONE_MEBIBYTE = (double) (1024 * 1024);
private static final Double ZNODE_BLOB_SIZE_LIMIT = ONE_MEBIBYTE;
Do you really need 2 variables?
The purposes of these variables are slightly different:
- ZNODE_BLOB_SIZE_LIMIT --> validation of the data size per znode against the znode limit (= 1 MB)
- ONE_MEBIBYTE --> converting bytes to MBs to get the encoded data size in MBs
Ideally, I could use a single variable, but I kept both to make the two different operations on the same value easier to understand.
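For illustration, the two roles of the same value (a bytes-to-MiB conversion divisor and the 1 MiB znode limit) can be sketched as follows. This is an assumed helper, not the actual implementation; here the limit is expressed in MiB so the comparison units line up.

```java
import java.nio.charset.StandardCharsets;

class ZnodeSizeCheck {
    private static final double ONE_MEBIBYTE = 1024 * 1024;  // bytes-per-MiB divisor
    private static final double ZNODE_BLOB_SIZE_LIMIT = 1.0; // znode limit, in MiB

    // Assumed helper: size of the serialized blob, converted to MiB.
    static double getBlobSizeInMBs(String json) {
        return json.getBytes(StandardCharsets.UTF_8).length / ONE_MEBIBYTE;
    }

    // Validation: the encoded datastream must fit in a single znode.
    static boolean fitsInZnode(String json) {
        return getBlobSizeInMBs(json) <= ZNODE_BLOB_SIZE_LIMIT;
    }
}
```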
@@ -127,6 +128,12 @@ public void updateDatastream(String key, Datastream datastream, boolean notifyLe
    throw new DatastreamException("Datastream does not exists, can not be updated: " + key);
  }

  // As this limit is ZK specific, adding this validation check specifically in ZookeeperBackedDatastreamStore.
  double datastreamBlobSizeInMBs = getBlobSizeInMBs(DatastreamUtils.toJSON(datastream));
  Validate.isTrue(datastreamBlobSizeInMBs <= ZNODE_BLOB_SIZE_LIMIT,
The validation check is tricky, especially in the case of a Programmatic update. This will block the datastream update until the logic in the caller is fixed. Can you think of a less disruptive way?
What are your concerns @vmaheshw? I understand that we need to add similar validation on the client side as well. If a client request breaches the ZK node size limit, it will eventually fail the update anyway; the failure will just surface in the ZkAdapter/ZkClient layer instead. This, to me, is a more explicit and descriptive way to fail.
I initially thought that it could impact the datastream restart, but I was wrong.
However, in the scenario where the blob grows beyond 1MB because of the violation list, we will not be able to disable this at the server level and will have to rely on the external service to disable this feature. Until then, any other update/allowlisting will not go through.
@vmaheshw, understood your concern.
I have added steps in both the update and create paths to honor the throughputViolatingTopic metadata only if the corresponding config is enabled on our server.
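A sketch of the gating described above (the config flag, metadata key, and method names are hypothetical): on both the create and update paths, the throughput-violating-topics metadata is dropped unless the server-side feature flag is enabled, so the znode cannot grow from the violation list while the feature is off.

```java
import java.util.HashMap;
import java.util.Map;

class MetadataGate {
    // Hypothetical metadata key name.
    static final String THROUGHPUT_VIOLATING_TOPICS_KEY = "throughputViolatingTopics";

    // Hypothetical server-side feature flag.
    private final boolean violatingTopicsFeatureEnabled;

    MetadataGate(boolean enabled) {
        this.violatingTopicsFeatureEnabled = enabled;
    }

    // Applied on both create and update paths: strip the metadata when the
    // feature is disabled, leaving all other metadata untouched.
    Map<String, String> sanitizeMetadata(Map<String, String> metadata) {
        Map<String, String> result = new HashMap<>(metadata);
        if (!violatingTopicsFeatureEnabled) {
            result.remove(THROUGHPUT_VIOLATING_TOPICS_KEY);
        }
        return result;
    }
}
```

With this gate in place, disabling the flag lets subsequent updates shrink the stored blob back under the znode limit even if a large violation list was previously written.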
Left a few comments. Overall looks good.
Going forward, please retain the commit history in your PRs. Don't rebase/squash those commits locally. Having a commit history retained in the PR helps reviewers see changes over time.
* Releasing a new version and minor improvements
* Using an immutable empty set & keeping SNAPSHOT so as not to accidentally release a version

Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn2.linkedin.biz>
The EventProducer of every DatastreamTask reports SLA and latency metrics for every datastream record. But when a topic (in at least one of its partitions) has higher throughput than the configured thresholds, it introduces latency and SLA misses in the mirroring pipeline.
This pull request is the first part of changes to handle the metrics and SLA reporting of throughput-violating topics via the datastream update API. It introduces the following changes:
Part 2 of this series would take care of: