
Clear the CallbackStatus entry from the map in FlushlessEventProducerHandler #843

Merged 14 commits into linkedin:master on Jul 27, 2021

Conversation

vmaheshw
Collaborator

ISSUE:
Stuck partitions were reported twice in prod-lor1 mm.one in the last week. In both instances, either a bad Kafka broker was abruptly removed or there were many URPs (under-replicated partitions).

Root cause:

a. The initial suspicion was that Kafka failed to ack() some of the send() calls, resulting in stuck partitions.
b. The logs contained errors that should never appear:
2021/07/04 13:34:58.325 ERROR [FlushlessEventProducerHandler] [kafka-producer-network-thread | DatastreamLiKafkaProducer] [brooklin-service] [] Internal error: checkpoints should progress in increasing order. Resolved checkpoint as 966284416 which is less than current checkpoint of 966284432
2021/07/04 13:34:58.325 ERROR [FlushlessEventProducerHandler] [kafka-producer-network-thread | DatastreamLiKafkaProducer] [brooklin-service] [] Internal error: checkpoints should progress in increasing order. Resolved checkpoint as 966284363 which is less than current checkpoint of 966284416

This log means the ACK received was for an offset smaller than the in-memory checkpoint in callbackStatus. Either Kafka delivered out-of-order offsets, or the in-flight message set contained out-of-order messages for some reason.

  1. What actually happened: we did not clear the callbackStatus object for the topic partition that was rewound to the last checkpoint, so it kept all the older in-memory state and continued working on top of it.

 

| Action | In-Flight Message Set | ACK Queue | Checkpoint | Poll() Offset | Notes |
| --- | --- | --- | --- | --- | --- |
| (initial state) | {3, 5, 6, 7} | {4} | 3 | 8 | Poll() offset is the next offset the consumer will poll |
| Send failure for 5 | {3, 5, 6, 7} | {4} | 3 | 3 | Seek to last checkpoint |
| Send 3, 4, 5, 6, 7 | {3, 5, 6, 7, 4} | {4} | 3 | 8 | Add offsets to the in-flight set; previously acked 4 lands at the tail |
| Ack for 3 | {5, 6, 7, 4} | {3, 4} → {} | 5 | 8 | Move the checkpoint to 5, since all ACKs are smaller than the first in-flight message |
| Send failure for 4 | {5, 6, 7, 4} | {} | 5 | 5 | Seek to last checkpoint |
| Send 5, 6, 7, 8 | {5, 6, 7, 4, 8} | {} | 5 | 9 | Add sent offsets to the in-flight set |
| Ack 7 | {5, 6, 4, 8} | {7} | 5 | 9 | Add 7 to the ACK queue, since 7 > 5 (first in-flight message) |
| Ack 6 | {5, 4, 8} | {6, 7} | 5 | 9 | Add 6 to the ACK queue, since 6 > 5 (first in-flight message) |
| Ack 5 | {4, 8} | {5, 6, 7} | 5 | 9 | Add 5 to the ACK queue, since 5 > 4 (first in-flight message) |
| | | | | | The checkpoint remains stuck because 4 was never sent again |

 

 

So this stuck-partition issue occurs when there are two consecutive {send() failure, seekToLastCheckpoint()} cycles without the task thread restarting. This is the main root cause of the stuck partitions seen during shutdown, when brokers were being removed on the Kafka side.
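The transitions in the table can be reproduced with a small model. This is a simplified sketch, not the actual Brooklin CallbackStatus class: it assumes an insertion-ordered in-flight set, a sorted ack set, and a checkpoint that only advances while every pending ack is below the oldest in-flight offset.

```java
import java.util.LinkedHashSet;
import java.util.TreeSet;

// Simplified model of the per-partition state (not the actual Brooklin
// CallbackStatus class): insertion-ordered in-flight set, sorted ack set,
// and a checkpoint that only advances while every pending ack is below
// the OLDEST in-flight offset -- oldest by insertion order, not by value.
class CheckpointModel {
    private final LinkedHashSet<Long> inFlight = new LinkedHashSet<>();
    private final TreeSet<Long> acked = new TreeSet<>();
    private long checkpoint = -1;

    void send(long offset) {
        // LinkedHashSet ignores duplicate adds but keeps the ORIGINAL
        // position, so a re-sent offset that was acked earlier lands at
        // the tail -- the out-of-order state from the table.
        inFlight.add(offset);
    }

    void ack(long offset) {
        inFlight.remove(offset);
        acked.add(offset);
        if (!inFlight.isEmpty() && acked.last() < inFlight.iterator().next()) {
            checkpoint = acked.last() + 1;  // next offset to resume from
            acked.clear();
        }
    }

    long checkpoint() { return checkpoint; }
}
```

Replaying the table's two failure/seek cycles against this model leaves the checkpoint stuck at 5, because the stale offset 4 sits at the head of the in-flight set and is never acked.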

 

Solution:

The ordering of the in-flight message set is critical in this algorithm, and because the older state was not cleared, it produced undesired transient states.

Option 1: Clear the callbackStatus entry for any topic partition that is rewound to an older checkpoint.
Option 2: Use a different data structure that maintains ordering and removes duplicates in the in-flight message set. A priority queue handles the ordering, but would have to be extended to reject duplicate additions.

For more deterministic behavior, my recommendation is Option 1.
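Option 1 can be sketched as follows. The class and helper names here are illustrative stand-ins (only `clear` and `_callbackStatusMap` come from the PR itself); `SourcePartition` is reduced to a minimal value-based key:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of Option 1 (illustrative, not the exact Brooklin code): on rewind,
// remove the partition's CallbackStatus so no stale in-flight/ack state
// survives the seekToLastCheckpoint().
class FlushlessHandlerSketch {
    // Minimal stand-in for Brooklin's SourcePartition key: value-based
    // equals/hashCode so remove() finds the entry for a freshly built key.
    static final class SourcePartition {
        final String source; final int partition;
        SourcePartition(String source, int partition) {
            this.source = source;
            this.partition = partition;
        }
        @Override public boolean equals(Object o) {
            return o instanceof SourcePartition
                && ((SourcePartition) o).source.equals(source)
                && ((SourcePartition) o).partition == partition;
        }
        @Override public int hashCode() { return Objects.hash(source, partition); }
    }

    static final class CallbackStatus { /* in-flight set, ack set, checkpoint */ }

    private final Map<SourcePartition, CallbackStatus> _callbackStatusMap =
        new ConcurrentHashMap<>();

    void recordSend(String source, int partition) {
        _callbackStatusMap.computeIfAbsent(
            new SourcePartition(source, partition), k -> new CallbackStatus());
    }

    /** Clear the source-partition entry from the _callbackStatusMap. */
    public void clear(String source, int partition) {
        _callbackStatusMap.remove(new SourcePartition(source, partition));
    }

    boolean hasState(String source, int partition) {
        return _callbackStatusMap.containsKey(new SourcePartition(source, partition));
    }
}
```

The next send() after the rewind then starts from a fresh CallbackStatus, so the in-flight ordering matches the re-sent offsets exactly.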

jzakaryan
jzakaryan previously approved these changes Jul 26, 2021
Collaborator

@jzakaryan jzakaryan left a comment

LGTM. Nit: update the comment in the test.

```java
/**
 * Clear the source-partition entry from the _callbackStatusMap
 */
public void clear(String source, int partition) {
  _callbackStatusMap.remove(new SourcePartition(source, partition));
}
```
Collaborator
Just for clarification: will this result in the correct key getting removed from the map? Does SourcePartition override hashCode()?

Collaborator Author

SourcePartition extends Pair underneath and uses the default hashCode() at the Object level. SourcePartition has been in use as the key in this HashMap for a long time.
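For context on why the question matters: HashMap.remove() with a freshly constructed key only finds the entry if the key type defines value-based equals()/hashCode(). A hypothetical contrast (not the real SourcePartition):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical key classes illustrating the reviewer's concern:
// remove(new Key(...)) only works with value-based equals()/hashCode().
class KeyDemo {
    // Identity-based key: inherits Object.equals/hashCode, so two
    // instances with the same fields are NOT equal.
    static final class IdentityKey {
        final String source; final int partition;
        IdentityKey(String s, int p) { source = s; partition = p; }
    }

    // Value-based key, like a Pair: same fields means equal keys.
    static final class ValueKey {
        final String source; final int partition;
        ValueKey(String s, int p) { source = s; partition = p; }
        @Override public boolean equals(Object o) {
            return o instanceof ValueKey
                && ((ValueKey) o).source.equals(source)
                && ((ValueKey) o).partition == partition;
        }
        @Override public int hashCode() { return Objects.hash(source, partition); }
    }

    static boolean removableWithFreshIdentityKey() {
        Map<IdentityKey, String> m = new HashMap<>();
        m.put(new IdentityKey("t", 0), "status");
        return m.remove(new IdentityKey("t", 0)) != null;  // identity equality: miss
    }

    static boolean removableWithFreshValueKey() {
        Map<ValueKey, String> m = new HashMap<>();
        m.put(new ValueKey("t", 0), "status");
        return m.remove(new ValueKey("t", 0)) != null;  // value equality: hit
    }
}
```

So the clear() above is correct only because the SourcePartition key behaves like the value-based variant.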

somandal
somandal previously approved these changes Jul 26, 2021
@vmaheshw vmaheshw dismissed stale reviews from somandal and jzakaryan via 85a4e21 July 26, 2021 21:27
Collaborator

@shrinandthakkar shrinandthakkar left a comment


lgtm!

@vmaheshw vmaheshw merged commit 7f7c7c5 into linkedin:master Jul 27, 2021
@vmaheshw vmaheshw deleted the fixFlushlessProducer branch July 27, 2021 16:41
vmaheshw added a commit to vmaheshw/brooklin that referenced this pull request Mar 1, 2022
Clear the CallbackStatus entry from the map in FlushlessEventProducerHandler (linkedin#843)