Add dropOutOfOrderRecord config to drop out-of-order events #11811
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##            master   #11811      +/-  ##
============================================
- Coverage    63.16%   50.27%   -12.89%
+ Complexity    1141        6     -1135
============================================
  Files         2343     2267       -76
  Lines       126420   122690     -3730
  Branches     19442    18886      -556
============================================
- Hits         79849    61686    -18163
- Misses       40900    56878    +15978
+ Partials      5671     4126     -1545
```
... and 497 files with indirect coverage changes.
Files with review comments:
- ...al/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java (resolved)
- .../src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java (outdated, resolved)
- .../java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java (outdated, resolved)
```java
canTakeMore = numDocsIndexed++ < _capacity;
_partitionUpsertMetadataManager.addRecord(this, recordInfo);
// if record doesn't need to be dropped, then persist in segment and update metadata hashmap
if (!_partitionUpsertMetadataManager.shouldDropRecord(recordInfo)) {
```
This check is already performed in `doAddRecord()`. Suggest modifying `addRecord()` to return a `boolean` which indicates whether the record should be kept. Then we may modify this part into:

```diff
- if (!_partitionUpsertMetadataManager.shouldDropRecord(recordInfo)) {
+ if (_partitionUpsertMetadataManager.addRecord(this, recordInfo)) {
```
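A minimal, self-contained sketch of the suggested contract, where `addRecord()` returns whether the record was kept. The class and field names below are hypothetical stand-ins, not the actual Pinot implementation; it only models the out-of-order check against a comparison value.

```java
import java.util.HashMap;
import java.util.Map;

class UpsertMetadataSketch {
    // Latest comparison value (e.g. event time) seen per primary key.
    private final Map<String, Long> latestComparisonValue = new HashMap<>();
    private final boolean dropOutOfOrderRecord;

    UpsertMetadataSketch(boolean dropOutOfOrderRecord) {
        this.dropOutOfOrderRecord = dropOutOfOrderRecord;
    }

    /** Returns true if the record should be kept (indexed into the segment). */
    boolean addRecord(String primaryKey, long comparisonValue) {
        Long current = latestComparisonValue.get(primaryKey);
        if (current != null && comparisonValue < current) {
            // Out-of-order event: metadata is never updated; the record is
            // indexed only when the drop config is off.
            return !dropOutOfOrderRecord;
        }
        latestComparisonValue.put(primaryKey, comparisonValue);
        return true;
    }
}
```

With this shape the caller collapses to `if (manager.addRecord(key, value)) { addNewRow(...); }`, so the drop decision is made exactly once.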
Hey @Jackie-Jiang, at present `_partitionUpsertMetadataManager.addRecord` is called after `addNewRow`. Do you foresee any issues in changing the order here?
One scenario I can think of where this might cause an issue: we add a record to the metadata hashmap, but `addNewRow` somehow fails due to an indexing issue. At present, the exception it throws also skips the `_partitionUpsertMetadataManager.addRecord` call, which makes sense.
I checked and there should be no issue changing the order because the row is not added to the metadata manager if it is out of order
> I checked and there should be no issue changing the order because the row is not added to the metadata manager if it is out of order

What about a row which is not out-of-order? It gets added to the metadata manager, but the `addNewRow` call fails due to some issue. Will that not cause issues when further updates are received for that record?
If it throws an exception, in both cases `_numDocsIndexed` won't be updated, and we will end up re-indexing the same docId. That needs to be solved separately, but very good point!
Umm, I think there can be a scenario something like:

1. Key: A, current segment: S1, valid doc id: 2, and say the `addNewRow` call fails for this record.
   MetadataMap: A -> RecordLocation (S1, 2)
2. Key: B, current segment: S1, valid doc id: 2 (B also gets doc id 2 because `_numDocsIndexed` didn't update).
   MetadataMap: A -> RecordLocation (S1, 2), B -> RecordLocation (S1, 2)
3. Now say we receive a record for key A again after some time, while processing segment S2. In that scenario, we will invoke a removeDocId call for (S1, 2) here (line 264 in 51335a2):
   ```java
   removeDocId(currentSegment, currentDocId);
   ```
   which will invalidate the doc id for B, and B will stop being returned in queries.

Did I miss anything? 😅
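The collision described above can be reproduced with a toy model of the metadata map. All names here are illustrative, not Pinot's actual types; the point is only that a non-advancing counter aliases two keys onto one doc id.

```java
import java.util.HashMap;
import java.util.Map;

class DocIdCollisionDemo {
    static Map<String, String> run() {
        Map<String, String> metadataMap = new HashMap<>();
        int numDocsIndexed = 2;

        // Key A: metadata updated first, then addNewRow() throws,
        // so numDocsIndexed is NOT incremented.
        metadataMap.put("A", "S1:" + numDocsIndexed);

        // Key B: reuses the same doc id because the counter never advanced.
        metadataMap.put("B", "S1:" + numDocsIndexed);

        // Both keys now reference (S1, 2); invalidating A's doc id later
        // would silently invalidate B's as well.
        return metadataMap;
    }
}
```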
The analysis is correct. What I meant is that currently it is anyway not handled correctly, and we should solve this problem in a separate PR. Assuming `addNewRow()` failed, the dictionary was already updated, and the first several columns might also have been added already. Currently it works because that call never fails. The proper fix should be to ensure that even if it throws an exception, we continue with the other columns (e.g. put a default value for the failed column), and increase the doc id anyway.
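A hedged sketch of that fix: keep indexing the remaining columns when one column writer throws, and advance the doc id regardless. The writer interface below is a hypothetical stand-in for Pinot's per-column indexers, not the real API.

```java
import java.util.List;
import java.util.function.IntConsumer;

class ResilientRowIndexer {
    // Indexes one row across all column writers. A failure in one column no
    // longer aborts the row: we catch it (a real fix would write that
    // column's default value instead) and still advance the doc id.
    static int addNewRow(List<IntConsumer> columnWriters, int docId) {
        for (IntConsumer writer : columnWriters) {
            try {
                writer.accept(docId);
            } catch (RuntimeException e) {
                // Fall back: e.g. substitute the column's default value here.
            }
        }
        return docId + 1;  // advance even if some columns failed
    }
}
```

Because the doc id always advances, a later record can never be assigned a doc id that an earlier (failed) row's metadata entry still points at.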
Sounds good to me! Updated the patch based on this.
Hey @Jackie-Jiang, updated the patch based on the suggestions. Can you review once?
labels:
feature
upsert
release-notes
Related to issue: #11796

This patch adds a new upsert config, `dropOutOfOrderRecord`, which when set to `true` does not persist out-of-order events in the segment. This saves disk usage and also avoids confusion when using `skipUpsert`.
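For reference, the flag would sit inside the table config's `upsertConfig` block; a minimal sketch (the `dropOutOfOrderRecord` field comes from this patch, the surrounding fields are illustrative and should be checked against the Pinot docs):

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "comparisonColumn": "event_time",
    "dropOutOfOrderRecord": true
  }
}
```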