Clear the CallbackStatus entry from the map in FlushlessEventProducerHandler #843
Conversation
Pull latest
LGTM. Nit: update the comment in the test.
.../com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnectorTask.java
/**
 * Clear the source-partition entry from the _callbackStatusMap
 */
public void clear(String source, int partition) {
  _callbackStatusMap.remove(new SourcePartition(source, partition));
}
Just for clarification: will this result in the correct key getting removed from the map? Does SourcePartition override hashCode()?
SourcePartition extends Pair underneath and relies on the inherited hashCode() implementation. SourcePartition has been in use as the key in this HashMap for a long time.
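A HashMap looks entries up by the key's hashCode() and equals(), so removing with a freshly constructed SourcePartition only works if the key type compares by value. Assuming Pair here is org.apache.commons.lang3.tuple.Pair, it does define value-based equals()/hashCode(), unlike the identity-based Object defaults. A minimal, self-contained sketch of that requirement, with a hypothetical ValueKey standing in for SourcePartition:

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class KeyRemovalDemo {
  // Value-based key: two instances with equal fields are interchangeable,
  // which is what lets remove(new ValueKey(...)) find the existing entry.
  static final class ValueKey {
    final String source;
    final int partition;

    ValueKey(String source, int partition) {
      this.source = source;
      this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof ValueKey)) {
        return false;
      }
      ValueKey other = (ValueKey) o;
      return partition == other.partition && Objects.equals(source, other.source);
    }

    @Override
    public int hashCode() {
      return Objects.hash(source, partition);
    }
  }

  public static void main(String[] args) {
    Map<ValueKey, String> map = new HashMap<>();
    map.put(new ValueKey("topicA", 0), "callback-status");
    map.remove(new ValueKey("topicA", 0)); // found via equals()/hashCode()
    System.out.println(map.isEmpty());     // prints: true
  }
}

If the key type instead inherited Object's identity-based hashCode(), the remove() call above would silently fail to find the entry.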
lgtm!
Clear the CallbackStatus entry from the map in FlushlessEventProducerHandler (linkedin#843)
ISSUE:
Stuck partitions were reported in prod-lor1 mm.one twice in the last week. During both instances, either a bad Kafka broker was abruptly removed or there were many under-replicated partitions (URPs).
Root cause:
a. The initial suspicion was that Kafka was failing to ACK some of the send() calls, resulting in stuck partitions.
b. The logs contained error lines that should never appear:
2021/07/04 13:34:58.325 ERROR [FlushlessEventProducerHandler] [kafka-producer-network-thread | DatastreamLiKafkaProducer] [brooklin-service] [] Internal error: checkpoints should progress in increasing order. Resolved checkpoint as 966284416 which is less than current checkpoint of 966284432
2021/07/04 13:34:58.325 ERROR [FlushlessEventProducerHandler] [kafka-producer-network-thread | DatastreamLiKafkaProducer] [brooklin-service] [] Internal error: checkpoints should progress in increasing order. Resolved checkpoint as 966284363 which is less than current checkpoint of 966284416
These logs mean that an ACK was received for an offset smaller than the in-memory checkpoint in the CallbackStatus. That indicates either out-of-order offsets coming from Kafka or an in-flight message set that, for some reason, contains out-of-order messages.
So the stuck-partition issue happens when there are two consecutive {send() failure, seekToLastCheckpoint()} cycles without the task thread restarting. This is the main root cause behind the stuck partitions seen during shutdown, when brokers were being removed on the Kafka side.
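For illustration, here is a simplified sketch of the invariant behind those ERROR lines. The names and the resolution logic below are hypothetical simplifications, not the actual Brooklin implementation: acks may arrive out of order, but the resolved checkpoint must only move forward, and stale entries surviving a rewind break exactly that assumption.

import java.util.concurrent.ConcurrentSkipListSet;

class CallbackStatusSketch {
  private final ConcurrentSkipListSet<Long> _inFlight = new ConcurrentSkipListSet<>();
  private final ConcurrentSkipListSet<Long> _acked = new ConcurrentSkipListSet<>();
  private long _currentCheckpoint = -1;

  // Record an offset as sent but not yet acknowledged.
  synchronized void register(long offset) {
    _inFlight.add(offset);
  }

  // The checkpoint only advances through the contiguous run of acked offsets
  // below the lowest offset that is still in flight.
  synchronized void ack(long offset) {
    _inFlight.remove(offset);
    _acked.add(offset);
    long resolved = _currentCheckpoint;
    while (!_acked.isEmpty() && (_inFlight.isEmpty() || _acked.first() < _inFlight.first())) {
      resolved = _acked.pollFirst();
    }
    if (resolved < _currentCheckpoint) {
      // The state the stuck partitions were in: entries left over from before a
      // rewind make the resolved checkpoint appear to move backwards.
      System.err.printf("Internal error: checkpoints should progress in increasing order."
          + " Resolved checkpoint as %d which is less than current checkpoint of %d%n",
          resolved, _currentCheckpoint);
      return;
    }
    _currentCheckpoint = resolved;
  }
}

Two consecutive send-failure/seekToLastCheckpoint() cycles leave exactly such leftover entries behind, which is how the partitions ended up stuck.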
Solution:
To fix this: the ordering of the in-flight message set is extremely important in this algorithm, and because the older state was not cleared after a rewind, the handler ended up in undesired transition states. Two options were considered:
Option 1: Clear the CallbackStatus entry for any topic partition that is rewound to an older checkpoint (sketched below).
Option 2: Use a different data structure for the in-flight message set that maintains ordering and also rejects duplicates. A priority queue can take care of ordering but would have to be extended to avoid adding duplicates.
For more deterministic behavior, my recommendation is Option 1.
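A minimal sketch of Option 1. The method and field names here are hypothetical; only clear() mirrors the method added in this PR, and the map key is simplified to a String instead of SourcePartition:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RewindHandlerSketch {
  // Stands in for FlushlessEventProducerHandler's _callbackStatusMap; the value
  // type is elided since only the keying and removal matter here.
  private final Map<String, Object> _callbackStatusMap = new ConcurrentHashMap<>();

  private static String key(String source, int partition) {
    return source + "-" + partition; // simplified stand-in for SourcePartition
  }

  // Simplified form of the clear() method introduced by this PR.
  void clear(String source, int partition) {
    _callbackStatusMap.remove(key(source, partition));
  }

  // Hypothetical failure path: rewind first, then drop the stale entry so the
  // next send() starts from a fresh, correctly ordered in-flight set.
  void onSendFailure(String source, int partition) {
    seekToLastCheckpoint(source, partition);
    clear(source, partition);
  }

  private void seekToLastCheckpoint(String source, int partition) {
    // Rewind the consumer to the last known-good checkpoint (elided).
  }
}

Clearing on the rewind path keeps the fix local to the failure handling and leaves the in-flight data structure untouched, which is what makes Option 1 the more deterministic choice.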