Skip to content

Commit

Permalink
(perf) server: batch SQL Metadata deleteSegments and updateSegmentMet…
Browse files Browse the repository at this point in the history
…adata
  • Loading branch information
jasonk000 committed Jul 21, 2023
1 parent d0654e2 commit 3758d7a
Showing 1 changed file with 24 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper;

Expand Down Expand Up @@ -1760,8 +1761,22 @@ public void updateSegmentMetadata(final Set<DataSegment> segments)
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
final String updatePayload = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable());
final PreparedBatch batch = handle.prepareBatch(updatePayload);

for (final DataSegment segment : segments) {
updatePayload(handle, segment);
String id = segment.getId().toString();
byte[] payload = jsonMapper.writeValueAsBytes(segment);

batch.bind("id", id).bind("payload", payload).add();
}

try {
batch.execute();
}
catch (DBIException e) {
log.error(e, "Exception inserting into DB");
throw e;
}

return null;
Expand All @@ -1780,11 +1795,16 @@ public void deleteSegments(final Set<DataSegment> segments)
public Void inTransaction(Handle handle, TransactionStatus transactionStatus)
{
int segmentSize = segments.size();
String dataSource = "";
String dataSource = segments.stream().findAny().map(DataSegment::getDataSource).orElse("?");

final String deleteFrom = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable());
final PreparedBatch batch = handle.prepareBatch(deleteFrom);

for (final DataSegment segment : segments) {
dataSource = segment.getDataSource();
deleteSegment(handle, segment);
batch.bind("id", segment.getId().toString()).add();
}
batch.execute();

log.debugSegments(segments, "Delete the metadata of segments");
log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource);

Expand All @@ -1794,30 +1814,6 @@ public Void inTransaction(Handle handle, TransactionStatus transactionStatus)
);
}

private void deleteSegment(final Handle handle, final DataSegment segment)
{
handle.createStatement(StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()))
.bind("id", segment.getId().toString())
.execute();
}

private void updatePayload(final Handle handle, final DataSegment segment) throws IOException
{
try {
handle
.createStatement(
StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable())
)
.bind("id", segment.getId().toString())
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.execute();
}
catch (IOException e) {
log.error(e, "Exception inserting into DB");
throw e;
}
}

@Override
public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata)
{
Expand Down

0 comments on commit 3758d7a

Please sign in to comment.