Skip to content

Commit

Permalink
Fix the upsert metadata bug when adding segment with same comparison …
Browse files Browse the repository at this point in the history
…value
  • Loading branch information
Jackie-Jiang committed Apr 25, 2022
1 parent d6001a2 commit 8c5e48e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,14 @@ public void addSegment(IndexSegment segment, Iterator<RecordInfo> recordInfoIter
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
// Existing primary key
IndexSegment currentSegment = currentRecordLocation.getSegment();
int comparisonResult = recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue());

// The current record is in the same segment
// Update the record location when there is a tie to keep the newer record. Note that the record info
// iterator will return records with incremental doc ids.
IndexSegment currentSegment = currentRecordLocation.getSegment();
if (segment == currentSegment) {
if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
if (comparisonResult >= 0) {
validDocIds.replace(currentRecordLocation.getDocId(), recordInfo._docId);
return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
} else {
Expand All @@ -124,7 +125,7 @@ public void addSegment(IndexSegment segment, Iterator<RecordInfo> recordInfoIter
// doc ids for the old segment because it has not been replaced yet.
String currentSegmentName = currentSegment.getSegmentName();
if (segmentName.equals(currentSegmentName)) {
if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
if (comparisonResult >= 0) {
validDocIds.add(recordInfo._docId);
return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
} else {
Expand All @@ -136,12 +137,10 @@ public void addSegment(IndexSegment segment, Iterator<RecordInfo> recordInfoIter
// Update the record location when getting a newer comparison value, or the value is the same as the
// current value, but the segment has a larger sequence number (the segment is newer than the current
// segment).
if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) > 0 || (
recordInfo._comparisonValue == currentRecordLocation.getComparisonValue()
&& LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
&& LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
&& LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber(
currentSegmentName))) {
if (comparisonResult > 0 || (comparisonResult == 0 && LLCSegmentName.isLowLevelConsumerSegmentName(
segmentName) && LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
&& LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber(
currentSegmentName))) {
Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentRecordLocation.getDocId());
validDocIds.add(recordInfo._docId);
return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ private void verifyAddSegment(UpsertConfig.HashFunction hashFunction) {

// Add the first segment
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new ArrayList<>();
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 100));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 80));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 4, 120));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 5, 100));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(100)));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, new IntWrapper(100)));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, new IntWrapper(80)));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 4, new IntWrapper(120)));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 5, new IntWrapper(100)));
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
Expand All @@ -77,11 +77,11 @@ private void verifyAddSegment(UpsertConfig.HashFunction hashFunction) {

// Add the second segment
ArrayList<PartitionUpsertMetadataManager.RecordInfo> recordInfoList2 = new ArrayList<>();
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 120));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 3, 80));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 4, 80));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(100)));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, new IntWrapper(120)));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 3, new IntWrapper(80)));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 4, new IntWrapper(80)));
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment2 = mockSegment(2, validDocIds2);
upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
Expand Down Expand Up @@ -142,13 +142,13 @@ private static PrimaryKey getPrimaryKey(int value) {
}

private static void checkRecordLocation(Map<Object, RecordLocation> recordLocationMap, int keyValue,
IndexSegment segment, int docId, int timestamp, UpsertConfig.HashFunction hashFunction) {
IndexSegment segment, int docId, int comparisonValue, UpsertConfig.HashFunction hashFunction) {
RecordLocation recordLocation =
recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(keyValue), hashFunction));
assertNotNull(recordLocation);
assertSame(recordLocation.getSegment(), segment);
assertEquals(recordLocation.getDocId(), docId);
assertEquals(recordLocation.getComparisonValue(), timestamp);
assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, comparisonValue);
}

@Test
Expand All @@ -166,9 +166,9 @@ private void verifyAddRecord(UpsertConfig.HashFunction hashFunction) {
// Add the first segment
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new ArrayList<>();
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 120));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 100));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(120)));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, new IntWrapper(100)));
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
Expand All @@ -177,7 +177,8 @@ private void verifyAddRecord(UpsertConfig.HashFunction hashFunction) {
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
IndexSegment segment2 = mockSegment(1, validDocIds2);

upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100));
upsertMetadataManager.addRecord(segment2,
new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, new IntWrapper(100)));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
Expand All @@ -187,7 +188,8 @@ private void verifyAddRecord(UpsertConfig.HashFunction hashFunction) {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120));
upsertMetadataManager.addRecord(segment2,
new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, new IntWrapper(120)));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
Expand All @@ -197,7 +199,8 @@ private void verifyAddRecord(UpsertConfig.HashFunction hashFunction) {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100));
upsertMetadataManager.addRecord(segment2,
new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, new IntWrapper(100)));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
Expand All @@ -207,7 +210,8 @@ private void verifyAddRecord(UpsertConfig.HashFunction hashFunction) {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100));
upsertMetadataManager.addRecord(segment2,
new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, new IntWrapper(100)));
// segment1: 1 -> {1, 120}
// segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
Expand All @@ -234,14 +238,14 @@ private void verifyRemoveSegment(UpsertConfig.HashFunction hashFunction) {
// segment1: 0 -> {0, 100}, 1 -> {1, 100}
// segment2: 2 -> {0, 100}, 3 -> {0, 100}
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new ArrayList<>();
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(100)));
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList2 = new ArrayList<>();
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 0, 100));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 1, 100));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 0, new IntWrapper(100)));
recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 1, new IntWrapper(100)));
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment2 = mockSegment(2, validDocIds2);
upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
Expand Down Expand Up @@ -274,4 +278,37 @@ public void testHashPrimaryKey() {
((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, UpsertConfig.HashFunction.MURMUR3)).getBytes()),
"8d68b314cc0c8de4dbd55f4dad3c3e66");
}

/**
* Use a wrapper class to ensure different value has different reference.
*/
private static class IntWrapper implements Comparable<IntWrapper> {
final int _value;

IntWrapper(int value) {
_value = value;
}

@Override
public int compareTo(IntWrapper o) {
return Integer.compare(_value, o._value);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof IntWrapper)) {
return false;
}
IntWrapper that = (IntWrapper) o;
return _value == that._value;
}

@Override
public int hashCode() {
return _value;
}
}
}

0 comments on commit 8c5e48e

Please sign in to comment.