From dafce9016c51b093022652fcd2db1ef81fa19d3a Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Wed, 28 Aug 2024 13:22:27 -0400 Subject: [PATCH] [Response Ops][Task Manager] Emitting error metric when task update fails (#191307) Resolves https://github.com/elastic/kibana/issues/184173 ## Summary Catches errors updating the task from the `taskStore.bulkUpdate` function and emitting an error count so these errors are reflected in the metrics. ## To Verify 1. Add the following to force an error when running an example rule: ``` --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -24,6 +24,7 @@ import { ISavedObjectsRepository, SavedObjectsUpdateResponse, ElasticsearchClient, + SavedObjectsErrorHelpers, } from '@kbn/core/server'; import { RequestTimeoutsConfig } from './config'; @@ -309,6 +310,16 @@ export class TaskStore { this.logger.warn(`Skipping validation for bulk update because excludeLargeFields=true.`); } + const isProcessResult = docs.some( + (doc) => + doc.taskType === 'alerting:example.always-firing' && + doc.status === 'idle' && + doc.retryAt === null + ); + if (isProcessResult) { + throw SavedObjectsErrorHelpers.decorateEsUnavailableError(new Error('test')); + } + const attributesByDocId = docs.reduce((attrsById, doc) => { ``` 2. Create an `example.always-firing` rule and let it run. You should see an error in the logs: ``` [2024-08-26T14:44:07.065-04:00][ERROR][plugins.taskManager] Task alerting:example.always-firing "80b8481d-7bfc-4d38-a31b-7a559fbe846b" failed: Error: test ``` 3. Navigate to https://localhost:5601/api/task_manager/metrics?reset=false and you should see a framework error underneath the overall metrics and the alerting metrics. Co-authored-by: Elastic Machine --- .../server/task_running/task_runner.test.ts | 91 +++++++++++++++++++ .../server/task_running/task_runner.ts | 64 ++++++++----- 2 files changed, 130 insertions(+), 25 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts index 9274e0583547e1..899586abcc9f7e 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts @@ -2061,6 +2061,97 @@ describe('TaskManagerRunner', () => { ); expect(onTaskEvent).toHaveBeenCalledTimes(2); }); + + test('emits TaskEvent when failing to update a recurring task', async () => { + const id = _.random(1, 20).toString(); + const runAt = minutesFromNow(_.random(5)); + const onTaskEvent = jest.fn(); + const { runner, instance, store } = await readyToRunStageSetup({ + onTaskEvent, + instance: { + id, + schedule: { interval: '1m' }, + }, + definitions: { + bar: { + title: 'Bar!', + createTaskRunner: () => ({ + async run() { + return { runAt, state: {} }; + }, + }), + }, + }, + }); + + const error = new Error('fail'); + + store.update.mockImplementation(() => { + throw error; + }); + + await expect(runner.run()).rejects.toThrowError('fail'); + + expect(onTaskEvent).toHaveBeenCalledWith( + withAnyTiming( + asTaskRunEvent( + id, + asErr({ + task: instance, + persistence: TaskPersistence.Recurring, + result: TaskRunResult.Failed, + isExpired: false, + error, + }) + ) + ) + ); + }); + + test('emits TaskEvent when failing to update a non-recurring task', async () => { + const id = _.random(1, 20).toString(); + const runAt = minutesFromNow(_.random(5)); + const onTaskEvent = jest.fn(); + const { runner, instance, store } = await readyToRunStageSetup({ + onTaskEvent, + instance: { + id, + }, + definitions: { + bar: { + title: 'Bar!', + createTaskRunner: () => ({ + async run() { + return { runAt, state: {} }; + }, + }), + }, + }, + }); + + const error = new Error('fail'); + + store.update.mockImplementation(() => { + throw error; + }); + + await expect(runner.run()).rejects.toThrowError('fail'); + + expect(onTaskEvent).toHaveBeenCalledWith( + withAnyTiming( + asTaskRunEvent( + id, + asErr({ + task: instance, + persistence: TaskPersistence.NonRecurring, + result: TaskRunResult.Failed, + isExpired: false, + error, + }) + ) + ) + ); + }); }); test('does not update saved object if task expires', async () => { diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 6eaaf2cd2881f5..bfcabed9f6e45a 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -719,40 +719,54 @@ export class TaskManagerRunner implements TaskRunner { await eitherAsync( result, async ({ runAt, schedule, taskRunError }: SuccessfulRunResult) => { - const processedResult = { - task, - persistence: - schedule || task.schedule ? TaskPersistence.Recurring : TaskPersistence.NonRecurring, - result: await (runAt || schedule || task.schedule - ? this.processResultForRecurringTask(result) - : this.processResultWhenDone()), - }; - - // Alerting task runner returns SuccessfulRunResult with taskRunError - // when the alerting task fails, so we check for this condition in order - // to emit the correct task run event for metrics collection - // taskRunError contains the "source" (TaskErrorSource) data - if (!!taskRunError) { - debugLogger.debug(`Emitting task run failed event for task ${this.taskType}`); + const taskPersistence = + schedule || task.schedule ? TaskPersistence.Recurring : TaskPersistence.NonRecurring; + try { + const processedResult = { + task, + persistence: taskPersistence, + result: await (runAt || schedule || task.schedule + ? this.processResultForRecurringTask(result) + : this.processResultWhenDone()), + }; + + // Alerting task runner returns SuccessfulRunResult with taskRunError + // when the alerting task fails, so we check for this condition in order + // to emit the correct task run event for metrics collection + // taskRunError contains the "source" (TaskErrorSource) data + if (!!taskRunError) { + debugLogger.debug(`Emitting task run failed event for task ${this.taskType}`); + this.onTaskEvent( + asTaskRunEvent( + this.id, + asErr({ ...processedResult, isExpired: taskHasExpired, error: taskRunError }), + taskTiming + ) + ); + } else { + this.onTaskEvent( + asTaskRunEvent( + this.id, + asOk({ ...processedResult, isExpired: taskHasExpired }), + taskTiming + ) + ); + } + } catch (err) { this.onTaskEvent( asTaskRunEvent( this.id, asErr({ - ...processedResult, + task, + persistence: taskPersistence, + result: TaskRunResult.Failed, isExpired: taskHasExpired, - error: taskRunError, + error: err, }), taskTiming ) ); - } else { - this.onTaskEvent( - asTaskRunEvent( - this.id, - asOk({ ...processedResult, isExpired: taskHasExpired }), - taskTiming - ) - ); + throw err; } }, async ({ error }: FailedRunResult) => {