diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 186fb8938acf..09c55fe715e5 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1755,32 +1755,45 @@ public Boolean withHandle(Handle handle) @Override public void updateSegmentMetadata(final Set segments) { - connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - final String updatePayload = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()); - final PreparedBatch batch = handle.prepareBatch(updatePayload); + if (segments.isEmpty()) { + return; + } - for (final DataSegment segment : segments) { - String id = segment.getId().toString(); - byte[] payload = jsonMapper.writeValueAsBytes(segment); + final int segmentSize = segments.size(); + final String updateSql = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()); + final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get(); - batch.bind("id", id).bind("payload", payload).add(); - } + // generate the JSON outside the transaction block + final List> idsAndPayloads = new ArrayList<>(segmentSize); + for (DataSegment seg : segments) { + try { + idsAndPayloads.add(Pair.of( + seg.getId().toString(), + jsonMapper.writeValueAsBytes(seg))); + } + catch (JsonProcessingException e) { + log.error(e, "Exception serializing to segment to JSON for dataSource [%s], segment [%s]", dataSource, seg.getId().toString()); + throw new RuntimeException(e); + } + } - try { - batch.execute(); - } - catch (DBIException e) { - log.error(e, "Exception inserting into DB"); - throw e; - } + connector.getDBI().inTransaction((TransactionCallback) (handle, transactionStatus) -> { + final PreparedBatch batch = handle.prepareBatch(updateSql); + + for (final Pair idAndPayload : idsAndPayloads) { + batch.bind("id", idAndPayload.lhs).bind("payload", idAndPayload.rhs).add(); + } + + try { + batch.execute(); + } - return null; + catch (DBIException e) { + log.error(e, "Exception updating segment metadata in DB for dataSource [%s], count [%d]", dataSource, segmentSize); + throw e; } + + return null; } ); } @@ -1788,30 +1801,32 @@ public Void inTransaction(Handle handle, TransactionStatus transactionStatus) th @Override public void deleteSegments(final Set segments) { - connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) - { - int segmentSize = segments.size(); - String dataSource = segments.stream().findAny().map(DataSegment::getDataSource).orElse("?"); + if (segments.isEmpty()) { + log.info("Removed [0] segments from metadata storage for dataSource [\"\"]!"); + return; + } - final String deleteFrom = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()); - final PreparedBatch batch = handle.prepareBatch(deleteFrom); + final int segmentSize = segments.size(); + final String deleteSql = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()); + final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get(); - for (final DataSegment segment : segments) { - batch.bind("id", segment.getId().toString()).add(); - } - batch.execute(); + // generate the IDs outside the transaction block + final List ids = segments.stream().map(s -> s.getId().toString()).collect(Collectors.toList()); - log.debugSegments(segments, "Delete the metadata of segments"); - log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource); + connector.getDBI().inTransaction((TransactionCallback) (handle, transactionStatus) -> { + final PreparedBatch batch = handle.prepareBatch(deleteSql); - return null; + for (final String id : ids) { + batch.bind("id", id).add(); } + + batch.execute(); + return null; } ); + + log.debugSegments(segments, "Delete the metadata of segments"); + log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource); } @Override