From d57c891e017e5b569237040f1551d409d7df33d0 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Fri, 21 Jul 2023 10:31:12 -0700 Subject: [PATCH 1/4] (perf) server: batch SQL Metadata deleteSegments and updateSegmentMetadata --- .../IndexerSQLMetadataStorageCoordinator.java | 52 +++++++++---------- ...exerSQLMetadataStorageCoordinatorTest.java | 52 +++++++++++++++++++ 2 files changed, 76 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 307bfb0508d9..186fb8938acf 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -70,6 +70,7 @@ import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.DBIException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; @@ -1760,8 +1761,22 @@ public void updateSegmentMetadata(final Set segments) @Override public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception { + final String updatePayload = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()); + final PreparedBatch batch = handle.prepareBatch(updatePayload); + for (final DataSegment segment : segments) { - updatePayload(handle, segment); + String id = segment.getId().toString(); + byte[] payload = jsonMapper.writeValueAsBytes(segment); + + batch.bind("id", id).bind("payload", payload).add(); + } + + try { + batch.execute(); + } + catch (DBIException e) { + log.error(e, "Exception inserting into DB"); + throw e; } return null; @@ -1780,11 +1795,16 @@ public void deleteSegments(final Set segments) public Void inTransaction(Handle handle, TransactionStatus transactionStatus) { int segmentSize = segments.size(); - String dataSource = ""; + String dataSource = segments.stream().findAny().map(DataSegment::getDataSource).orElse("?"); + + final String deleteFrom = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()); + final PreparedBatch batch = handle.prepareBatch(deleteFrom); + for (final DataSegment segment : segments) { - dataSource = segment.getDataSource(); - deleteSegment(handle, segment); + batch.bind("id", segment.getId().toString()).add(); } + batch.execute(); + log.debugSegments(segments, "Delete the metadata of segments"); log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource); @@ -1794,30 +1814,6 @@ public Void inTransaction(Handle handle, TransactionStatus transactionStatus) ); } - private void deleteSegment(final Handle handle, final DataSegment segment) - { - handle.createStatement(StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable())) - .bind("id", segment.getId().toString()) - .execute(); - } - - private void updatePayload(final Handle handle, final DataSegment segment) throws IOException - { - try { - handle - .createStatement( - StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()) - ) - .bind("id", segment.getId().toString()) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .execute(); - } - catch (IOException e) { - log.error(e, "Exception inserting into DB"); - throw e; - } - } - @Override public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata) { diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index ed5423a97470..74e06bfb6645 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -146,6 +146,18 @@ public class IndexerSQLMetadataStorageCoordinatorTest 100 ); + private final DataSegment defaultSegment2WithBiggerSize = new DataSegment( + "fooDataSource", + Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(1), + 9, + 200 + ); + private final DataSegment defaultSegment3 = new DataSegment( "fooDataSource", Intervals.of("2015-01-03T00Z/2015-01-04T00Z"), @@ -1413,6 +1425,46 @@ public void testDeleteSegmentsInMetaDataStorage() throws IOException ); } + @Test + public void testUpdateSegmentsInMetaDataStorage() throws IOException + { + // Published segments to MetaDataStorage + coordinator.announceHistoricalSegments(SEGMENTS); + + // check segments Published + Assert.assertEquals( + SEGMENTS, + ImmutableSet.copyOf( + coordinator.retrieveUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval(), + Segments.ONLY_VISIBLE + ) + ) + ); + + // update single metadata item + coordinator.updateSegmentMetadata(Collections.singleton(defaultSegment2WithBiggerSize)); + + Collection updated = coordinator.retrieveUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval(), + Segments.ONLY_VISIBLE); + + Assert.assertEquals(SEGMENTS.size(), updated.size()); + + DataSegment defaultAfterUpdate = updated.stream().filter(s -> s.equals(defaultSegment)).findFirst().get(); + DataSegment default2AfterUpdate = updated.stream().filter(s -> s.equals(defaultSegment2)).findFirst().get(); + + Assert.assertNotNull(defaultAfterUpdate); + Assert.assertNotNull(default2AfterUpdate); + + // check that default did not change + Assert.assertEquals(defaultSegment.getSize(), defaultAfterUpdate.getSize()); + // but that default 2 did change + Assert.assertEquals(defaultSegment2WithBiggerSize.getSize(), default2AfterUpdate.getSize()); + } + @Test public void testSingleAdditionalNumberedShardWithNoCorePartitions() throws IOException { From f040a03f972eeac5efb9ef0df4e3dd80d0dd3dd8 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Sat, 22 Jul 2023 12:00:23 -0700 Subject: [PATCH 2/4] (perf) server: batch SQL Metadata deleteSegments and updateSegmentMetadata pr feedback: - extract batch update and delete data generation outside of the SQL transaction, - avoid a query altogether if there are no segments to add, - improvement to logging --- .../IndexerSQLMetadataStorageCoordinator.java | 91 +++++++++++-------- 1 file changed, 53 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 186fb8938acf..09c55fe715e5 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1755,32 +1755,45 @@ public Boolean withHandle(Handle handle) @Override public void updateSegmentMetadata(final Set segments) { - connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - final String updatePayload = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()); - final PreparedBatch batch = handle.prepareBatch(updatePayload); + if (segments.isEmpty()) { + return; + } - for (final DataSegment segment : segments) { - String id = segment.getId().toString(); - byte[] payload = jsonMapper.writeValueAsBytes(segment); + final int segmentSize = segments.size(); + final String updateSql = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()); + final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get(); - batch.bind("id", id).bind("payload", payload).add(); - } + // generate the JSON outside the transaction block + final List> idsAndPayloads = new ArrayList<>(segmentSize); + for (DataSegment seg : segments) { + try { + idsAndPayloads.add(Pair.of( + seg.getId().toString(), + jsonMapper.writeValueAsBytes(seg))); + } + catch (JsonProcessingException e) { + log.error(e, "Exception serializing to segment to JSON for dataSource [%s], segment [%s]", dataSource, seg.getId().toString()); + throw new RuntimeException(e); + } + } - try { - batch.execute(); - } - catch (DBIException e) { - log.error(e, "Exception inserting into DB"); - throw e; - } + connector.getDBI().inTransaction((TransactionCallback) (handle, transactionStatus) -> { + final PreparedBatch batch = handle.prepareBatch(updateSql); + + for (final Pair idAndPayload : idsAndPayloads) { + batch.bind("id", idAndPayload.lhs).bind("payload", idAndPayload.rhs).add(); + } + + try { + batch.execute(); + } - return null; + catch (DBIException e) { + log.error(e, "Exception updating segment metadata in DB for dataSource [%s], count [%d]", dataSource, segmentSize); + throw e; } + + return null; } ); } @@ -1788,30 +1801,32 @@ public Void inTransaction(Handle handle, TransactionStatus transactionStatus) th @Override public void deleteSegments(final Set segments) { - connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) - { - int segmentSize = segments.size(); - String dataSource = segments.stream().findAny().map(DataSegment::getDataSource).orElse("?"); + if (segments.isEmpty()) { + log.info("Removed [0] segments from metadata storage for dataSource [\"\"]!"); + return; + } - final String deleteFrom = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()); - final PreparedBatch batch = handle.prepareBatch(deleteFrom); + final int segmentSize = segments.size(); + final String deleteSql = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()); + final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get(); - for (final DataSegment segment : segments) { - batch.bind("id", segment.getId().toString()).add(); - } - batch.execute(); + // generate the IDs outside the transaction block + final List ids = segments.stream().map(s -> s.getId().toString()).collect(Collectors.toList()); - log.debugSegments(segments, "Delete the metadata of segments"); - log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource); + connector.getDBI().inTransaction((TransactionCallback) (handle, transactionStatus) -> { + final PreparedBatch batch = handle.prepareBatch(deleteSql); - return null; + for (final String id : ids) { + batch.bind("id", id).add(); } + + batch.execute(); + return null; } ); + + log.debugSegments(segments, "Delete the metadata of segments"); + log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource); } @Override From 1f462ff79e376f324f290f092844b9e8bb0c1a94 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Sat, 22 Jul 2023 12:23:31 -0700 Subject: [PATCH 3/4] (perf) server: batch SQL Metadata deleteSegments removed the updateSegmentMetadata batching since it is dead code --- .../IndexerSQLMetadataStorageCoordinator.java | 64 ++++++++----------- 1 file changed, 26 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 09c55fe715e5..b0cffc20e621 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -70,7 +70,6 @@ import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.exceptions.CallbackFailedException; -import org.skife.jdbi.v2.exceptions.DBIException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; @@ -1755,45 +1754,18 @@ public Boolean withHandle(Handle handle) @Override public void updateSegmentMetadata(final Set segments) { - if (segments.isEmpty()) { - return; - } - - final int segmentSize = segments.size(); - final String updateSql = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()); - final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get(); - - // generate the JSON outside the transaction block - final List> idsAndPayloads = new ArrayList<>(segmentSize); - for (DataSegment seg : segments) { - try { - idsAndPayloads.add(Pair.of( - seg.getId().toString(), - jsonMapper.writeValueAsBytes(seg))); - } - catch (JsonProcessingException e) { - log.error(e, "Exception serializing to segment to JSON for dataSource [%s], segment [%s]", dataSource, seg.getId().toString()); - throw new RuntimeException(e); - } - } - - connector.getDBI().inTransaction((TransactionCallback) (handle, transactionStatus) -> { - final PreparedBatch batch = handle.prepareBatch(updateSql); - - for (final Pair idAndPayload : idsAndPayloads) { - batch.bind("id", idAndPayload.lhs).bind("payload", idAndPayload.rhs).add(); - } - - try { - batch.execute(); - } + connector.getDBI().inTransaction( + new TransactionCallback() + { + @Override + public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + { + for (final DataSegment segment : segments) { + updatePayload(handle, segment); + } - catch (DBIException e) { - log.error(e, "Exception updating segment metadata in DB for dataSource [%s], count [%d]", dataSource, segmentSize); - throw e; + return null; } - - return null; } ); } @@ -1829,6 +1801,22 @@ public void deleteSegments(final Set segments) log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource); } + private void updatePayload(final Handle handle, final DataSegment segment) throws IOException + { + try { + handle + .createStatement( + StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()) + ) + .bind("id", segment.getId().toString()) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) + .execute(); + } + catch (IOException e) { + log.error(e, "Exception inserting into DB"); + throw e; + } + } @Override public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata) { From d28f768c3ba793f3996c4def6fc86c5ea9f19a36 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Sat, 22 Jul 2023 21:35:59 -0700 Subject: [PATCH 4/4] (perf) server: batch SQL Metadata deleteSegments pr feedback - improve logging accuracy - restore missing newline --- .../IndexerSQLMetadataStorageCoordinator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index b0cffc20e621..107dc4b9f97a 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1774,31 +1774,30 @@ public Void inTransaction(Handle handle, TransactionStatus transactionStatus) th public void deleteSegments(final Set segments) { if (segments.isEmpty()) { - log.info("Removed [0] segments from metadata storage for dataSource [\"\"]!"); + log.info("No segments to delete."); return; } - final int segmentSize = segments.size(); final String deleteSql = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()); final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get(); // generate the IDs outside the transaction block final List ids = segments.stream().map(s -> s.getId().toString()).collect(Collectors.toList()); - connector.getDBI().inTransaction((TransactionCallback) (handle, transactionStatus) -> { + int numDeletedSegments = connector.getDBI().inTransaction((handle, transactionStatus) -> { final PreparedBatch batch = handle.prepareBatch(deleteSql); for (final String id : ids) { batch.bind("id", id).add(); } - batch.execute(); - return null; + int[] deletedRows = batch.execute(); + return Arrays.stream(deletedRows).sum(); } ); log.debugSegments(segments, "Delete the metadata of segments"); - log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource); + log.info("Deleted [%d] segments from metadata storage for dataSource [%s].", numDeletedSegments, dataSource); } private void updatePayload(final Handle handle, final DataSegment segment) throws IOException @@ -1817,6 +1816,7 @@ private void updatePayload(final Handle handle, final DataSegment segment) throw throw e; } } + @Override public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata) {