Skip to content

Commit

Permalink
(perf) server: batch SQL Metadata deleteSegments and updateSegmentMet…
Browse files Browse the repository at this point in the history
…adata

pr feedback:
- extract batch update and delete data generation outside of the SQL transaction,
- avoid a query altogether if there are no segments to add,
- improvement to logging
  • Loading branch information
jasonk000 committed Jul 22, 2023
1 parent d57c891 commit f040a03
Showing 1 changed file with 53 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1755,63 +1755,78 @@ public Boolean withHandle(Handle handle)
@Override
public void updateSegmentMetadata(final Set<DataSegment> segments)
{
connector.getDBI().inTransaction(
new TransactionCallback<Void>()
{
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
final String updatePayload = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable());
final PreparedBatch batch = handle.prepareBatch(updatePayload);
if (segments.isEmpty()) {
return;
}

for (final DataSegment segment : segments) {
String id = segment.getId().toString();
byte[] payload = jsonMapper.writeValueAsBytes(segment);
final int segmentSize = segments.size();
final String updateSql = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable());
final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get();

batch.bind("id", id).bind("payload", payload).add();
}
// generate the JSON outside the transaction block
final List<Pair<String, byte[]>> idsAndPayloads = new ArrayList<>(segmentSize);
for (DataSegment seg : segments) {
try {
idsAndPayloads.add(Pair.of(
seg.getId().toString(),
jsonMapper.writeValueAsBytes(seg)));
}
catch (JsonProcessingException e) {
log.error(e, "Exception serializing to segment to JSON for dataSource [%s], segment [%s]", dataSource, seg.getId().toString());
throw new RuntimeException(e);
}
}

try {
batch.execute();
}
catch (DBIException e) {
log.error(e, "Exception inserting into DB");
throw e;
}
connector.getDBI().inTransaction((TransactionCallback<Void>) (handle, transactionStatus) -> {
final PreparedBatch batch = handle.prepareBatch(updateSql);

for (final Pair<String, byte[]> idAndPayload : idsAndPayloads) {
batch.bind("id", idAndPayload.lhs).bind("payload", idAndPayload.rhs).add();
}

try {
batch.execute();
}

return null;
catch (DBIException e) {
log.error(e, "Exception updating segment metadata in DB for dataSource [%s], count [%d]", dataSource, segmentSize);
throw e;
}

return null;
}
);
}

@Override
public void deleteSegments(final Set<DataSegment> segments)
{
connector.getDBI().inTransaction(
new TransactionCallback<Void>()
{
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus)
{
int segmentSize = segments.size();
String dataSource = segments.stream().findAny().map(DataSegment::getDataSource).orElse("?");
if (segments.isEmpty()) {
log.info("Removed [0] segments from metadata storage for dataSource [\"\"]!");
return;
}

final String deleteFrom = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable());
final PreparedBatch batch = handle.prepareBatch(deleteFrom);
final int segmentSize = segments.size();
final String deleteSql = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable());
final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get();

for (final DataSegment segment : segments) {
batch.bind("id", segment.getId().toString()).add();
}
batch.execute();
// generate the IDs outside the transaction block
final List<String> ids = segments.stream().map(s -> s.getId().toString()).collect(Collectors.toList());

log.debugSegments(segments, "Delete the metadata of segments");
log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource);
connector.getDBI().inTransaction((TransactionCallback<Void>) (handle, transactionStatus) -> {
final PreparedBatch batch = handle.prepareBatch(deleteSql);

return null;
for (final String id : ids) {
batch.bind("id", id).add();
}

batch.execute();
return null;
}
);

log.debugSegments(segments, "Delete the metadata of segments");
log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource);
}

@Override
Expand Down

0 comments on commit f040a03

Please sign in to comment.