Add code to wait for segments generated to be loaded on historicals #14322

Merged on Sep 6, 2023 (26 commits)

Changes shown are from 14 of the 26 commits.

Commits

2539f86
Add code to wait for segments generated to be loaded on historicals
adarshsanjeev May 20, 2023
92b432d
Add broker client and use it to query broker for segment load status
adarshsanjeev Jun 13, 2023
68210ba
Temp
adarshsanjeev Jun 14, 2023
5059fba
Merge remote-tracking branch 'origin/master' into controller-load-seg…
adarshsanjeev Jul 4, 2023
52aef1d
Update query with replication factor
adarshsanjeev Jul 5, 2023
a7de731
Merge remote-tracking branch 'origin/master' into controller-load-seg…
adarshsanjeev Jul 27, 2023
529a577
Cleanup code
adarshsanjeev Jul 27, 2023
650f75b
Merge remote-tracking branch 'origin/master' into controller-load-seg…
adarshsanjeev Aug 3, 2023
c49f871
Merge remote-tracking branch 'origin/master' into controller-load-seg…
adarshsanjeev Aug 6, 2023
f19c8f7
Code cleanup
adarshsanjeev Aug 7, 2023
d86ad22
Improve coverage
adarshsanjeev Aug 9, 2023
5f79d68
Merge remote-tracking branch 'origin/master' into controller-load-seg…
adarshsanjeev Aug 9, 2023
47da704
Resolve build failures
adarshsanjeev Aug 9, 2023
e2b1787
Address review comments
adarshsanjeev Aug 11, 2023
b922b2f
Merge remote-tracking branch 'origin/master' into controller-load-seg…
adarshsanjeev Aug 22, 2023
cc5456b
Address review comments
adarshsanjeev Aug 22, 2023
2d9db10
Address review comments
adarshsanjeev Aug 22, 2023
2401c0c
Address review comments
adarshsanjeev Aug 22, 2023
23c2f89
Update names
adarshsanjeev Aug 22, 2023
c323474
Update names
adarshsanjeev Aug 22, 2023
6b99073
Increase coverage
adarshsanjeev Aug 22, 2023
7675c8c
Fix spelling
adarshsanjeev Aug 22, 2023
f709e71
Add java docs
adarshsanjeev Sep 5, 2023
41aa0a5
Update error message
adarshsanjeev Sep 5, 2023
ca398b7
Merge remote-tracking branch 'origin/master' into controller-load-seg…
adarshsanjeev Sep 5, 2023
7ca9465
Update extensions-core/multi-stage-query/src/main/java/org/apache/dru…
cryptoe Sep 6, 2023
16 changes: 15 additions & 1 deletion docs/api-reference/sql-ingestion-api.md
@@ -257,7 +257,15 @@ The response shows an example report for a query.
"startTime": "2022-09-14T22:12:09.266Z",
"durationMs": 28227,
"pendingTasks": 0,
"runningTasks": 2
"runningTasks": 2,
"segmentLoadWaiterStatus": {

Contributor commented:
nit: segmentLoadStatus?
What is this start time?
How would segments which match a drop rule get communicated to the console?

Contributor Author replied:

startTime is the time at which we started checking the load status. That, together with duration, gives a clear idea of when the wait started and when it ended, and this is the structure used for other MSQ stages.

"state": "SUCCESS",
"dataSource": "kttm_simple",
"startTime": "2022-09-14T23:12:09.266Z",
"duration": 15,
"totalSegments": 1,
"segmentsLeft": 0
}
},
"stages": [
{
@@ -562,6 +570,12 @@ The following table describes the response fields when you retrieve a report for
| `multiStageQuery.payload.status.durationMs` | Milliseconds elapsed after the query has started running. -1 denotes that the query hasn't started running yet. |
| `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not fully started. -1 denotes that the number is currently unknown. |
| `multiStageQuery.payload.status.runningTasks` | Number of currently running tasks. Should be at least 1 since the controller is included. |
| `multiStageQuery.payload.status.segmentLoadWaiterStatus` | Status of the segment load waiter. Only present after the segments have been published. |
| `multiStageQuery.payload.status.segmentLoadWaiterStatus.state` | One of INIT, WAITING, SUCCESS, FAILED, or TIMED_OUT. |
| `multiStageQuery.payload.status.segmentLoadWaiterStatus.startTime` | The time at which the controller started waiting for the segments to finish loading. |
| `multiStageQuery.payload.status.segmentLoadWaiterStatus.duration` | The duration in milliseconds that the controller has been waiting for the segments to load. |
| `multiStageQuery.payload.status.segmentLoadWaiterStatus.totalSegments` | The total number of segments generated by the job. This includes tombstone segments (if any). |
| `multiStageQuery.payload.status.segmentLoadWaiterStatus.segmentsLeft` | The total number of segments remaining to be loaded. |
| `multiStageQuery.payload.status.errorReport` | Error object. Only present if there was an error. |
| `multiStageQuery.payload.status.errorReport.taskId` | The task that reported the error, if known. May be a controller task or a worker task. |
| `multiStageQuery.payload.status.errorReport.host` | The hostname and port of the task that reported the error, if known. |
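As a usage sketch for the fields documented above (not part of this diff): a client could poll the Overlord's task reports endpoint, `GET /druid/indexer/v1/task/{taskId}/reports`, and read `segmentLoadWaiterStatus` once it appears. The base URL, task id, and class name below are placeholders.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MsqSegmentLoadStatusPoller
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder values: point these at a real Overlord and MSQ controller task id.
    final String overlordBase = "http://localhost:8081";
    final String taskId = "query-abc123";

    // Fetch the task report for the MSQ controller task.
    final HttpRequest request = HttpRequest
        .newBuilder(URI.create(overlordBase + "/druid/indexer/v1/task/" + taskId + "/reports"))
        .GET()
        .build();
    final HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    // Navigate to the segment load waiter status described in the table above.
    final JsonNode status = new ObjectMapper()
        .readTree(response.body())
        .path("multiStageQuery")
        .path("payload")
        .path("status")
        .path("segmentLoadWaiterStatus");

    if (status.isMissingNode()) {
      // Per the table above, the field is only present after the segments have been published.
      System.out.println("No segment load status yet; segments have not been published.");
    } else {
      System.out.printf(
          "state=%s, segmentsLeft=%d of %d%n",
          status.path("state").asText(),
          status.path("segmentsLeft").asLong(),
          status.path("totalSegments").asLong()
      );
    }
  }
}
```

Because `liveReports()` also receives the `segmentLoadWaiter` (see the ControllerImpl.java diff below), the same structure should be visible in the live report while the controller is still waiting.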
ControllerImpl.java
@@ -44,6 +44,7 @@
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.BrokerClient;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.channel.FrameChannelSequence;
import org.apache.druid.frame.key.ClusterBy;
@@ -290,6 +291,7 @@ public class ControllerImpl implements Controller
private WorkerMemoryParameters workerMemoryParameters;
private boolean isDurableStorageEnabled;
private boolean isFaultToleranceEnabled;
private volatile SegmentLoadWaiter segmentLoadWaiter;

public ControllerImpl(
final MSQControllerTask task,
@@ -429,6 +431,37 @@ public TaskStatus runTask(final Closer closer)
}
}

if (queryKernel != null && queryKernel.isSuccess()) {
// If successful, encourage the tasks to exit successfully.
postFinishToAllTasks();
workerTaskLauncher.stop(false);
} else {
// If not successful, cancel running tasks.
if (workerTaskLauncher != null) {
workerTaskLauncher.stop(true);
}
}

// Wait for worker tasks to exit. Ignore their return status. At this point, we've done everything we need to do,
// so we don't care about the task exit status.
if (workerTaskRunnerFuture != null) {
try {
workerTaskRunnerFuture.get();
}
catch (Exception ignored) {
// Suppress.
}
}

cleanUpDurableStorageIfNeeded();

if (queryKernel != null && queryKernel.isSuccess()) {
if (segmentLoadWaiter != null) {
// If successful and there are segments created, segmentLoadWaiter should wait for them to become available.
segmentLoadWaiter.waitForSegmentsToLoad();
}
}

try {
// Write report even if something went wrong.
final MSQStagesReport stagesReport;
@@ -480,7 +513,8 @@ public TaskStatus runTask(final Closer closer)
workerWarnings,
queryStartTime,
new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
workerTaskLauncher
workerTaskLauncher,
segmentLoadWaiter
),
stagesReport,
countersSnapshot,
@@ -496,30 +530,6 @@ public TaskStatus runTask(final Closer closer)
log.warn(e, "Error encountered while writing task report. Skipping.");
}

if (queryKernel != null && queryKernel.isSuccess()) {
// If successful, encourage the tasks to exit successfully.
postFinishToAllTasks();
workerTaskLauncher.stop(false);
} else {
// If not successful, cancel running tasks.
if (workerTaskLauncher != null) {
workerTaskLauncher.stop(true);
}
}

// Wait for worker tasks to exit. Ignore their return status. At this point, we've done everything we need to do,
// so we don't care about the task exit status.
if (workerTaskRunnerFuture != null) {
try {
workerTaskRunnerFuture.get();
}
catch (Exception ignored) {
// Suppress.
}
}

cleanUpDurableStorageIfNeeded();

if (taskStateForReport == TaskState.SUCCESS) {
return TaskStatus.success(id());
} else {
@@ -858,7 +868,8 @@ public Map<String, TaskReport> liveReports()
workerWarnings,
queryStartTime,
queryStartTime == null ? -1L : new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
workerTaskLauncher
workerTaskLauncher,
segmentLoadWaiter
),
makeStageReport(
queryDef,
@@ -1299,17 +1310,36 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOException
if (segmentsWithTombstones.isEmpty()) {
// Nothing to publish, only drop. We already validated that the intervalsToDrop do not have any
// partially-overlapping segments, so it's safe to drop them as intervals instead of as specific segments.
// This should not need a segment load wait as segments are marked as unused immediately.
for (final Interval interval : intervalsToDrop) {
context.taskActionClient()
.submit(new MarkSegmentsAsUnusedAction(task.getDataSource(), interval));
}
} else {
Set<String> versionsToAwait = segmentsWithTombstones.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
segmentLoadWaiter = new SegmentLoadWaiter(
context.injector().getInstance(BrokerClient.class),
context.jsonMapper(),
task.getDataSource(),
versionsToAwait,
segmentsWithTombstones.size(),
true
);
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.overwriteAction(null, null, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
Set<String> versionsToAwait = segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet());

Contributor commented:
There would always be one version, right?
Can we add a check here?

Contributor Author replied:

I thought this initially as well, but looking into the code, we actually generate the version based on the lock we acquire for the segment. So if there are multiple intervals we are replacing into and therefore multiple locks, we would have more than one version.

Contributor replied:
Oh, I did not know that. Thanks for explaining the rationale. We could also document this as well :)

segmentLoadWaiter = new SegmentLoadWaiter(
context.injector().getInstance(BrokerClient.class),
context.jsonMapper(),
task.getDataSource(),
versionsToAwait,
segmentsWithTombstones.size(),
true
);
// Append mode.
performSegmentPublish(
context.taskActionClient(),
@@ -2055,7 +2085,8 @@ private static MSQStatusReport makeStatusReport(
final Queue<MSQErrorReport> errorReports,
@Nullable final DateTime queryStartTime,
final long queryDuration,
MSQWorkerTaskLauncher taskLauncher
MSQWorkerTaskLauncher taskLauncher,
final SegmentLoadWaiter segmentLoadWaiter
)
{
int pendingTasks = -1;
@@ -2066,14 +2097,18 @@
pendingTasks = workerTaskCount.getPendingWorkerCount();
runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To account for controller.
}

SegmentLoadWaiter.SegmentLoadWaiterStatus status = segmentLoadWaiter == null ? null : segmentLoadWaiter.status();

return new MSQStatusReport(
taskState,
errorReport,
errorReports,
queryStartTime,
queryDuration,
pendingTasks,
runningTasks
runningTasks,
status
);
}

@@ -2242,6 +2277,7 @@ private Pair<ControllerQueryKernel, ListenableFuture<?>> run() throws IOException
throwKernelExceptionIfNotUnknown();
}

updateLiveReportMaps();
cleanUpEffectivelyFinishedStages();
return Pair.of(queryKernel, workerTaskLauncherFuture);
}
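
Finally, as background on how a load waiter like the one added in this PR can measure progress (a sketch under stated assumptions, not the PR's actual `SegmentLoadWaiter` implementation): one plausible approach is to query the Broker's `sys.segments` metadata table for the published versions and count the segments that are not yet available. The version filter is a set rather than a single value because, as noted in the review discussion above, each replaced interval acquires its own lock and therefore its own version. The Broker SQL endpoint `/druid/v2/sql` and the `sys.segments` columns used below exist in Druid, but the exact query issued by the PR may differ, and all host, datasource, and version values are placeholders.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class BrokerSegmentAvailabilityCheck
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder values for illustration only.
    final String brokerBase = "http://localhost:8082";
    final String dataSource = "kttm_simple";
    // A replace that spans multiple locked intervals can produce more than one version.
    final Set<String> versionsToAwait = Set.of("2023-08-22T10:00:00.000Z", "2023-08-22T11:00:00.000Z");

    // Count published, non-overshadowed segments of the awaited versions that are not yet available.
    final String versionList = versionsToAwait.stream()
                                              .map(v -> "'" + v + "'")
                                              .collect(Collectors.joining(", "));
    final String sql =
        "SELECT COUNT(*) AS segments_left FROM sys.segments"
        + " WHERE datasource = '" + dataSource + "'"
        + " AND version IN (" + versionList + ")"
        + " AND is_published = 1 AND is_overshadowed = 0 AND is_available = 0";

    final ObjectMapper mapper = new ObjectMapper();
    final HttpRequest request = HttpRequest
        .newBuilder(URI.create(brokerBase + "/druid/v2/sql"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(mapper.writeValueAsString(Map.of("query", sql))))
        .build();

    final HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    // The default result format is a JSON array with one object per row.
    final JsonNode rows = mapper.readTree(response.body());
    System.out.println("Segments still waiting to load: " + rows.get(0).get("segments_left").asLong());
  }
}
```

A real waiter would run such a check in a loop with a timeout, which is consistent with the TIMED_OUT state documented in the API reference diff above.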