From f43964a8082d0b844b4ba55813df3d38e85613c9 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 8 Oct 2024 10:03:57 +0530 Subject: [PATCH] Fail concurrent replace tasks with finer segment granularity than append (#17265) (#17272) Co-authored-by: AmatyaAvadhanula --- .../ConcurrentReplaceAndAppendTest.java | 182 ++++++++++++++++++ .../IndexerSQLMetadataStorageCoordinator.java | 25 ++- ...exerSQLMetadataStorageCoordinatorTest.java | 67 +++++++ 3 files changed, 269 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 96c72e7130d3..c74177e2c386 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -103,6 +103,9 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase private static final Interval YEAR_23 = Intervals.of("2023/2024"); private static final Interval JAN_23 = Intervals.of("2023-01/2023-02"); private static final Interval DEC_23 = Intervals.of("2023-12/2024-01"); + private static final Interval JAN_FEB_MAR_23 = Intervals.of("2023-01-01/2023-04-01"); + private static final Interval APR_MAY_JUN_23 = Intervals.of("2023-04-01/2023-07-01"); + private static final Interval JUL_AUG_SEP_23 = Intervals.of("2023-07-01/2023-10-01"); private static final Interval OCT_NOV_DEC_23 = Intervals.of("2023-10-01/2024-01-01"); private static final Interval FIRST_OF_JAN_23 = Intervals.of("2023-01-01/2023-01-02"); @@ -599,6 +602,185 @@ public void testAllocateLockReplaceDayAppendMonth() verifyIntervalHasVisibleSegments(JAN_23, segmentV01); } + @Test + public void testLockReplaceQuarterAllocateAppendYear() + { + final TaskLock replaceLock 
= replaceTask.acquireReplaceLockOn(YEAR_23); + Assert.assertNotNull(replaceLock); + + final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion()); + final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion()); + final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion()); + final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion()); + + Assert.assertTrue( + replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4) + .isSuccess() + ); + verifyIntervalHasUsedSegments(YEAR_23, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4); + verifyIntervalHasVisibleSegments(YEAR_23, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4); + + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR); + Assert.assertEquals(JAN_FEB_MAR_23, pendingSegment.getInterval()); + Assert.assertEquals(replaceLock.getVersion(), pendingSegment.getVersion()); + + final DataSegment appendedSegment = asSegment(pendingSegment); + appendTask.commitAppendSegments(appendedSegment); + + verifyIntervalHasUsedSegments(YEAR_23, appendedSegment, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4); + verifyIntervalHasVisibleSegments(YEAR_23, appendedSegment, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4); + } + + @Test + public void testLockAllocateAppendYearReplaceQuarter() + { + final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23); + Assert.assertNotNull(replaceLock); + + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR); + Assert.assertEquals(YEAR_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + appendTask.commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(YEAR_23, 
segmentV01); + verifyIntervalHasVisibleSegments(YEAR_23, segmentV01); + + final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion()); + final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion()); + final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion()); + final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion()); + + Assert.assertFalse( + replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4) + .isSuccess() + ); + + verifyIntervalHasUsedSegments(YEAR_23, segmentV01); + verifyIntervalHasVisibleSegments(YEAR_23, segmentV01); + } + + @Test + public void testLockAllocateReplaceQuarterAppendYear() + { + final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23); + Assert.assertNotNull(replaceLock); + + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR); + Assert.assertEquals(YEAR_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion()); + final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion()); + final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion()); + final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion()); + + Assert.assertFalse( + replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4) + .isSuccess() + ); + + final DataSegment segmentV01 = asSegment(pendingSegment); + appendTask.commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(YEAR_23, segmentV01); + verifyIntervalHasVisibleSegments(YEAR_23, segmentV01); + } + + @Test + public void testAllocateLockReplaceQuarterAppendYear() + { + final SegmentIdWithShardSpec pendingSegment + = 
appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR); + Assert.assertEquals(YEAR_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23); + Assert.assertNotNull(replaceLock); + + final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion()); + final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion()); + final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion()); + final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion()); + + Assert.assertFalse( + replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4) + .isSuccess() + ); + + final DataSegment segmentV01 = asSegment(pendingSegment); + appendTask.commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(YEAR_23, segmentV01); + verifyIntervalHasVisibleSegments(YEAR_23, segmentV01); + } + + @Test + public void testAllocateLockAppendYearReplaceQuarter() + { + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR); + Assert.assertEquals(YEAR_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23); + Assert.assertNotNull(replaceLock); + + final DataSegment segmentV01 = asSegment(pendingSegment); + appendTask.commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(YEAR_23, segmentV01); + verifyIntervalHasVisibleSegments(YEAR_23, segmentV01); + + final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion()); + final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion()); + final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion()); + final DataSegment 
segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion()); + + Assert.assertFalse( + replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4) + .isSuccess() + ); + + verifyIntervalHasUsedSegments(YEAR_23, segmentV01); + verifyIntervalHasVisibleSegments(YEAR_23, segmentV01); + } + + @Test + public void testAllocateAppendLockYearReplaceQuarter() + { + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR); + Assert.assertEquals(YEAR_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + appendTask.commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(YEAR_23, segmentV01); + verifyIntervalHasVisibleSegments(YEAR_23, segmentV01); + + final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23); + Assert.assertNotNull(replaceLock); + + final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion()); + final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion()); + final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion()); + final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion()); + + Assert.assertTrue( + replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4) + .isSuccess() + ); + + verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4); + verifyIntervalHasVisibleSegments(YEAR_23, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4); + } + @Test public void testAllocateAppendMonthLockReplaceDay() { diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index e914b2983359..a232d66e8a7d 100644 --- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -32,6 +32,7 @@ import com.google.common.io.BaseEncoding; import com.google.inject.Inject; import org.apache.commons.lang.StringEscapeUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -901,7 +902,15 @@ private boolean shouldUpgradePendingSegment( } else if (pendingSegment.getId().getVersion().compareTo(replaceVersion) >= 0) { return false; } else if (!replaceInterval.contains(pendingSegment.getId().getInterval())) { - return false; + final SegmentId pendingSegmentId = pendingSegment.getId().asSegmentId(); + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.UNSUPPORTED) + .build( + "Replacing with a finer segment granularity than a concurrent append is unsupported." 
+ + " Cannot upgrade pendingSegment[%s] to version[%s] as the replace interval[%s]" + + " does not fully contain the pendingSegment interval[%s].", + pendingSegmentId, replaceVersion, replaceInterval, pendingSegmentId.getInterval() + ); } else { // Do not upgrade already upgraded pending segment return pendingSegment.getSequenceName() == null @@ -2201,10 +2210,16 @@ private Set createNewIdsOfAppendSegmentsAfterReplace( newInterval = replaceInterval; break; } else if (replaceInterval.overlaps(oldInterval)) { - throw new ISE( - "Incompatible segment intervals for commit: [%s] and [%s].", - oldInterval, replaceInterval - ); + final String conflictingSegmentId = oldSegment.getId().toString(); + final String upgradeVersion = upgradeSegmentToLockVersion.get(conflictingSegmentId); + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.UNSUPPORTED) + .build( + "Replacing with a finer segment granularity than a concurrent append is unsupported." + + " Cannot upgrade segment[%s] to version[%s] as the replace interval[%s]" + + " does not fully contain the pending segment interval[%s].", + conflictingSegmentId, upgradeVersion, replaceInterval, oldInterval + ); } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index f82cfbf2a043..4b592e5f40da 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -319,6 +319,73 @@ public void testCommitAppendSegments() Assert.assertEquals(replaceLock.getVersion(), Iterables.getOnlyElement(observedLockVersions)); } + @Test + public void testCommitReplaceSegments_partiallyOverlappingPendingSegmentUnsupported() + { + final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1", 
Intervals.of("2023-01-01/2023-02-01"), "2023-02-01"); + final Set<DataSegment> segmentsAppendedWithReplaceLock = new HashSet<>(); + final Map<DataSegment, ReplaceTaskLock> appendedSegmentToReplaceLockMap = new HashMap<>(); + final PendingSegmentRecord pendingSegmentForInterval = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "foo", + Intervals.of("2023-01-01/2024-01-01"), + "2023-01-02", + new NumberedShardSpec(100, 0) + ), + "", + "", + null, + "append" + ); + for (int i = 1; i < 9; i++) { + final DataSegment segment = new DataSegment( + "foo", + Intervals.of("2023-01-0" + i + "/2023-01-0" + (i + 1)), + "2023-01-0" + i, + ImmutableMap.of("path", "a-" + i), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + segmentsAppendedWithReplaceLock.add(segment); + appendedSegmentToReplaceLockMap.put(segment, replaceLock); + } + + segmentSchemaTestUtils.insertUsedSegments(segmentsAppendedWithReplaceLock, Collections.emptyMap()); + derbyConnector.retryWithHandle( + handle -> coordinator.insertPendingSegmentsIntoMetastore( + handle, + ImmutableList.of(pendingSegmentForInterval), + "foo", + true + ) + ); + insertIntoUpgradeSegmentsTable(appendedSegmentToReplaceLockMap, derbyConnectorRule.metadataTablesConfigSupplier().get()); + + final Set<DataSegment> replacingSegments = new HashSet<>(); + for (int i = 1; i < 9; i++) { + final DataSegment segment = new DataSegment( + "foo", + Intervals.of("2023-01-01/2023-02-01"), + "2023-02-01", + ImmutableMap.of("path", "b-" + i), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new NumberedShardSpec(i, 9), + 9, + 100 + ); + replacingSegments.add(segment); + } + + Assert.assertFalse( + coordinator.commitReplaceSegments(replacingSegments, ImmutableSet.of(replaceLock), null) + .isSuccess() + ); + } + @Test public void testCommitReplaceSegments() {