Skip to content

Commit

Permalink
Added UI support for waitTillSegmentsLoad (apache#15110)
Browse files Browse the repository at this point in the history
This relies on the work done in apache#14322 and apache#15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI :
  • Loading branch information
lorem--ipsum authored and LakshSingla committed Oct 14, 2023
1 parent e1a6c3a commit 632d35a
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 4 deletions.
3 changes: 3 additions & 0 deletions web-console/src/druid-models/execution/execution.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ describe('Execution', () => {
"maxNumTasks": 2,
},
"result": undefined,
"segmentStatus": undefined,
"sqlQuery": "REPLACE INTO \\"kttm_simple\\" OVERWRITE ALL
SELECT
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
Expand Down Expand Up @@ -643,6 +644,7 @@ describe('Execution', () => {
"sqlQuery": undefined,
"sqlQueryId": undefined,
},
"segmentStatus": undefined,
"sqlQuery": undefined,
"stages": undefined,
"startTime": 2023-07-05T21:33:19.147Z,
Expand Down Expand Up @@ -679,6 +681,7 @@ describe('Execution', () => {
"nativeQuery": undefined,
"queryContext": undefined,
"result": undefined,
"segmentStatus": undefined,
"sqlQuery": undefined,
"stages": undefined,
"startTime": 2023-07-05T21:40:39.986Z,
Expand Down
50 changes: 50 additions & 0 deletions web-console/src/druid-models/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ function formatPendingMessage(
}
}

interface SegmentStatus {
duration: number;
onDemandSegments: number;
pendingSegments: number;
precachedSegments: number;
startTime: Date;
state: 'INIT' | 'WAITING' | 'SUCCESS';
totalSegments: number;
unknownSegments: number;
usedSegments: number;
}

export interface ExecutionValue {
engine: DruidEngine;
id: string;
Expand All @@ -182,6 +194,7 @@ export interface ExecutionValue {
warnings?: ExecutionError[];
capacityInfo?: CapacityInfo;
_payload?: MsqTaskPayloadResponse;
segmentStatus?: SegmentStatus;
}

export class Execution {
Expand Down Expand Up @@ -292,6 +305,11 @@ export class Execution {
const startTime = new Date(deepGet(taskReport, 'multiStageQuery.payload.status.startTime'));
const durationMs = deepGet(taskReport, 'multiStageQuery.payload.status.durationMs');

const segmentLoaderStatus = deepGet(
taskReport,
'multiStageQuery.payload.status.segmentLoadWaiterStatus',
);

let result: QueryResult | undefined;
const resultsPayload: {
signature: { name: string; type: string }[];
Expand All @@ -313,6 +331,7 @@ export class Execution {
engine: 'sql-msq-task',
id,
status: Execution.normalizeTaskStatus(status),
segmentStatus: segmentLoaderStatus,
startTime: isNaN(startTime.getTime()) ? undefined : startTime,
duration: typeof durationMs === 'number' ? durationMs : undefined,
usageInfo: getUsageInfoFromStatusPayload(
Expand Down Expand Up @@ -369,6 +388,7 @@ export class Execution {
public readonly error?: ExecutionError;
public readonly warnings?: ExecutionError[];
public readonly capacityInfo?: CapacityInfo;
public readonly segmentStatus?: SegmentStatus;

public readonly _payload?: { payload: any; task: string };

Expand All @@ -390,6 +410,7 @@ export class Execution {
this.error = value.error;
this.warnings = nonEmptyArray(value.warnings) ? value.warnings : undefined;
this.capacityInfo = value.capacityInfo;
this.segmentStatus = value.segmentStatus;

this._payload = value._payload;
}
Expand All @@ -412,6 +433,7 @@ export class Execution {
error: this.error,
warnings: this.warnings,
capacityInfo: this.capacityInfo,
segmentStatus: this.segmentStatus,

_payload: this._payload,
};
Expand Down Expand Up @@ -526,6 +548,34 @@ export class Execution {
return status !== 'SUCCESS' && status !== 'FAILED';
}

public getSegmentStatusDescription() {
const { segmentStatus } = this;

let label = '';

switch (segmentStatus?.state) {
case 'INIT':
label = 'Waiting for segments loading to start...';
break;

case 'WAITING':
label = 'Waiting for segments loading to complete...';
break;

case 'SUCCESS':
label = 'Segments loaded successfully in ' + segmentStatus.duration + 'ms.';
break;

default:
break;
}

return {
label,
...segmentStatus,
};
}

public isFullyComplete(): boolean {
if (this.isWaitingForQuery()) return false;

Expand Down
16 changes: 16 additions & 0 deletions web-console/src/druid-models/query-context/query-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,22 @@ export function changeFinalizeAggregations(
: deepDelete(context, 'finalizeAggregations');
}

// waitTillSegmentsLoad

export function getWaitTillSegmentsLoad(context: QueryContext): boolean | undefined {
const { waitTillSegmentsLoad } = context;
return typeof waitTillSegmentsLoad === 'boolean' ? waitTillSegmentsLoad : undefined;
}

export function changeWaitTillSegmentsLoad(
context: QueryContext,
waitTillSegmentsLoad: boolean | undefined,
): QueryContext {
return typeof waitTillSegmentsLoad === 'boolean'
? deepSet(context, 'waitTillSegmentsLoad', waitTillSegmentsLoad)
: deepDelete(context, 'waitTillSegmentsLoad');
}

// groupByEnableMultiValueUnnesting

export function getGroupByEnableMultiValueUnnesting(context: QueryContext): boolean | undefined {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ describe('WorkbenchQuery', () => {
finalizeAggregations: false,
groupByEnableMultiValueUnnesting: false,
useCache: false,
waitTillSegmentsLoad: true,
},
header: true,
query: 'INSERT INTO wiki2 SELECT * FROM wikipedia',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ export class WorkbenchQuery {
apiQuery.context.executionMode ??= 'async';
apiQuery.context.finalizeAggregations ??= !ingestQuery;
apiQuery.context.groupByEnableMultiValueUnnesting ??= !ingestQuery;
apiQuery.context.waitTillSegmentsLoad ??= true;
}

if (Array.isArray(queryParameters) && queryParameters.length) {
Expand Down
13 changes: 12 additions & 1 deletion web-console/src/helpers/execution/sql-task-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ export interface SubmitTaskQueryOptions {
export async function submitTaskQuery(
options: SubmitTaskQueryOptions,
): Promise<Execution | IntermediateQueryState<Execution>> {
const { query, context, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;
const { query, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;

// setting waitTillSegmentsLoad to true by default
const context = {
waitTillSegmentsLoad: true,
...(options.context || {}),
};

let sqlQuery: string;
let jsonQuery: Record<string, any>;
Expand Down Expand Up @@ -261,6 +267,11 @@ export async function updateExecutionWithDatasourceLoadedIfNeeded(
return execution;
}

// This means we don't have to perform the SQL query to check if the segments are loaded
if (execution.queryContext?.waitTillSegmentsLoad === true) {
return execution.markDestinationDatasourceLoaded();
}

const endTime = execution.getEndTime();
if (
!endTime || // If endTime is not set (this is not expected to happen) then just bow out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ exports[`ExecutionDetailsPane matches snapshot no init tab 1`] = `
"id": "native",
"label": "Native query",
},
false,
undefined,
undefined,
Object {
Expand Down Expand Up @@ -286,6 +287,7 @@ PARTITIONED BY DAY",
"maxParseExceptions": 2,
},
"result": undefined,
"segmentStatus": undefined,
"sqlQuery": "REPLACE INTO \\"kttm-blank-lines\\" OVERWRITE ALL
SELECT
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
Expand Down Expand Up @@ -909,6 +911,7 @@ PARTITIONED BY DAY",
"maxParseExceptions": 2,
},
"result": undefined,
"segmentStatus": undefined,
"sqlQuery": "REPLACE INTO \\"kttm-blank-lines\\" OVERWRITE ALL
SELECT
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
Expand Down Expand Up @@ -1319,6 +1322,7 @@ exports[`ExecutionDetailsPane matches snapshot with init tab 1`] = `
"id": "native",
"label": "Native query",
},
false,
undefined,
undefined,
Object {
Expand Down Expand Up @@ -1576,6 +1580,7 @@ PARTITIONED BY DAY",
"maxParseExceptions": 2,
},
"result": undefined,
"segmentStatus": undefined,
"sqlQuery": "REPLACE INTO \\"kttm-blank-lines\\" OVERWRITE ALL
SELECT
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import React, { useState } from 'react';

import { FancyTabPane } from '../../../components';
import type { Execution } from '../../../druid-models';
import { pluralIfNeeded } from '../../../utils';
import { formatDuration, formatDurationWithMs, pluralIfNeeded } from '../../../utils';
import { DestinationPagesPane } from '../destination-pages-pane/destination-pages-pane';
import { ExecutionErrorPane } from '../execution-error-pane/execution-error-pane';
import { ExecutionStagesPane } from '../execution-stages-pane/execution-stages-pane';
Expand All @@ -40,7 +40,8 @@ export type ExecutionDetailsTab =
| 'result'
| 'pages'
| 'error'
| 'warnings';
| 'warnings'
| 'segmentStatus';

interface ExecutionDetailsPaneProps {
execution: Execution;
Expand All @@ -53,6 +54,7 @@ export const ExecutionDetailsPane = React.memo(function ExecutionDetailsPane(
) {
const { execution, initTab, goToTask } = props;
const [activeTab, setActiveTab] = useState<ExecutionDetailsTab>(initTab || 'general');
const segmentStatusDescription = execution.getSegmentStatusDescription();

function renderContent() {
switch (activeTab) {
Expand Down Expand Up @@ -120,6 +122,25 @@ export const ExecutionDetailsPane = React.memo(function ExecutionDetailsPane(
case 'warnings':
return <ExecutionWarningsPane execution={execution} />;

case 'segmentStatus':
return (
<>
<p>
Duration:{' '}
{segmentStatusDescription.duration
? formatDurationWithMs(segmentStatusDescription.duration)
: '-'}
{execution.duration
? ` (query duration was ${formatDuration(execution.duration)})`
: ''}
</p>
<p>Total segments: {segmentStatusDescription.totalSegments ?? '-'}</p>
<p>Used segments: {segmentStatusDescription.usedSegments ?? '-'}</p>
<p>Precached segments: {segmentStatusDescription.precachedSegments ?? '-'}</p>
<p>On demand segments: {segmentStatusDescription.onDemandSegments ?? '-'}</p>
</>
);

default:
return;
}
Expand All @@ -146,6 +167,11 @@ export const ExecutionDetailsPane = React.memo(function ExecutionDetailsPane(
label: 'Native query',
icon: IconNames.COG,
},
Boolean(execution.segmentStatus) && {
id: 'segmentStatus',
label: 'Segments',
icon: IconNames.HEAT_GRID,
},
execution.result && {
id: 'result',
label: 'Results',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ exports[`ExecutionProgressBarPane matches snapshot 1`] = `
className="overall"
intent="primary"
/>
<Unknown>
</Unknown>
</div>
`;
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ export const ExecutionProgressBarPane = React.memo(function ExecutionProgressBar

const idx = stages ? stages.currentStageIndex() : -1;
const waitingForSegments = stages && !execution.isWaitingForQuery();

const segmentStatusDescription = execution?.getSegmentStatusDescription();

return (
<div className="execution-progress-bar-pane">
<Label>
Expand Down Expand Up @@ -78,6 +81,7 @@ export const ExecutionProgressBarPane = React.memo(function ExecutionProgressBar
intent={stages ? Intent.PRIMARY : undefined}
value={stages && execution.isWaitingForQuery() ? stages.overallProgress() : undefined}
/>
{segmentStatusDescription && <Label>{segmentStatusDescription.label}</Label>}
{stages && idx >= 0 && (
<>
<Label>{`Current stage (${idx + 1} of ${stages.stageCount()})`}</Label>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ exports[`IngestSuccessPane matches snapshot 1`] = `
</p>
<p>
Insert query took 0:00:23.
<span
className="action"
onClick={[Function]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ export const IngestSuccessPane = React.memo(function IngestSuccessPane(

const warnings = execution.stages?.getWarningCount() || 0;

const duration = execution.duration;
const { duration } = execution;
const segmentStatusDescription = execution.getSegmentStatusDescription();

return (
<div className="ingest-success-pane">
<p>
Expand All @@ -63,10 +65,12 @@ export const IngestSuccessPane = React.memo(function IngestSuccessPane(
</p>
<p>
{duration ? `Insert query took ${formatDuration(duration)}. ` : `Insert query completed. `}
{segmentStatusDescription ? segmentStatusDescription.label + ' ' : ''}
<span className="action" onClick={() => onDetails(execution.id)}>
Show details
</span>
</p>

{onQueryTab && (
<p>
Open new tab with:{' '}
Expand Down
12 changes: 12 additions & 0 deletions web-console/src/views/workbench-view/run-panel/run-panel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import {
changeUseApproximateCountDistinct,
changeUseApproximateTopN,
changeUseCache,
changeWaitTillSegmentsLoad,
getDurableShuffleStorage,
getFinalizeAggregations,
getGroupByEnableMultiValueUnnesting,
Expand All @@ -53,6 +54,7 @@ import {
getUseApproximateCountDistinct,
getUseApproximateTopN,
getUseCache,
getWaitTillSegmentsLoad,
summarizeIndexSpec,
} from '../../../druid-models';
import { deepGet, deepSet, pluralIfNeeded, tickIcon } from '../../../utils';
Expand Down Expand Up @@ -110,6 +112,7 @@ export const RunPanel = React.memo(function RunPanel(props: RunPanelProps) {

const maxParseExceptions = getMaxParseExceptions(queryContext);
const finalizeAggregations = getFinalizeAggregations(queryContext);
const waitTillSegmentsLoad = getWaitTillSegmentsLoad(queryContext);
const groupByEnableMultiValueUnnesting = getGroupByEnableMultiValueUnnesting(queryContext);
const sqlJoinAlgorithm = queryContext.sqlJoinAlgorithm ?? 'broadcast';
const selectDestination = queryContext.selectDestination ?? 'taskReport';
Expand Down Expand Up @@ -311,6 +314,15 @@ export const RunPanel = React.memo(function RunPanel(props: RunPanelProps) {
changeQueryContext(changeFinalizeAggregations(queryContext, v))
}
/>
<MenuTristate
icon={IconNames.STOPWATCH}
text="Wait until segments have loaded"
value={waitTillSegmentsLoad}
undefinedEffectiveValue /* ={true} */
onValueChange={v =>
changeQueryContext(changeWaitTillSegmentsLoad(queryContext, v))
}
/>
<MenuTristate
icon={IconNames.FORK}
text="Enable GroupBy multi-value unnesting"
Expand Down

0 comments on commit 632d35a

Please sign in to comment.