-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add code to wait for segments generated to be loaded on historicals #14322
Changes from 14 commits
2539f86
92b432d
68210ba
5059fba
52aef1d
a7de731
529a577
650f75b
c49f871
f19c8f7
d86ad22
5f79d68
47da704
e2b1787
b922b2f
cc5456b
2d9db10
2401c0c
23c2f89
c323474
6b99073
7675c8c
f709e71
41aa0a5
ca398b7
7ca9465
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,7 @@ | |
import org.apache.druid.data.input.impl.DimensionSchema; | ||
import org.apache.druid.data.input.impl.DimensionsSpec; | ||
import org.apache.druid.data.input.impl.TimestampSpec; | ||
import org.apache.druid.discovery.BrokerClient; | ||
import org.apache.druid.frame.allocation.ArenaMemoryAllocator; | ||
import org.apache.druid.frame.channel.FrameChannelSequence; | ||
import org.apache.druid.frame.key.ClusterBy; | ||
|
@@ -290,6 +291,7 @@ public class ControllerImpl implements Controller | |
private WorkerMemoryParameters workerMemoryParameters; | ||
private boolean isDurableStorageEnabled; | ||
private boolean isFaultToleranceEnabled; | ||
private volatile SegmentLoadWaiter segmentLoadWaiter; | ||
|
||
public ControllerImpl( | ||
final MSQControllerTask task, | ||
|
@@ -429,6 +431,37 @@ public TaskStatus runTask(final Closer closer) | |
} | ||
} | ||
|
||
if (queryKernel != null && queryKernel.isSuccess()) { | ||
// If successful, encourage the tasks to exit successfully. | ||
postFinishToAllTasks(); | ||
workerTaskLauncher.stop(false); | ||
} else { | ||
// If not successful, cancel running tasks. | ||
if (workerTaskLauncher != null) { | ||
workerTaskLauncher.stop(true); | ||
} | ||
} | ||
|
||
// Wait for worker tasks to exit. Ignore their return status. At this point, we've done everything we need to do, | ||
// so we don't care about the task exit status. | ||
if (workerTaskRunnerFuture != null) { | ||
try { | ||
workerTaskRunnerFuture.get(); | ||
} | ||
catch (Exception ignored) { | ||
// Suppress. | ||
} | ||
} | ||
|
||
cleanUpDurableStorageIfNeeded(); | ||
|
||
if (queryKernel != null && queryKernel.isSuccess()) { | ||
if (segmentLoadWaiter != null) { | ||
// If successful and there are segments created, segmentLoadWaiter should wait for them to become available. | ||
segmentLoadWaiter.waitForSegmentsToLoad(); | ||
} | ||
} | ||
|
||
try { | ||
// Write report even if something went wrong. | ||
final MSQStagesReport stagesReport; | ||
|
@@ -480,7 +513,8 @@ public TaskStatus runTask(final Closer closer) | |
workerWarnings, | ||
queryStartTime, | ||
new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(), | ||
workerTaskLauncher | ||
workerTaskLauncher, | ||
segmentLoadWaiter | ||
), | ||
stagesReport, | ||
countersSnapshot, | ||
|
@@ -496,30 +530,6 @@ public TaskStatus runTask(final Closer closer) | |
log.warn(e, "Error encountered while writing task report. Skipping."); | ||
} | ||
|
||
if (queryKernel != null && queryKernel.isSuccess()) { | ||
// If successful, encourage the tasks to exit successfully. | ||
postFinishToAllTasks(); | ||
workerTaskLauncher.stop(false); | ||
} else { | ||
// If not successful, cancel running tasks. | ||
if (workerTaskLauncher != null) { | ||
workerTaskLauncher.stop(true); | ||
} | ||
} | ||
|
||
// Wait for worker tasks to exit. Ignore their return status. At this point, we've done everything we need to do, | ||
// so we don't care about the task exit status. | ||
if (workerTaskRunnerFuture != null) { | ||
try { | ||
workerTaskRunnerFuture.get(); | ||
} | ||
catch (Exception ignored) { | ||
// Suppress. | ||
} | ||
} | ||
|
||
cleanUpDurableStorageIfNeeded(); | ||
|
||
if (taskStateForReport == TaskState.SUCCESS) { | ||
return TaskStatus.success(id()); | ||
} else { | ||
|
@@ -858,7 +868,8 @@ public Map<String, TaskReport> liveReports() | |
workerWarnings, | ||
queryStartTime, | ||
queryStartTime == null ? -1L : new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(), | ||
workerTaskLauncher | ||
workerTaskLauncher, | ||
segmentLoadWaiter | ||
), | ||
makeStageReport( | ||
queryDef, | ||
|
@@ -1299,17 +1310,36 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept | |
if (segmentsWithTombstones.isEmpty()) { | ||
// Nothing to publish, only drop. We already validated that the intervalsToDrop do not have any | ||
// partially-overlapping segments, so it's safe to drop them as intervals instead of as specific segments. | ||
// This should not need a segment load wait as segments are marked as unused immediately. | ||
for (final Interval interval : intervalsToDrop) { | ||
context.taskActionClient() | ||
.submit(new MarkSegmentsAsUnusedAction(task.getDataSource(), interval)); | ||
} | ||
} else { | ||
Set<String> versionsToAwait = segmentsWithTombstones.stream().map(DataSegment::getVersion).collect(Collectors.toSet()); | ||
segmentLoadWaiter = new SegmentLoadWaiter( | ||
context.injector().getInstance(BrokerClient.class), | ||
context.jsonMapper(), | ||
task.getDataSource(), | ||
versionsToAwait, | ||
segmentsWithTombstones.size(), | ||
true | ||
); | ||
performSegmentPublish( | ||
context.taskActionClient(), | ||
SegmentTransactionalInsertAction.overwriteAction(null, null, segmentsWithTombstones) | ||
); | ||
} | ||
} else if (!segments.isEmpty()) { | ||
Set<String> versionsToAwait = segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There would always be one version, right? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I thought this initially as well, but looking into the code, we actually generate the version based on the lock we acquire for the segment. So if there are multiple intervals we are replacing into, and therefore multiple locks, we would have more than one version. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh, I did not know that. Thanks for explaining the rationale. We could also document this as well :) |
||
segmentLoadWaiter = new SegmentLoadWaiter( | ||
context.injector().getInstance(BrokerClient.class), | ||
context.jsonMapper(), | ||
task.getDataSource(), | ||
versionsToAwait, | ||
segmentsWithTombstones.size(), | ||
true | ||
); | ||
// Append mode. | ||
performSegmentPublish( | ||
context.taskActionClient(), | ||
|
@@ -2055,7 +2085,8 @@ private static MSQStatusReport makeStatusReport( | |
final Queue<MSQErrorReport> errorReports, | ||
@Nullable final DateTime queryStartTime, | ||
final long queryDuration, | ||
MSQWorkerTaskLauncher taskLauncher | ||
MSQWorkerTaskLauncher taskLauncher, | ||
final SegmentLoadWaiter segmentLoadWaiter | ||
) | ||
{ | ||
int pendingTasks = -1; | ||
|
@@ -2066,14 +2097,18 @@ private static MSQStatusReport makeStatusReport( | |
pendingTasks = workerTaskCount.getPendingWorkerCount(); | ||
runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To account for controller. | ||
} | ||
|
||
SegmentLoadWaiter.SegmentLoadWaiterStatus status = segmentLoadWaiter == null ? null : segmentLoadWaiter.status(); | ||
|
||
return new MSQStatusReport( | ||
taskState, | ||
errorReport, | ||
errorReports, | ||
queryStartTime, | ||
queryDuration, | ||
pendingTasks, | ||
runningTasks | ||
runningTasks, | ||
status | ||
); | ||
} | ||
|
||
|
@@ -2242,6 +2277,7 @@ private Pair<ControllerQueryKernel, ListenableFuture<?>> run() throws IOExceptio | |
throwKernelExceptionIfNotUnknown(); | ||
} | ||
|
||
updateLiveReportMaps(); | ||
cleanUpEffectivelyFinishedStages(); | ||
return Pair.of(queryKernel, workerTaskLauncherFuture); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: segmentLoadStatus?
What is this start time?
How would segments which match a drop rule get communicated to the console?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
startTime is the time at which we started checking the load status. Together with the duration, it gives a clear idea of when the wait started and when it ended, and this is the same structure used for other MSQ stages.