-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix rebalance on upsert table #12054
Fix rebalance on upsert table #12054
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #12054 +/- ##
============================================
+ Coverage 61.52% 61.65% +0.12%
- Complexity 1152 1153 +1
============================================
Files 2386 2386
Lines 129565 129579 +14
Branches 20053 20056 +3
============================================
+ Hits 79721 79898 +177
+ Misses 44026 43868 -158
+ Partials 5818 5813 -5
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L); | ||
// Check ZK metadata for uploaded segments to look for a segment that's in the same partition | ||
for (String uploadedSegment : uploadedSegments) { | ||
if (getPartitionId(uploadedSegment) == partitionId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add a comment that there is no need to cache here, as assignSegment() is just one-time shot, but rebalance() can be called in a loop for multiple times
/** | ||
* Returns the partition id of the given segment, using cached partition id if exists. | ||
*/ | ||
private int getPartitionIdUsingCache(String segmentName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leave a TODO about the check on whether segment's partition id is updated when uploading segment to replace the existing one, so we could catch such rare case during table rebalance, because if that case ever happens, the cache is out of sync from the segment ZK metadata
#11628 introduced the feature to assign segments for upsert table based on the existing segment assignment. It can potentially cause new segment mis-assigned during rebalance when rebalancer tries to reuse the target assignment.
This PR fixes the issue by always re-calculate target assignment for
StrictRealtimeSegmentAssignment
, and enhancesStrictRealtimeSegmentAssignment
to cache the segment partition id to reduce the cost of rebalance.