
Adding retries to update the metadata store instead of failure #15141

Merged
21 commits merged into apache:master on Jan 10, 2024

Conversation

@Pankaj260100 (Contributor) commented Oct 12, 2023

Fixes #15054.

Description

  • Currently, if 2 tasks that are consuming from the same partitions try to publish the segment and update the metadata, the second task can fail because the end offset stored in the metadata store doesn't match the start offset of the second task. We can fix this by retrying instead of failing.

  • AFAIK, apart from the above issue, the metadata mismatch can happen in 2 scenarios:

    1. when we update the input topic name for the data source
    2. when we run 2 replicas of ingestion tasks (1 replica will publish and 1 will fail, as the first replica has already updated the metadata).
  • Implemented a compare function to compare the last committed end offset with the new sequence's start offset, and return a specific error message for this case.

  • Added retry logic on indexers to retry on this specific error message (a minimal sketch follows this list).

  • Updated the existing test case.
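
The sketch below illustrates the indexer-side retry loop described in these bullets. It is a minimal, self-contained example rather than the actual patch: the exception type, method name, and backoff are hypothetical stand-ins for Druid's internal identifiers.

import java.util.concurrent.Callable;

// Illustrative sketch only: RetryableMetadataMismatchException and publishWithRetries
// are hypothetical names, not the identifiers used in the actual patch.
final class PublishRetrySketch
{
  static final class RetryableMetadataMismatchException extends RuntimeException
  {
    RetryableMetadataMismatchException(String message)
    {
      super(message);
    }
  }

  // Runs the publish attempt and retries only when it fails with the specific
  // "committed end offsets are ahead of this task's start offsets" error;
  // any other failure is rethrown immediately.
  static <T> T publishWithRetries(Callable<T> publishAttempt, int maxRetries) throws Exception
  {
    for (int attempt = 0; ; attempt++) {
      try {
        return publishAttempt.call();
      }
      catch (RetryableMetadataMismatchException e) {
        if (attempt >= maxRetries) {
          throw e;                              // give up after the configured number of attempts
        }
        Thread.sleep(1000L * (attempt + 1));    // simple linear backoff before retrying
      }
    }
  }
}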

This PR has:

  • been self-reviewed.
  • using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

AtomicReference<Boolean> res = new AtomicReference<>(false);
partitionSequenceNumberMap.forEach(
(partitionId, sequenceOffset) -> {
if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) {
Member:

I'm not sure we can assume sequenceOffsets can always be parsed as long. This seems to be a fairly broad assumption. Someone more familiar with the Kinesis supervisor should chime in.

Contributor:

Yes. Kinesis sequence offsets are treated as opaque strings.

Contributor:

This could be done. Two methods should be added to DataSourceMetadata:

isComparable (default return false)
compare() (default return 0)

These two methods would be implemented only by KafkaDataSourceMetadata. In that class, you can parse the offsets to long.

In the IndexerSQLMetadataStorageCoordinator, you would first check isComparable before calling the compare method.

How does this sound?
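
A rough sketch of what this suggestion could look like; the interface and class below are simplified stand-ins (note the Sketch suffix), not Druid's actual DataSourceMetadata or KafkaDataSourceMetadata, and the offset map shape and comparison semantics are assumptions for illustration.

import java.util.Map;

// Sketch only: simplified interface, not Druid's actual DataSourceMetadata.
interface DataSourceMetadataSketch
{
  // Default: offsets are opaque and cannot be ordered (e.g. Kinesis sequence numbers).
  default boolean isComparable()
  {
    return false;
  }

  // Default: no ordering information available.
  default int compare(DataSourceMetadataSketch other)
  {
    return 0;
  }
}

final class KafkaMetadataSketch implements DataSourceMetadataSketch
{
  private final Map<Integer, Long> endOffsets;  // partition -> committed end offset

  KafkaMetadataSketch(Map<Integer, Long> endOffsets)
  {
    this.endOffsets = endOffsets;
  }

  @Override
  public boolean isComparable()
  {
    return true;  // Kafka offsets are plain longs, so they can be ordered numerically
  }

  @Override
  public int compare(DataSourceMetadataSketch other)
  {
    if (!(other instanceof KafkaMetadataSketch)) {
      return 0;
    }
    Map<Integer, Long> otherOffsets = ((KafkaMetadataSketch) other).endOffsets;
    // Report 1 ("ahead") if any shared partition has a strictly greater offset.
    for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
      Long otherOffset = otherOffsets.get(entry.getKey());
      if (otherOffset != null && entry.getValue() > otherOffset) {
        return 1;
      }
    }
    return 0;
  }
}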

Contributor:

Or you can just have KafkaDataSourceMetadata implement Comparable. Then you would check whether the metadata implements Comparable and call its compare method. That way you don't need an isComparable method and the interface doesn't change either. That's actually better and easier to change in the future.
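
A hedged sketch of the coordinator-side check under this alternative; the class, method, and parameter names are hypothetical, not the actual code in IndexerSQLMetadataStorageCoordinator.

final class ComparableMetadataCheckSketch
{
  // Hypothetical coordinator-side check; names are illustrative only.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static boolean startIsAheadOfExisting(Object startMetadata, Object existingMetadata)
  {
    // Only the Kafka metadata class would implement Comparable under this alternative;
    // any other metadata type is simply treated as "not ahead", i.e. no retry.
    if (startMetadata instanceof Comparable && startMetadata.getClass().isInstance(existingMetadata)) {
      return ((Comparable) startMetadata).compareTo(existingMetadata) > 0;
    }
    return false;
  }
}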

Contributor Author:

@abhishekagarwal87, I have implemented the compare method only in KafkaDataSourceMetadata, and KinesisDataSourceMetadata will return 0 (the default). PTAL.

Member:

how would we handle this potential failure scenario in Kinesis then? It seems that the issue is not specific to Kafka and could happen there as well.

Contributor:

It could, yes. The solution here is either to retry no matter how the current and new offsets compare, or to add the comparison to Kinesis too - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html - the sequence numbers appear to be numbers in string form but can contain up to 128 digits, so it should be possible to compare them without converting them to a number.
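
One way to order such numeric strings without converting them is to compare lengths first and then compare lexicographically. A minimal sketch, assuming the sequence numbers are non-negative decimal strings without leading zeros:

final class NumericStringCompareSketch
{
  // Orders two non-negative decimal strings (no leading zeros, possibly longer than a long,
  // e.g. Kinesis sequence numbers of up to 128 digits) without converting them to numbers:
  // a longer string is larger; equal-length strings compare lexicographically.
  static int compareNumericStrings(String a, String b)
  {
    if (a.length() != b.length()) {
      return Integer.compare(a.length(), b.length());
    }
    return a.compareTo(b);
  }
}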

Contributor:

The class OrderedSequenceNumber may already have the implementation to compare Kafka and Kinesis sequence numbers.

Contributor Author:

No, OrderedSequenceNumber does not have an implementation to compare them.

Contributor:

@Pankaj260100, don't KafkaSequenceNumber and KinesisSequenceNumber, which extend this class, handle the comparison at the level of an individual sequence number?

@abhishekagarwal87 (Contributor) left a comment:

Added some code suggestions to better describe what I am trying to suggest.

{
import java.util.Comparator;

public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata<KafkaTopicPartition, Long> implements Comparable<KafkaDataSourceMetadata> {

Check warning — Code scanning / CodeQL — Inconsistent compareTo: This class declares compareTo but inherits equals; the two could be inconsistent.
(SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;

if (stream.equals(otherStart.stream)) {
//Same stream, compare the offset
Contributor:

Could you also please add a check to compare the partitions to account for repartitioning?


/**
* Compare this and the other sequence offsets using comparator.
* Returns 1, if this sequence is ahead of the other.
@AmatyaAvadhanula (Contributor) commented Oct 30, 2023:

Could you please elaborate what a sequence being ahead of the other means?

I think that it means that the partition-wise sequence number of the first is greater than or equal to the other's with a strict greater than condition for at least one of the partitions. WDYT?

Contributor Author:

@AmatyaAvadhanula, We will only retry when the first is greater than the other's; if both are equal, the publish will not fail, so there is no point in retrying, right?

Contributor:

Isn't it possible that when there are 10 partitions, 6 have strictly greater sequence numbers while the remaining 4 have equal sequence numbers because no new data was added to them?
I think we should retry in this case as well

Contributor Author:

@AmatyaAvadhanula, This case is covered. We will retry when at least one partition sequence number is strictly greater than the other's. We compare all partition offsets, and if we find one of the partition offsets greater than the corresponding offset, we set the res variable to true and retry in that case.

Contributor:

I was wondering if it's possible that one partition is strictly greater while another is strictly lower.

Contributor Author:

Hey @AmatyaAvadhanula, as per our discussion I have added a check to verify that the task's partitions are contained within the total set of partitions in the metadata.

Contributor Author:

I have reverted this: the test case where a new Kafka partition gets added and publishes for the first time would fail, as that partition is not in the old committed offsets.


if (stream.equals(otherStart.stream)) {
//Same stream, compare the offset
AtomicReference<Boolean> res = new AtomicReference<>(false);
Contributor:

Does this need to change slightly according to https://github.com/apache/druid/pull/15141/files#r1375643913?

@AmatyaAvadhanula (Contributor):

@Pankaj260100, since #15054 is reproducible, could you please test this patch on a Druid cluster and indicate the same in the PR description as well?

@Pankaj260100 (Contributor Author) commented Nov 16, 2023:

@AmatyaAvadhanula @abhishekagarwal87 @xvrl, I tried to test this patch on Druid 25 in one of the dev Druid clusters, and whenever a task failed to update the metadata store, it started retrying. However, the retry logic executes in sequence: first it completes 10 retries for one task, then for the other, and meanwhile the ingestion lag starts increasing. I was expecting the retries to happen in parallel. Do you have any idea why this is happening?
For example:
Retries for the 1st task start at 13:34:23.173 UTC and end at 13:38:40.378 UTC.
Retries for the 2nd task start at 13:38:40.423 UTC.
During this time the CPU usage of the Overlord went very low, as it's just retrying to update the metadata.

@abhishekagarwal87 (Contributor):

@Pankaj260100 - can you elaborate a bit further? What is the 1st task and what is the 2nd task? Can you use the same terms as you used in the issue you filed (#15054)?

@AmatyaAvadhanula (Contributor):

It appears that retries on the Overlord happen within a transaction, which would explain the observation.
Perhaps the OverlordResource could return a status code other than 400 for certain exceptions, and the task could retry submitting the task action in such cases.
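
Purely as an illustration of that idea (this is not Druid's OverlordResource or task-action client code; the endpoint, status codes, and names are assumptions), a task-side submission loop could retry on a retryable HTTP status instead of treating every failure as terminal:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative only: not Druid's task-action API.
final class TaskActionRetrySketch
{
  private static final HttpClient CLIENT = HttpClient.newHttpClient();

  // Submits an action and retries on a retryable status (503 here) instead of
  // giving up as it would on a terminal 400-style error.
  static HttpResponse<String> submitWithRetries(URI endpoint, String body, int maxRetries) throws Exception
  {
    HttpRequest request = HttpRequest.newBuilder(endpoint)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
    for (int attempt = 0; ; attempt++) {
      HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
      if (response.statusCode() != 503 || attempt >= maxRetries) {
        return response;  // success, a non-retryable status, or retries exhausted
      }
      Thread.sleep(1000L * (attempt + 1));  // back off before retrying the submission
    }
  }
}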

@abhishekagarwal87 (Contributor):

@Pankaj260100 - did you verify this patch in any test cluster? Does it fix the problem?

@Pankaj260100 (Contributor Author):

@abhishekagarwal87, Yes, I tested this patch on a test Druid cluster. I submitted the config twice, so there were 2 sets of tasks publishing simultaneously. From the logs, I verified that retries happened and no task failed.

@abhishekagarwal87 (Contributor) left a comment:

Awesome. I just realized that I had some pending comments that I forgot to publish, so I'm doing that now. There are also some comments from Amatya on the PR.

segmentsAndCommitMetadata.getSegments(),
"Failed publish, not removing segments"
);
Throwables.propagateIfPossible(e);

Check notice — Code scanning / CodeQL — Deprecated method or constructor invocation: Invoking Throwables.propagateIfPossible should be avoided because it has been deprecated.
}

return segmentsAndCommitMetadata;
Throwables.propagateIfPossible(e);

Check notice — Code scanning / CodeQL — Deprecated method or constructor invocation: Invoking Throwables.propagateIfPossible should be avoided because it has been deprecated.
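
As background on the CodeQL note: Guava deprecated the single-argument Throwables.propagateIfPossible in favor of Throwables.throwIfUnchecked, which has the same rethrow-if-unchecked behavior but rejects null. A minimal sketch of that replacement pattern (whether to make this change is a separate question from this PR):

import com.google.common.base.Throwables;

// Sketch of the non-deprecated Guava equivalent.
final class RethrowSketch
{
  static void rethrowIfUnchecked(Throwable t)
  {
    if (t != null) {
      // Same behavior as the deprecated propagateIfPossible(Throwable): rethrows
      // RuntimeExceptions and Errors as-is, otherwise falls through.
      Throwables.throwIfUnchecked(t);
    }
  }
}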
@AmatyaAvadhanula (Contributor):

Thank you @Pankaj260100. LGTM!

@@ -2054,18 +2054,37 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
}

final boolean startMetadataMatchesExisting;
int startMetadataGreaterThanExisting = 0;
Contributor:

Can this be a boolean instead?

Contributor Author:

Yes, we can have a boolean. I have implemented Comparable, whose compareTo() method returns an int, so I didn't change it.

Contributor Author:

The compareTo() function returns +1, -1, and 0 for greater than, less than, and equal respectively.
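
As a small illustration of the point above, the int result of compareTo() can be collapsed into a boolean with a trivial helper; the names here are hypothetical, not the variables used in the patch.

final class CompareToBooleanSketch
{
  // Collapse compareTo's +1 / 0 / -1 result into a boolean "is ahead" flag.
  static <T extends Comparable<T>> boolean isAhead(T first, T second)
  {
    return first.compareTo(second) > 0;
  }
}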

@abhishekagarwal87 merged commit 047c734 into apache:master on Jan 10, 2024
83 checks passed
@abhishekagarwal87 (Contributor):

@kfaraz - I merged this without looking at your comment. But feel free to comment and @Pankaj260100 can hopefully address those in a follow-up PR.

@Pankaj260100 (Contributor Author):

Thanks, @abhishekagarwal87, @AmatyaAvadhanula & @xvrl for the help.

But feel free to comment and @Pankaj260100 can hopefully address those in a follow-up PR.

Sure, I will do that.

@LakshSingla added this to the 29.0.0 milestone on Jan 29, 2024
airlock-confluentinc bot pushed a commit to confluentinc/druid that referenced this pull request on Sep 23, 2024.
Successfully merging this pull request may close these issues.

Failed to publish segments because of [java.lang.RuntimeException: Aborting transaction!].