Skip to content

Commit

Permalink
(perf) server: batch SQL Metadata deleteSegments
Browse files Browse the repository at this point in the history
removed the updateSegmentMetadata batching since it is dead code
  • Loading branch information
jasonk000 committed Jul 22, 2023
1 parent f040a03 commit 1f462ff
Showing 1 changed file with 26 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper;

Expand Down Expand Up @@ -1755,45 +1754,18 @@ public Boolean withHandle(Handle handle)
@Override
public void updateSegmentMetadata(final Set<DataSegment> segments)
{
if (segments.isEmpty()) {
return;
}

final int segmentSize = segments.size();
final String updateSql = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable());
final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get();

// generate the JSON outside the transaction block
final List<Pair<String, byte[]>> idsAndPayloads = new ArrayList<>(segmentSize);
for (DataSegment seg : segments) {
try {
idsAndPayloads.add(Pair.of(
seg.getId().toString(),
jsonMapper.writeValueAsBytes(seg)));
}
catch (JsonProcessingException e) {
log.error(e, "Exception serializing to segment to JSON for dataSource [%s], segment [%s]", dataSource, seg.getId().toString());
throw new RuntimeException(e);
}
}

connector.getDBI().inTransaction((TransactionCallback<Void>) (handle, transactionStatus) -> {
final PreparedBatch batch = handle.prepareBatch(updateSql);

for (final Pair<String, byte[]> idAndPayload : idsAndPayloads) {
batch.bind("id", idAndPayload.lhs).bind("payload", idAndPayload.rhs).add();
}

try {
batch.execute();
}
connector.getDBI().inTransaction(
new TransactionCallback<Void>()
{
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
for (final DataSegment segment : segments) {
updatePayload(handle, segment);
}

catch (DBIException e) {
log.error(e, "Exception updating segment metadata in DB for dataSource [%s], count [%d]", dataSource, segmentSize);
throw e;
return null;
}

return null;
}
);
}
Expand Down Expand Up @@ -1829,6 +1801,22 @@ public void deleteSegments(final Set<DataSegment> segments)
log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource);
}

private void updatePayload(final Handle handle, final DataSegment segment) throws IOException
{
try {
handle
.createStatement(
StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable())
)
.bind("id", segment.getId().toString())
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.execute();
}
catch (IOException e) {
log.error(e, "Exception inserting into DB");
throw e;
}
}
@Override
public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata)
{
Expand Down

0 comments on commit 1f462ff

Please sign in to comment.