Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka upgrade #881

Merged
merged 7 commits into from
Feb 2, 2022
Merged

Kafka upgrade #881

merged 7 commits into from
Feb 2, 2022

Conversation

srinagaraj
Copy link
Collaborator

@srinagaraj srinagaraj commented Jan 24, 2022

The interface ConsumerRebalanceListener has changed in the 2.4 release, a new method is added onPartitionLost(), if this is not provided the default behavior is to call onPartitionRevoked(). basically on consumer close, its calling revoked partitions causing testValidateFlushlessModeTaskDiesOnRewindFailure to fail. This was not the behavior prior to this version.
Add fix to introduce a flag to hold the failure state and use it to no commit the changes on onPartitionRevoked call.

The Kafka team plans to deprecate open source li-apache-kafka-clients (http://github.com/linkedin/li-apache-kafka-clients) to streamline the client stack. Open source Brooklin currently depends on li-apache-kafka-clients and will need to directly depend on the LinkedIn version of open source Kafka clients (http://github.com/linkedin/kafka) instead before its full deprecation.
-->Auditor, and LargeMessageSegment support is deprecated.

Copy link
Collaborator

@vmaheshw vmaheshw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good, except one comment.

vmaheshw
vmaheshw previously approved these changes Jan 24, 2022
Copy link
Collaborator

@vmaheshw vmaheshw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

build.gradle Show resolved Hide resolved
@@ -766,7 +769,7 @@ protected void updateConsumerAssignment(Collection<TopicPartition> partitions) {
public void onPartitionsRevoked(Collection<TopicPartition> topicPartitions) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaMirrorMakerConnectorTask is overriding this method. You will have to fix this logic there as well. Without that this change is incomplete.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaMirrorMakerConnectorTask is calling super.Same with onPartitionsRevoked. So I am thinking change in AbstractKafkaBasedConnectorTask should take care of it. COuld you pls confirm?

@OverRide
public void onPartitionsRevoked(Collection partitions) {
super.onPartitionsRevoked(partitions);
_topicManager.onPartitionsRevoked(partitions);
}
@OverRide
public void run() {
if (_enablePartitionAssignment) {
try {
LOG.info("Trying to acquire the lock on datastreamTask: {}", _datastreamTask);
_datastreamTask.acquire(LOCK_ACQUIRE_TIMEOUT);
} catch (DatastreamRuntimeException ex) {
LOG.error(String.format("Failed to acquire lock for datastreamTask %s", _datastreamTask), ex);
_dynamicMetricsManager.createOrUpdateMeter(generateMetricsPrefix(_connectorName, CLASS_NAME), _datastreamName,
TASK_LOCK_ACQUIRE_ERROR_RATE, 1);
throw ex;
}
}
super.run();
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the current code state, yes, it will take care of it. But, it also has capability to create bigger problems in future. By not handling in the override class and relying that the logic is present in super class, in the future if someone changed the override class and remove the super call, it will expose this problem. Since the current logic is very much dependent on how the code is written and it can change in future, it is critical to handle the scenario for easier debugging in future.

There are two ways to look at this:
a. TopicManager.onPartitionsRevoked will still be called which can add additional processing time based on what the implementation of that interface is. If it is decided that the subsequent calls should not be made, then additional checks are required in main class as well.
b. If it is not critical and we want to let the future users know about it, it is better to add a comment in KafkaMirrorMakerConnectorTask to not remove super.onPartitionsRevoked call or refactor the code to take care of the new kafka client scenario.

I am very sure this knowledge will get lost in the future if there are not enough hints in the code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_topicManager.OnPartitionsRevoked is pretty much a noOp today. May be we could revisit this overridden code onPartitionsRevoked in KafkaMirrorMakerConnectorTask later if needed completly.
For now, we can add comments to explicitly state the changes in upgraded kafka even in the overridden class as mentioned in the base class for clarity.
I will add this into a separate PR following this. Thanks!

@atoomula atoomula merged commit 4d867ef into linkedin:master Feb 2, 2022
vmaheshw pushed a commit to vmaheshw/brooklin that referenced this pull request Mar 1, 2022
* Kafka upgrade

* Kafka upgrade

* Removing temp version

* Addressing comments: removing flag update after close to prevent threads misbehavior

* Removing dependency on li-apache-kafka-clients
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants