diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 307bfb0508d9..107dc4b9f97a 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1773,32 +1773,31 @@ public Void inTransaction(Handle handle, TransactionStatus transactionStatus) th
   @Override
   public void deleteSegments(final Set<DataSegment> segments)
   {
-    connector.getDBI().inTransaction(
-        new TransactionCallback<Void>()
-        {
-          @Override
-          public Void inTransaction(Handle handle, TransactionStatus transactionStatus)
-          {
-            int segmentSize = segments.size();
-            String dataSource = "";
-            for (final DataSegment segment : segments) {
-              dataSource = segment.getDataSource();
-              deleteSegment(handle, segment);
-            }
-            log.debugSegments(segments, "Delete the metadata of segments");
-            log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource);
+    if (segments.isEmpty()) {
+      log.info("No segments to delete.");
+      return;
+    }
 
-            return null;
+    final String deleteSql = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable());
+    final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get();
+
+    // generate the IDs outside the transaction block
+    final List<String> ids = segments.stream().map(s -> s.getId().toString()).collect(Collectors.toList());
+
+    int numDeletedSegments = connector.getDBI().inTransaction((handle, transactionStatus) -> {
+          final PreparedBatch batch = handle.prepareBatch(deleteSql);
+
+          for (final String id : ids) {
+            batch.bind("id", id).add();
           }
+
+          int[] deletedRows = batch.execute();
+          return Arrays.stream(deletedRows).sum();
         }
     );
-  }
-
-  private void deleteSegment(final Handle handle, final DataSegment segment)
-  {
-    handle.createStatement(StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()))
-          .bind("id", segment.getId().toString())
-          .execute();
+
+    log.debugSegments(segments, "Delete the metadata of segments");
+    log.info("Deleted [%d] segments from metadata storage for dataSource [%s].", numDeletedSegments, dataSource);
   }
 
   private void updatePayload(final Handle handle, final DataSegment segment) throws IOException
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index ed5423a97470..74e06bfb6645 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -146,6 +146,18 @@ public class IndexerSQLMetadataStorageCoordinatorTest
       100
   );
 
+  private final DataSegment defaultSegment2WithBiggerSize = new DataSegment(
+      "fooDataSource",
+      Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
+      "version",
+      ImmutableMap.of(),
+      ImmutableList.of("dim1"),
+      ImmutableList.of("m1"),
+      new LinearShardSpec(1),
+      9,
+      200
+  );
+
   private final DataSegment defaultSegment3 = new DataSegment(
       "fooDataSource",
       Intervals.of("2015-01-03T00Z/2015-01-04T00Z"),
@@ -1413,6 +1425,46 @@ public void testDeleteSegmentsInMetaDataStorage() throws IOException
     );
   }
 
+  @Test
+  public void testUpdateSegmentsInMetaDataStorage() throws IOException
+  {
+    // Published segments to MetaDataStorage
+    coordinator.announceHistoricalSegments(SEGMENTS);
+
+    // check segments Published
+    Assert.assertEquals(
+        SEGMENTS,
+        ImmutableSet.copyOf(
+            coordinator.retrieveUsedSegmentsForInterval(
+                defaultSegment.getDataSource(),
+                defaultSegment.getInterval(),
+                Segments.ONLY_VISIBLE
+            )
+        )
+    );
+
+    // update single metadata item
+    coordinator.updateSegmentMetadata(Collections.singleton(defaultSegment2WithBiggerSize));
+
+    Collection<DataSegment> updated = coordinator.retrieveUsedSegmentsForInterval(
+        defaultSegment.getDataSource(),
+        defaultSegment.getInterval(),
+        Segments.ONLY_VISIBLE);
+
+    Assert.assertEquals(SEGMENTS.size(), updated.size());
+
+    DataSegment defaultAfterUpdate = updated.stream().filter(s -> s.equals(defaultSegment)).findFirst().get();
+    DataSegment default2AfterUpdate = updated.stream().filter(s -> s.equals(defaultSegment2)).findFirst().get();
+
+    Assert.assertNotNull(defaultAfterUpdate);
+    Assert.assertNotNull(default2AfterUpdate);
+
+    // check that default did not change
+    Assert.assertEquals(defaultSegment.getSize(), defaultAfterUpdate.getSize());
+    // but that default 2 did change
+    Assert.assertEquals(defaultSegment2WithBiggerSize.getSize(), default2AfterUpdate.getSize());
+  }
+
   @Test
   public void testSingleAdditionalNumberedShardWithNoCorePartitions() throws IOException
   {
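Note (reviewer sketch, not part of the patch): the rewrite of deleteSegments() above replaces N per-segment DELETE statements with one jdbi PreparedBatch executed inside a single transaction. The sketch below illustrates that same pattern in isolation, assuming jdbi v2 (org.skife.jdbi.v2), which is where the PreparedBatch and inTransaction calls in the diff come from; the class name BatchDeleteSketch, the segmentsTable parameter, and the caller-supplied DBI are hypothetical stand-ins for connector.getDBI() and dbTables.getSegmentsTable() in the patch.

```java
import java.util.Arrays;
import java.util.Collection;

import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.PreparedBatch;

public class BatchDeleteSketch
{
  /**
   * Deletes the given IDs from the segments table in a single transaction,
   * using one prepared batch instead of one DELETE statement per row.
   * Returns the total number of rows deleted.
   */
  public static int deleteByIds(DBI dbi, String segmentsTable, Collection<String> segmentIds)
  {
    if (segmentIds.isEmpty()) {
      return 0;
    }

    // table name comes from trusted configuration, so plain concatenation is fine here
    final String deleteSql = "DELETE FROM " + segmentsTable + " WHERE id = :id";

    return dbi.inTransaction((handle, transactionStatus) -> {
      final PreparedBatch batch = handle.prepareBatch(deleteSql);

      for (String id : segmentIds) {
        // bind() sets the :id parameter, add() appends the bound row to the batch
        batch.bind("id", id).add();
      }

      // execute() returns per-row update counts; their sum is the number of deleted rows
      int[] deletedRows = batch.execute();
      return Arrays.stream(deletedRows).sum();
    });
  }
}
```

Compared with the removed per-segment handle.createStatement(...).execute() loop, the prepared batch reuses a single parsed statement and submits all deletes as one JDBC batch, and its int[] of update counts is what lets the patched deleteSegments() log an accurate numDeletedSegments.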