-
Notifications
You must be signed in to change notification settings - Fork 137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle leaking TP exception in handleAssignmentChange #845
Handle leaking TP exception in handleAssignmentChange #845
Conversation
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
c9c58ab
to
e9006d2
Compare
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
...-server/src/main/java/com/linkedin/datastream/server/DummyTransportProviderAdminFactory.java
Show resolved
Hide resolved
datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
Show resolved
Hide resolved
e9006d2
to
b862840
Compare
datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Show resolved
Hide resolved
b862840
to
77a0c31
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some code refactoring comments. One thought related to load on zookeeper server after this change.
} | ||
|
||
/** | ||
* Constructor for DummyTransportProviderAdminFactory which can optionally throw exception on every send call | ||
* @param throwOnSend whether or not to throw an exception on send calls | ||
*/ | ||
public DummyTransportProviderAdminFactory(boolean throwOnSend) { | ||
public DummyTransportProviderAdminFactory(boolean throwOnSend, boolean failTransportProvider) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a. change failTransportProvider
to failTransportProviderOnce
b. update javadocs.
@@ -583,6 +584,8 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx | |||
} | |||
}); | |||
|
|||
int totalTasks = currentAssignment.values().stream().mapToInt(List::size).sum(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can extract method out of this logic.
if ((totalTasks - submittedTasks) > 0) { | ||
_log.warn("Failed to submit {} tasks from currentAssignment. Queueing onAssignmentChange event again", totalTasks - submittedTasks); | ||
// Update the metric and queue the event only once for all the tasks that failed to be initialized | ||
if (isDatastreamUpdate) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Qn: Why do you need a different code logic for this v/s TimeoutException, when the expectation is the same in both the cases.
_eventQueue.put(CoordinatorEvent.createHandleDatastreamChangeEvent());
} else {
_eventQueue.put(CoordinatorEvent.createHandleAssignmentChangeEvent());
}
throw e;
``` There is no direct exception known in this path, but you can create DatastreamRuntimeException(). If you extract the highlighted code into a method and call it, it may work for you.
Logically it is the same code, but different way of achieving the same thing.
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Show resolved
Hide resolved
77b03cf
to
30e908a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some cosmetic changes and make the retry field configurable.
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
Outdated
Show resolved
Hide resolved
9322769
to
5e5e49b
Compare
During handleAssignmentChange, while initializing task we were leaking a possible exception while creating TransportProvider object. To address it we catch the exception for the failed task and mark that task failed and prevent it from being added to assigned tasks and also queue another handle assignment change event. Also introducing a maximum retry count for onAssignementChange in case of errors.
5e5e49b
to
8724e19
Compare
During handleAssignmentChange, while initializing task we were leaking a possible exception while creating TransportProvider object. To address it we catch the exception for the failed task and mark that task failed and prevent it from being added to assigned tasks and also queue another handle assignment change event. Also introducing a maximum retry count for onAssignementChange in case of errors.
During handleAssignmentChange, while initializing task we were leaking a possible
exception while creating TransportProvider object. This patch fixes it by issuing
a retry on failure.
Important: DO NOT REPORT SECURITY ISSUES DIRECTLY ON GITHUB.
For reporting security issues and contributing security fixes,
please, email security@linkedin.com instead, as described in
the contribution guidelines.
Please, take a minute to review the contribution guidelines at:
https://github.com/linkedin/Brooklin/blob/master/CONTRIBUTING.md