Fix Stopping Logic and Maintain Stopping Latch Counter #877
.../src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
...java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
@shrinandthakkar Can you please drive this to completion?
LGTM
.../src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
      _logger.info("{} stopped", _taskName);
    }
  }

  protected void countDownStoppedLatch() {
This is a weird API to expose to subclasses. How would an implementer know when it is safe to invoke this?
@@ -352,9 +352,14 @@ public void run() {
        LOG.info("Trying to acquire the lock on datastreamTask: {}", _datastreamTask);
        _datastreamTask.acquire(LOCK_ACQUIRE_TIMEOUT);
      } catch (DatastreamRuntimeException ex) {
        // setting _stoppedLatch count to 0 since the lock couldn't be acquired,
        // as a non-zero stoppedLatch value won't let the task to be stopped.
        countDownStoppedLatch();
The exception is being re-thrown anyway, so why not handle it in the super class? I don't like how tightly the super and sub classes are being coupled here.
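One way to read this suggestion is as a template-method arrangement in which the superclass owns the latch and a `finally` block counts it down. The sketch below is only an illustration under assumptions: `LatchOwningTask`, `doRun()`, and `awaitStop()` are hypothetical stand-ins, not the real Brooklin classes, and it assumes the subclass logic that currently overrides `run()` could be moved into a hook.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for AbstractKafkaBasedConnectorTask, not the real Brooklin class.
abstract class LatchOwningTask implements Runnable {
    private final CountDownLatch stoppedLatch = new CountDownLatch(1);

    // final, so a subclass cannot override run() and skip the countdown.
    @Override
    public final void run() {
        try {
            doRun(); // subclass work, e.g. acquiring the task lock and polling
        } finally {
            stoppedLatch.countDown(); // runs on normal exit and when doRun() throws
        }
    }

    // Hypothetical hook that replaces the overridden run() in subclasses.
    protected abstract void doRun();

    // Returns true if the task finished within the timeout.
    boolean awaitStop(long timeout, TimeUnit unit) throws InterruptedException {
        return stoppedLatch.await(timeout, unit);
    }
}

class SuperclassLatchDemo {
    // Runs a task whose doRun() fails, then reports whether the latch was released.
    static boolean latchReleasedAfterFailure() {
        LatchOwningTask task = new LatchOwningTask() {
            @Override
            protected void doRun() {
                throw new IllegalStateException("simulated lock acquisition failure");
            }
        };
        Thread thread = new Thread(task);
        thread.setUncaughtExceptionHandler((t, e) -> { }); // keep the demo output quiet
        thread.start();
        try {
            thread.join();
            return task.awaitStop(0, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("latch released: " + latchReleasedAfterFailure());
    }
}
```

With this shape the subclass never touches the latch, so there is no protected countdown method to call at the wrong time or to forget.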
@vmaheshw @surajkn @ryannedolan
Let me take this up in a separate PR. To reduce the coupling here, could we do something like this in AbstractKafkaConnector (from where the task threads are started)?
private ConnectorTaskEntry createKafkaConnectorTask(DatastreamTask task) {
  _logger.info("Creating connector task for datastream task {}.", task);
  AbstractKafkaBasedConnectorTask connectorTask = createKafkaBasedConnectorTask(task);
  Thread taskThread = createTaskThread(connectorTask);
  taskThread.start();
  // task cleanup thread cleans up the task resources like stopped latch counts post shutdown of the task
  Thread taskCleanUpThread = new Thread(() -> {
    try {
      taskThread.join();
    } catch (InterruptedException exception) {
      _logger.error(String.format("Got interrupted exception while waiting for the completion of task : %s ", connectorTask), exception);
    } finally {
      connectorTask.postShutdownCleanUp();
    }
  });
  taskCleanUpThread.start();
  return new ConnectorTaskEntry(connectorTask, taskThread);
}
The postShutdownCleanUp method here would take care of counting the latch down, so we don't have to handle it in the run() functions of either the derived or base class implementations.
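The proposal above depends on Brooklin classes, so here is a self-contained sketch of the same join-then-clean-up pattern that can be run in isolation. `DemoTask`, `startWithCleanup`, and `awaitStop` are hypothetical names used only for illustration; only `postShutdownCleanUp` comes from the comment above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class CleanupThreadDemo {
    // Hypothetical stand-in for a connector task; only the latch handling is modeled.
    static class DemoTask implements Runnable {
        private final CountDownLatch stoppedLatch = new CountDownLatch(1);

        @Override
        public void run() {
            // Simulates run() exiting early without touching the latch.
            throw new IllegalStateException("simulated lock acquisition failure");
        }

        void postShutdownCleanUp() {
            stoppedLatch.countDown();
        }

        boolean awaitStop(long timeoutMs) throws InterruptedException {
            return stoppedLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    // Starts the task thread plus a second thread that cleans up once the task thread ends.
    static void startWithCleanup(DemoTask task) {
        Thread taskThread = new Thread(task, "demo-task");
        taskThread.setUncaughtExceptionHandler((t, e) -> { }); // keep the demo output quiet
        taskThread.start();
        Thread cleanupThread = new Thread(() -> {
            try {
                taskThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                task.postShutdownCleanUp();
            }
        }, "demo-task-cleanup");
        cleanupThread.start();
    }

    // Returns true if the latch is released even though run() threw.
    static boolean stopsAfterFailedRun() {
        DemoTask task = new DemoTask();
        startWithCleanup(task);
        try {
            return task.awaitStop(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped after failed run: " + stopsAfterFailedRun());
    }
}
```

One thing to watch in this pattern: if the cleanup thread is interrupted while joining, the `finally` block still counts the latch down, possibly while the task thread is still running.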
Basically, it will create an extra thread for each task thread. I think it should work, but at the cost of doubling the number of threads per task.
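If the extra thread per task is a concern, one possible alternative (not something proposed in this thread) is to wrap the task's `Runnable` so the cleanup runs on the task's own thread when it exits. The sketch below assumes the cleanup is cheap and safe to run on the task thread; `withCleanup` is a hypothetical helper.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class WrappedTaskDemo {
    // Wraps a task so the cleanup runs on the task's own thread when it exits.
    static Runnable withCleanup(Runnable task, Runnable cleanup) {
        return () -> {
            try {
                task.run();
            } finally {
                cleanup.run(); // runs on normal exit and when task.run() throws
            }
        };
    }

    // Returns true if the latch is released without a separate cleanup thread.
    static boolean latchReleasedOnSameThread() {
        CountDownLatch stoppedLatch = new CountDownLatch(1);
        Runnable failingTask = () -> {
            throw new IllegalStateException("simulated lock acquisition failure");
        };
        Thread thread = new Thread(withCleanup(failingTask, stoppedLatch::countDown));
        thread.setUncaughtExceptionHandler((t, e) -> { }); // keep the demo output quiet
        thread.start();
        try {
            thread.join();
            return stoppedLatch.await(0, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("latch released: " + latchReleasedOnSameThread());
    }
}
```

The trade-off is that the connector, rather than the task class, decides what cleanup means, which keeps the task hierarchy unaware of the latch but moves that knowledge into the code that creates the thread.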
      _logger.info("{} stopped", _taskName);
    }
  }

  protected void countDownStoppedLatch() {
@vmaheshw I remember you had expressed concern over this way of doing the count down. IIRC your concern was that any subclass of this can forget to call countDownStoppedLatch (although in this patch Shrinand added a call to it in KafkaMirrorMakerConnectorTask). So I just wanted to check whether that was considered and whether it was concluded that this is the most reasonable way of doing it.
cc @shrinandthakkar
Yes, I was not in support of doing it this way, but I also did not have a better idea. So we decided to go with it until we have a better one.
Fixes the stopping logic for tasks by not calling stop on tasks whose threads are already dead.
Also maintains the stopping latch counter regardless of whether acquiring the task lock succeeds or fails; otherwise, this could lead to a state in which multiple instances work on the same task concurrently.
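To make the latch behavior concrete, here is a minimal sketch, under assumptions, of why a stop request cannot complete when `run()` exits on a lock failure without counting the latch down. `awaitStop` and `stopSucceeds` are hypothetical helpers and the 100 ms timeout is arbitrary; the real task classes are not modeled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StoppedLatchDemo {
    // Hypothetical awaitStop: true only if the latch reached zero within the timeout.
    static boolean awaitStop(CountDownLatch stoppedLatch, long timeoutMs) {
        try {
            return stoppedLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Simulates run() exiting on a lock failure; the flag controls whether
    // the latch is counted down on that failure path.
    static boolean stopSucceeds(boolean countDownOnFailure) {
        CountDownLatch stoppedLatch = new CountDownLatch(1);
        if (countDownOnFailure) {
            stoppedLatch.countDown();
        }
        return awaitStop(stoppedLatch, 100);
    }

    public static void main(String[] args) {
        System.out.println("without countdown: " + stopSucceeds(false));
        System.out.println("with countdown: " + stopSucceeds(true));
    }
}
```

Without the countdown the wait times out and reports `false`, which corresponds to a task that looks as if it never stopped; with the countdown the wait returns `true` immediately.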