From 9b6bb25b1fe07ba4537a45acd390dab39b2a93e2 Mon Sep 17 00:00:00 2001 From: Dario Gieselaar Date: Mon, 3 May 2021 13:52:54 +0200 Subject: [PATCH] Instrument task manager --- packages/kbn-apm-utils/src/index.ts | 14 +- .../actions/server/lib/action_executor.ts | 223 ++++++++++-------- .../server/queries/task_claiming.ts | 61 ++--- .../server/saved_objects/mappings.json | 3 + x-pack/plugins/task_manager/server/task.ts | 3 + .../server/task_running/task_runner.ts | 40 ++-- .../task_manager/server/task_scheduling.ts | 6 +- 7 files changed, 202 insertions(+), 148 deletions(-) diff --git a/packages/kbn-apm-utils/src/index.ts b/packages/kbn-apm-utils/src/index.ts index f2f537138dad07..384b6683199e5b 100644 --- a/packages/kbn-apm-utils/src/index.ts +++ b/packages/kbn-apm-utils/src/index.ts @@ -16,6 +16,8 @@ export interface SpanOptions { labels?: Record; } +type Span = Exclude; + export function parseSpanOptions(optionsOrName: SpanOptions | string) { const options = typeof optionsOrName === 'string' ? { name: optionsOrName } : optionsOrName; @@ -30,7 +32,7 @@ const runInNewContext = any>(cb: T): ReturnType( optionsOrName: SpanOptions | string, - cb: () => Promise + cb: (span?: Span) => Promise ): Promise { const options = parseSpanOptions(optionsOrName); @@ -71,13 +73,17 @@ export async function withSpan( span.addLabels(labels); } - return cb() + return cb(span) .then((res) => { - span.outcome = 'success'; + if (!span.outcome || span.outcome === 'unknown') { + span.outcome = 'success'; + } return res; }) .catch((err) => { - span.outcome = 'failure'; + if (!span.outcome || span.outcome === 'unknown') { + span.outcome = 'failure'; + } throw err; }) .finally(() => { diff --git a/x-pack/plugins/actions/server/lib/action_executor.ts b/x-pack/plugins/actions/server/lib/action_executor.ts index 6deaa4d587904d..b1b2d64f28de10 100644 --- a/x-pack/plugins/actions/server/lib/action_executor.ts +++ b/x-pack/plugins/actions/server/lib/action_executor.ts @@ -7,6 +7,7 @@ import type { PublicMethodsOf } from '@kbn/utility-types'; import { Logger, KibanaRequest } from 'src/core/server'; +import { withSpan } from '@kbn/apm-utils'; import { validateParams, validateConfig, validateSecrets } from './validate_with_schema'; import { ActionTypeExecutorResult, @@ -78,113 +79,135 @@ export class ActionExecutor { ); } - const { - logger, - spaces, - getServices, - encryptedSavedObjectsClient, - actionTypeRegistry, - eventLogger, - preconfiguredActions, - getActionsClientWithRequest, - } = this.actionExecutorContext!; - - const services = getServices(request); - const spaceId = spaces && spaces.getSpaceId(request); - const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {}; - - const { actionTypeId, name, config, secrets } = await getActionInfo( - await getActionsClientWithRequest(request, source), - encryptedSavedObjectsClient, - preconfiguredActions, - actionId, - namespace.namespace - ); + return withSpan( + { + name: `execute_action`, + type: 'actions', + labels: { + actionId, + }, + }, + async (span) => { + const { + logger, + spaces, + getServices, + encryptedSavedObjectsClient, + actionTypeRegistry, + eventLogger, + preconfiguredActions, + getActionsClientWithRequest, + } = this.actionExecutorContext!; - if (!actionTypeRegistry.isActionExecutable(actionId, actionTypeId, { notifyUsage: true })) { - actionTypeRegistry.ensureActionTypeEnabled(actionTypeId); - } - const actionType = actionTypeRegistry.get(actionTypeId); - - let validatedParams: Record; - let validatedConfig: Record; - let validatedSecrets: Record; - - try { - validatedParams = validateParams(actionType, params); - validatedConfig = validateConfig(actionType, config); - validatedSecrets = validateSecrets(actionType, secrets); - } catch (err) { - return { status: 'error', actionId, message: err.message, retry: false }; - } + const services = getServices(request); + const spaceId = spaces && spaces.getSpaceId(request); + const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {}; + + const { actionTypeId, name, config, secrets } = await getActionInfo( + await getActionsClientWithRequest(request, source), + encryptedSavedObjectsClient, + preconfiguredActions, + actionId, + namespace.namespace + ); + + if (span) { + span.name = `execute_action ${actionTypeId}`; + span.addLabels({ + actionTypeId, + }); + } + + if (!actionTypeRegistry.isActionExecutable(actionId, actionTypeId, { notifyUsage: true })) { + actionTypeRegistry.ensureActionTypeEnabled(actionTypeId); + } + const actionType = actionTypeRegistry.get(actionTypeId); + + let validatedParams: Record; + let validatedConfig: Record; + let validatedSecrets: Record; + + try { + validatedParams = validateParams(actionType, params); + validatedConfig = validateConfig(actionType, config); + validatedSecrets = validateSecrets(actionType, secrets); + } catch (err) { + span?.setOutcome('failure'); + return { status: 'error', actionId, message: err.message, retry: false }; + } - const actionLabel = `${actionTypeId}:${actionId}: ${name}`; - logger.debug(`executing action ${actionLabel}`); - - const event: IEvent = { - event: { action: EVENT_LOG_ACTIONS.execute }, - kibana: { - saved_objects: [ - { - rel: SAVED_OBJECT_REL_PRIMARY, - type: 'action', - id: actionId, - ...namespace, + const actionLabel = `${actionTypeId}:${actionId}: ${name}`; + logger.debug(`executing action ${actionLabel}`); + + const event: IEvent = { + event: { action: EVENT_LOG_ACTIONS.execute }, + kibana: { + saved_objects: [ + { + rel: SAVED_OBJECT_REL_PRIMARY, + type: 'action', + id: actionId, + ...namespace, + }, + ], }, - ], - }, - }; + }; - eventLogger.startTiming(event); - let rawResult: ActionTypeExecutorResult; - try { - rawResult = await actionType.executor({ - actionId, - services, - params: validatedParams, - config: validatedConfig, - secrets: validatedSecrets, - }); - } catch (err) { - rawResult = { - actionId, - status: 'error', - message: 'an error occurred while running the action executor', - serviceMessage: err.message, - retry: false, - }; - } - eventLogger.stopTiming(event); + eventLogger.startTiming(event); + let rawResult: ActionTypeExecutorResult; + try { + rawResult = await actionType.executor({ + actionId, + services, + params: validatedParams, + config: validatedConfig, + secrets: validatedSecrets, + }); + } catch (err) { + rawResult = { + actionId, + status: 'error', + message: 'an error occurred while running the action executor', + serviceMessage: err.message, + retry: false, + }; + } + eventLogger.stopTiming(event); - // allow null-ish return to indicate success - const result = rawResult || { - actionId, - status: 'ok', - }; + // allow null-ish return to indicate success + const result = rawResult || { + actionId, + status: 'ok', + }; - event.event = event.event || {}; - - if (result.status === 'ok') { - event.event.outcome = 'success'; - event.message = `action executed: ${actionLabel}`; - } else if (result.status === 'error') { - event.event.outcome = 'failure'; - event.message = `action execution failure: ${actionLabel}`; - event.error = event.error || {}; - event.error.message = actionErrorToMessage(result); - logger.warn(`action execution failure: ${actionLabel}: ${event.error.message}`); - } else { - event.event.outcome = 'failure'; - event.message = `action execution returned unexpected result: ${actionLabel}: "${result.status}"`; - event.error = event.error || {}; - event.error.message = 'action execution returned unexpected result'; - logger.warn( - `action execution failure: ${actionLabel}: returned unexpected result "${result.status}"` - ); - } + event.event = event.event || {}; - eventLogger.logEvent(event); - return result; + if (result.status === 'ok') { + span?.setOutcome('success'); + event.event.outcome = 'success'; + event.message = `action executed: ${actionLabel}`; + } else if (result.status === 'error') { + span?.setOutcome('failure'); + event.event.outcome = 'failure'; + event.message = `action execution failure: ${actionLabel}`; + event.error = event.error || {}; + event.error.message = actionErrorToMessage(result); + logger.warn(`action execution failure: ${actionLabel}: ${event.error.message}`); + } else { + span?.setOutcome('failure'); + event.event.outcome = 'failure'; + event.message = `action execution returned unexpected result: ${actionLabel}: "${result.status}"`; + event.error = event.error || {}; + event.error.message = 'action execution returned unexpected result'; + logger.warn( + `action execution failure: ${actionLabel}: returned unexpected result "${result.status}"` + ); + } + + eventLogger.logEvent(event); + return result; + } + ); } } diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.ts index dce78242816583..7f15707a14b308 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.ts @@ -379,34 +379,41 @@ export class TaskClaiming { sort.unshift('_score'); } - const apmTrans = apm.startTransaction(`taskManager markAvailableTasksAsClaimed`, 'taskManager'); - const result = await this.taskStore.updateByQuery( - { - query: matchesClauses( - claimTasksById && claimTasksById.length - ? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks)) - : queryForScheduledTasks, - filterDownBy(InactiveTasks) - ), - script: updateFieldsAndMarkAsFailed( - { - ownerId: this.taskStore.taskManagerId, - retryAt: claimOwnershipUntil, - }, - claimTasksById || [], - taskTypesToClaim, - taskTypesToSkip, - pick(this.taskMaxAttempts, taskTypesToClaim) - ), - sort, - }, - { - max_docs: size, - } + const apmTrans = apm.startTransaction( + 'markAvailableTasksAsClaimed', + `taskManager markAvailableTasksAsClaimed` ); - - if (apmTrans) apmTrans.end(); - return result; + try { + const result = await this.taskStore.updateByQuery( + { + query: matchesClauses( + claimTasksById && claimTasksById.length + ? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks)) + : queryForScheduledTasks, + filterDownBy(InactiveTasks) + ), + script: updateFieldsAndMarkAsFailed( + { + ownerId: this.taskStore.taskManagerId, + retryAt: claimOwnershipUntil, + }, + claimTasksById || [], + taskTypesToClaim, + taskTypesToSkip, + pick(this.taskMaxAttempts, taskTypesToClaim) + ), + sort, + }, + { + max_docs: size, + } + ); + apmTrans?.end('success'); + return result; + } catch (err) { + apmTrans?.end('failure'); + throw err; + } } /** diff --git a/x-pack/plugins/task_manager/server/saved_objects/mappings.json b/x-pack/plugins/task_manager/server/saved_objects/mappings.json index 1728c8f1c552bd..d046a9266cce52 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/mappings.json +++ b/x-pack/plugins/task_manager/server/saved_objects/mappings.json @@ -29,6 +29,9 @@ "status": { "type": "keyword" }, + "traceparent": { + "type": "text" + }, "params": { "type": "text" }, diff --git a/x-pack/plugins/task_manager/server/task.ts b/x-pack/plugins/task_manager/server/task.ts index 4b86943ff8eca2..28aac04f8fa0f5 100644 --- a/x-pack/plugins/task_manager/server/task.ts +++ b/x-pack/plugins/task_manager/server/task.ts @@ -257,6 +257,8 @@ export interface TaskInstance { // eslint-disable-next-line @typescript-eslint/no-explicit-any state: Record; + traceparent?: string; + /** * The id of the user who scheduled this task. */ @@ -364,6 +366,7 @@ export type SerializedConcreteTaskInstance = Omit< > & { state: string; params: string; + traceparent: string; scheduledAt: string; startedAt: string | null; retryAt: string | null; diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 8e061eae460280..cf712cb5af38bb 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -12,6 +12,7 @@ */ import apm from 'elastic-apm-node'; +import { withSpan } from '@kbn/apm-utils'; import { performance } from 'perf_hooks'; import { identity, defaults, flow } from 'lodash'; import { Logger, SavedObjectsErrorHelpers } from '../../../../../src/core/server'; @@ -242,30 +243,40 @@ export class TaskManagerRunner implements TaskRunner { ); } this.logger.debug(`Running task ${this}`); - const modifiedContext = await this.beforeRun({ - taskInstance: this.instance.task, + + const apmTrans = apm.startTransaction(this.taskType, 'taskManager run', { + childOf: this.instance.task.traceparent, }); + const modifiedContext = await withSpan({ name: 'before run', type: 'task manager' }, () => + this.beforeRun({ + taskInstance: this.instance.task, + }) + ); + const stopTaskTimer = startTaskTimer(); - const apmTrans = apm.startTransaction(`taskManager run`, 'taskManager'); - apmTrans?.addLabels({ - taskType: this.taskType, - }); + try { this.task = this.definition.createTaskRunner(modifiedContext); - const result = await this.task.run(); + const result = await withSpan({ name: 'run', type: 'task manager' }, () => this.task!.run()); const validatedResult = this.validateResult(result); + const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () => + this.processResult(validatedResult, stopTaskTimer()) + ); if (apmTrans) apmTrans.end('success'); - return this.processResult(validatedResult, stopTaskTimer()); + return processedResult; } catch (err) { this.logger.error(`Task ${this} failed: ${err}`); // in error scenario, we can not get the RunResult // re-use modifiedContext's state, which is correct as of beforeRun - if (apmTrans) apmTrans.end('error'); - return this.processResult( - asErr({ error: err, state: modifiedContext.taskInstance.state }), - stopTaskTimer() + const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () => + this.processResult( + asErr({ error: err, state: modifiedContext.taskInstance.state }), + stopTaskTimer() + ) ); + if (apmTrans) apmTrans.end('failure'); + return processedResult; } } @@ -285,10 +296,7 @@ export class TaskManagerRunner implements TaskRunner { } performance.mark('markTaskAsRunning_start'); - const apmTrans = apm.startTransaction(`taskManager markTaskAsRunning`, 'taskManager'); - apmTrans?.addLabels({ - taskType: this.taskType, - }); + const apmTrans = apm.startTransaction('taskManager', 'taskManager markTaskAsRunning'); const now = new Date(); try { diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 29e83ec911b795..153c16f5c4bf78 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -10,6 +10,7 @@ import { filter } from 'rxjs/operators'; import { pipe } from 'fp-ts/lib/pipeable'; import { Option, map as mapOptional, getOrElse, isSome } from 'fp-ts/lib/Option'; +import agent from 'elastic-apm-node'; import { Logger } from '../../../../src/core/server'; import { asOk, either, map, mapErr, promiseResult } from './lib/result_type'; import { @@ -85,7 +86,10 @@ export class TaskScheduling { ...options, taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger), }); - return await this.store.schedule(modifiedTask); + return await this.store.schedule({ + ...modifiedTask, + traceparent: agent.currentTraceparent ?? '', + }); } /**