Adding retries to update the metadata store instead of failure #15141
Conversation
...src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java (fixed)
...src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java (fixed)
...c/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java (fixed)
...c/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java (fixed)
AtomicReference<Boolean> res = new AtomicReference<>(false);
partitionSequenceNumberMap.forEach(
    (partitionId, sequenceOffset) -> {
      if (otherStart.partitionSequenceNumberMap.get(partitionId) != null
          && Long.parseLong(String.valueOf(sequenceOffset))
             > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) {
I'm not sure we can assume sequenceOffsets can always be parsed as long. This seems to be a fairly broad assumption. Someone more familiar with the Kinesis supervisor should chime in.
Yes. Kinesis sequence offsets are treated as opaque strings.
This could be done. Two methods should be added to DataSourceMetadata:
- isComparable() (default: return false)
- compare() (default: return 0)

These two methods would be implemented only by KafkaDataSourceMetadata. In that class, you can parse the offset to a long. In the IndexerSQLMetadataStorageCoordinator, you would first check isComparable before calling the compare method.
How does this sound?
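A minimal sketch of this suggestion, with simplified types (the method names come from the comment above; Druid's real DataSourceMetadata interface has more members):

```java
// Sketch only: a trimmed-down stand-in for Druid's DataSourceMetadata interface.
interface DataSourceMetadata
{
  // Overridden to return true only by metadata types whose offsets can be compared.
  default boolean isComparable()
  {
    return false;
  }

  // Returns a positive value if this metadata is ahead of the other.
  default int compare(DataSourceMetadata other)
  {
    return 0;
  }
}
```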
Or you could have just KafkaDataSourceMetadata implement Comparable. Then you would check whether the metadata is of a Comparable type and call the compareTo method, so you don't need an isComparable method and the interface doesn't change either. That's actually better and easier to change in the future.
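A hedged sketch of this alternative, again with illustrative simplified types rather than Druid's actual classes:

```java
// Only the Kafka-side metadata implements Comparable; the base interface is untouched.
final class KafkaDataSourceMetadata implements Comparable<KafkaDataSourceMetadata>
{
  // Illustrative stand-in for the per-partition offset state.
  private final long offset;

  KafkaDataSourceMetadata(long offset)
  {
    this.offset = offset;
  }

  @Override
  public int compareTo(KafkaDataSourceMetadata other)
  {
    return Long.compare(offset, other.offset);
  }
}

final class CoordinatorCheck
{
  // Caller side: an instanceof check replaces the isComparable() flag.
  static boolean existingIsAhead(Object existing, Object incoming)
  {
    if (existing instanceof KafkaDataSourceMetadata && incoming instanceof KafkaDataSourceMetadata) {
      return ((KafkaDataSourceMetadata) existing).compareTo((KafkaDataSourceMetadata) incoming) > 0;
    }
    return false;
  }
}
```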
@abhishekagarwal87, I have implemented the compare method only in KafkaDataSourceMetadata, and KinesisDataSourceMetadata will return 0 (the default). PTAL.
How would we handle this potential failure scenario in Kinesis then? It seems that the issue is not specific to Kafka and could happen there as well.
It could, yes. The solution here is either to retry no matter how the current and new offsets compare, or to add the comparison to Kinesis too: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html. The sequence numbers seem to be numbers in string form, but they can contain up to 128 digits, so it should be possible to compare them without converting them to a number.
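A minimal sketch of such a string comparison. It assumes the sequence numbers are non-negative decimal strings in canonical form (no leading zeros); the BigInteger fallback handles the general case:

```java
import java.math.BigInteger;

final class SequenceNumberCompare
{
  // Compare two non-negative decimal strings without numeric overflow.
  // Assumes no leading zeros: a longer string is a larger number, and
  // equal-length strings compare numerically the same as lexicographically.
  static int compareDecimalStrings(String a, String b)
  {
    if (a.length() != b.length()) {
      return Integer.compare(a.length(), b.length());
    }
    return a.compareTo(b);
  }

  // Fallback that tolerates leading zeros, at the cost of an allocation.
  static int compareViaBigInteger(String a, String b)
  {
    return new BigInteger(a).compareTo(new BigInteger(b));
  }
}
```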
The class OrderedSequenceNumber may already have the implementation to compare Kafka and Kinesis sequence numbers.
No, OrderedSequenceNumber does not have an implementation to compare them.
@Pankaj260100, don't KafkaSequenceNumber and KinesisSequenceNumber, which extend this class, handle the comparison at an individual sequence number's level?
...ce/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java (outdated, resolved)
...src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java (outdated, resolved)
...e/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java (outdated, resolved)
Added some code suggestions to better describe what I am trying to suggest.
...src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java (outdated, resolved)
...src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java (outdated, resolved)
...-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java (outdated, resolved)
server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java (outdated, resolved)
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java (outdated, resolved)
import java.util.Comparator;

public class KafkaDataSourceMetadata
    extends SeekableStreamDataSourceMetadata<KafkaTopicPartition, Long>
    implements Comparable<KafkaDataSourceMetadata>
{
Code scanning / CodeQL warning: Inconsistent compareTo (compareTo is overridden without a matching equals).
    (SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;

if (stream.equals(otherStart.stream)) {
  // Same stream, compare the offsets
Could you also please add a check to compare the partitions to account for repartitioning?
/**
 * Compare this and the other sequence offsets using the comparator.
 * Returns 1 if this sequence is ahead of the other.
Could you please elaborate on what a sequence being ahead of the other means?
I think it means that the partition-wise sequence number of the first is greater than or equal to the other's, with a strict greater-than condition for at least one of the partitions. WDYT?
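A hedged sketch of that "ahead" check under the semantics described above, simplified to a Map of partition id to offset (the real code is generic over partition and offset types):

```java
import java.util.Map;

final class SequenceComparison
{
  // "this" is ahead of "other" iff every shared partition's offset is >=
  // the other's, and at least one is strictly greater.
  static boolean isAheadOf(Map<Integer, Long> thisOffsets, Map<Integer, Long> otherOffsets)
  {
    boolean strictlyGreaterSomewhere = false;
    for (Map.Entry<Integer, Long> entry : thisOffsets.entrySet()) {
      Long other = otherOffsets.get(entry.getKey());
      if (other == null) {
        continue;  // partition unknown to the other side (e.g. after repartitioning)
      }
      if (entry.getValue() < other) {
        return false;  // mixed comparison: not ahead
      }
      if (entry.getValue() > other) {
        strictlyGreaterSomewhere = true;
      }
    }
    return strictlyGreaterSomewhere;
  }
}
```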
@AmatyaAvadhanula, we will only retry when the first is greater than the other's; if both are equal, the publish will not fail, so there is no point in retrying, right?
Isn't it possible that, when there are 10 partitions, 6 have strictly greater sequence numbers while the remaining 4 have equal sequence numbers because no new data was added to them?
I think we should retry in this case as well.
@AmatyaAvadhanula, this case is covered. We will retry when at least one partition sequence number is strictly greater than the other's. We compare all partition offsets, and if we find one partition's offset greater than the other's, we set the res variable to true and retry in that case.
I was wondering if it's possible that one partition is strictly greater but another is strictly lower.
Hey @AmatyaAvadhanula, as per our discussion, I have added a check to verify that the task's partitions are contained within the total set of partitions in the metadata.
I have reverted this: the test case where a new Kafka partition gets added and publishes for the first time would fail, as that partition is not in the old committed offsets.
if (stream.equals(otherStart.stream)) {
  // Same stream, compare the offsets
  AtomicReference<Boolean> res = new AtomicReference<>(false);
Does this need to change slightly according to https://github.com/apache/druid/pull/15141/files#r1375643913?
@Pankaj260100, since #15054 is reproducible, could you please test this patch on a Druid cluster and indicate the same in the PR description as well?
@AmatyaAvadhanula @abhishekagarwal87 @xvrl, I tried to test this patch on Druid 25 in one of the dev Druid clusters, and whenever a task failed to update the metadata store, it started retrying. But the retry logic executes in sequence: first it completes 10 retries for one task, then for another task, and then the ingestion lag starts increasing. I was expecting it to happen in parallel. Do you have any idea why this is happening?
@Pankaj260100 - can you elaborate a bit further? What is the 1st task and what is the 2nd task? Can you use the same terms as you used in the issue you filed (#15054)?
It appears that retries on the Overlord happen within a transaction, which would explain the observation.
@Pankaj260100 - did you verify this patch in any test cluster? Does it fix the problem?
@abhishekagarwal87, yes, I tested this patch on a test Druid cluster. I submitted the config twice, so there were 2 sets of tasks publishing simultaneously. From the logs, I verified that retries happened and no task failed.
Awesome. I just realized that I had some pending comments that I forgot to publish, so doing that now. There are also some comments from Amatya on the PR.
...src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java (outdated, resolved)
...-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java (resolved)
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java (fixed)
...src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java (resolved)
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java (outdated, resolved)
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java (outdated, resolved)
    segmentsAndCommitMetadata.getSegments(),
    "Failed publish, not removing segments"
);
Throwables.propagateIfPossible(e);
Code scanning / CodeQL notice: Deprecated method or constructor invocation (Throwables.propagateIfPossible).
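For context, Guava deprecated the single-argument Throwables.propagateIfPossible in favor of Throwables.throwIfUnchecked. A sketch of the usual replacement; note that throwIfUnchecked rejects null, unlike the deprecated method, so a null guard is needed if the throwable can be null:

```java
import com.google.common.base.Throwables;

final class RethrowExample
{
  // Replacement for the deprecated Throwables.propagateIfPossible(e):
  // rethrows e if it is a RuntimeException or Error, otherwise falls through.
  static void rethrowIfUnchecked(Throwable e)
  {
    if (e != null) {  // throwIfUnchecked throws NullPointerException on null input
      Throwables.throwIfUnchecked(e);
    }
  }
}
```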
}

return segmentsAndCommitMetadata;
Throwables.propagateIfPossible(e);
Code scanning / CodeQL notice: Deprecated method or constructor invocation (Throwables.propagateIfPossible).
Thank you @Pankaj260100. LGTM!
@@ -2054,18 +2054,37 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
}

final boolean startMetadataMatchesExisting;
int startMetadataGreaterThanExisting = 0;
Can this be a boolean instead?
Yes, we could use a boolean. But I implemented Comparable, and its compareTo() method returns an int, so I didn't change it.
The compareTo() function returns +1, -1, and 0 for greater-than, less-than, and equal, respectively.
@kfaraz - I merged this without looking at your comment. But feel free to comment, and @Pankaj260100 can hopefully address those in a follow-up PR.
Thanks, @abhishekagarwal87, @AmatyaAvadhanula & @xvrl for the help.
Sure, I will do that.
Adding retries to update the metadata store instead of failure (#15141)
Fixes #15054.
Description
Currently, if two tasks consuming from the same partitions try to publish a segment and update the metadata, the second task can fail because the end offset stored in the metadata store doesn't match the start offset of the second task. We can fix this by retrying instead of failing.

AFAIK, apart from the above issue, the metadata mismatch can happen in 2 scenarios:
- when we update the input topic name for the data source
- when we run 2 replicas of ingestion tasks (1 replica will publish and 1 will fail, as the first replica has already updated the metadata)

Implemented a compare function to compare the last committed end offset with the new sequence's start offset, and return a specific error message for this case.
Added retry logic on indexers to retry for this specific error message (a sketch follows below).
Updated the existing test case.
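A minimal sketch of the retry shape described above. The helper names (publishOnce, isRetriableMismatch), the retry bound, and the backoff are hypothetical stand-ins for illustration, not Druid's actual API:

```java
final class PublishRetryExample
{
  private static final int MAX_RETRIES = 10;  // hypothetical bound

  // publishOnce is a hypothetical stand-in for the task's publish-and-commit call.
  static void publishWithRetries(Runnable publishOnce) throws InterruptedException
  {
    for (int attempt = 1; ; attempt++) {
      try {
        publishOnce.run();
        return;  // published successfully
      }
      catch (RuntimeException e) {
        if (attempt >= MAX_RETRIES || !isRetriableMismatch(e)) {
          throw e;  // give up: retries exhausted, or not the metadata-mismatch error
        }
        Thread.sleep(1000L * attempt);  // simple linear backoff
      }
    }
  }

  private static boolean isRetriableMismatch(RuntimeException e)
  {
    // The PR keys the retry off a specific error message; matching on a
    // marker string here is an illustrative simplification.
    return e.getMessage() != null && e.getMessage().contains("retry");
  }
}
```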
This PR has: