From c796f3f6c2cb9f5e4e3cc4eeaeadd1d22f514cbe Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Thu, 9 Jul 2020 10:19:53 -0700 Subject: [PATCH] Remove ESQueue and replace with Task Manager queueing --- x-pack/plugins/reporting/kibana.json | 1 + x-pack/plugins/reporting/server/core.ts | 19 +- .../csv_from_savedobject/execute_job.ts | 14 +- .../reporting/server/lib/create_queue.ts | 74 -- .../server/lib/create_worker.test.ts | 107 -- .../reporting/server/lib/create_worker.ts | 75 -- .../reporting/server/lib/enqueue_job.ts | 38 +- .../lib/esqueue/__tests__/fixtures/job.js | 23 - .../fixtures/legacy_elasticsearch.js | 111 -- .../lib/esqueue/__tests__/fixtures/queue.js | 17 - .../lib/esqueue/__tests__/fixtures/worker.js | 22 - .../lib/esqueue/__tests__/helpers/errors.js | 57 - .../server/lib/esqueue/__tests__/index.js | 158 --- .../server/lib/esqueue/__tests__/worker.js | 1118 ----------------- .../lib/esqueue/constants/default_settings.js | 16 - .../server/lib/esqueue/constants/events.js | 20 - .../server/lib/esqueue/constants/index.js | 15 - .../lib/esqueue/helpers/create_index.js | 114 -- .../server/lib/esqueue/helpers/errors.js | 26 - .../reporting/server/lib/esqueue/index.js | 54 - .../reporting/server/lib/esqueue/worker.js | 463 ------- x-pack/plugins/reporting/server/lib/index.ts | 6 +- .../lib/{esqueue/constants => }/statuses.ts | 0 .../reporting/server/lib/store/index.ts | 2 +- .../reporting/server/lib/store/mapping.ts | 2 +- .../reporting/server/lib/store/report.test.ts | 121 +- .../reporting/server/lib/store/report.ts | 146 ++- .../reporting/server/lib/store/store.test.ts | 319 ++++- .../reporting/server/lib/store/store.ts | 229 +++- x-pack/plugins/reporting/server/lib/task.ts | 289 +++++ .../plugins/reporting/server/plugin.test.ts | 38 +- x-pack/plugins/reporting/server/plugin.ts | 22 +- .../server/routes/generation.test.ts | 29 +- .../reporting/server/routes/generation.ts | 12 +- .../server/routes/lib/get_document_payload.ts | 7 +- .../create_mock_reportingplugin.ts | 26 +- x-pack/plugins/reporting/server/types.ts | 13 +- 37 files changed, 1061 insertions(+), 2742 deletions(-) delete mode 100644 x-pack/plugins/reporting/server/lib/create_queue.ts delete mode 100644 x-pack/plugins/reporting/server/lib/create_worker.test.ts delete mode 100644 x-pack/plugins/reporting/server/lib/create_worker.ts delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/job.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/legacy_elasticsearch.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/queue.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/worker.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/errors.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/__tests__/index.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/constants/default_settings.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/constants/events.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/constants/index.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/helpers/create_index.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/helpers/errors.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/index.js delete mode 100644 x-pack/plugins/reporting/server/lib/esqueue/worker.js rename x-pack/plugins/reporting/server/lib/{esqueue/constants => }/statuses.ts (100%) create mode 100644 x-pack/plugins/reporting/server/lib/task.ts diff --git a/x-pack/plugins/reporting/kibana.json b/x-pack/plugins/reporting/kibana.json index a5d7f3d20c44c95..970d9753fae7be8 100644 --- a/x-pack/plugins/reporting/kibana.json +++ b/x-pack/plugins/reporting/kibana.json @@ -13,6 +13,7 @@ "management", "licensing", "uiActions", + "taskManager", "embeddable", "share" ], diff --git a/x-pack/plugins/reporting/server/core.ts b/x-pack/plugins/reporting/server/core.ts index 95dc7586ad4a6b0..0677e350c3c386b 100644 --- a/x-pack/plugins/reporting/server/core.ts +++ b/x-pack/plugins/reporting/server/core.ts @@ -20,11 +20,10 @@ import { SecurityPluginSetup } from '../../security/server'; import { ScreenshotsObservableFn } from '../server/types'; import { ReportingConfig } from './'; import { HeadlessChromiumDriverFactory } from './browsers/chromium/driver_factory'; +import { checkLicense, getExportTypesRegistry, ReportingTask } from './lib'; import { screenshotsObservableFactory } from './lib/screenshots'; -import { checkLicense, getExportTypesRegistry } from './lib'; -import { ESQueueInstance } from './lib/create_queue'; -import { EnqueueJobFn } from './lib/enqueue_job'; import { ReportingStore } from './lib/store'; +import { SchedulingFn } from './lib/task'; export interface ReportingInternalSetup { elasticsearch: ElasticsearchServiceSetup; @@ -32,14 +31,14 @@ export interface ReportingInternalSetup { basePath: BasePath['get']; router: IRouter; security?: SecurityPluginSetup; + task: ReportingTask; } export interface ReportingInternalStart { browserDriverFactory: HeadlessChromiumDriverFactory; - enqueueJob: EnqueueJobFn; - esqueue: ESQueueInstance; store: ReportingStore; savedObjects: SavedObjectsServiceStart; + scheduleTask: SchedulingFn; uiSettings: UiSettingsServiceStart; } @@ -115,7 +114,7 @@ export class ReportingCore { /* * Gives async access to the startDeps */ - private async getPluginStartDeps() { + public async getPluginStartDeps() { if (this.pluginStartDeps) { return this.pluginStartDeps; } @@ -127,12 +126,8 @@ export class ReportingCore { return this.exportTypesRegistry; } - public async getEsqueue() { - return (await this.getPluginStartDeps()).esqueue; - } - - public async getEnqueueJob() { - return (await this.getPluginStartDeps()).enqueueJob; + public async getTaskScheduler() { + return (await this.getPluginStartDeps()).scheduleTask; } public async getLicenseInfo() { diff --git a/x-pack/plugins/reporting/server/export_types/csv_from_savedobject/execute_job.ts b/x-pack/plugins/reporting/server/export_types/csv_from_savedobject/execute_job.ts index ffe453f996698db..ec7e0a21f0498aa 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_from_savedobject/execute_job.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_from_savedobject/execute_job.ts @@ -10,7 +10,6 @@ import { CONTENT_TYPE_CSV, CSV_FROM_SAVEDOBJECT_JOB_TYPE } from '../../../common import { RunTaskFnFactory, ScheduledTaskParams, TaskRunResult } from '../../types'; import { createGenerateCsv } from '../csv/generate_csv'; import { JobParamsPanelCsv, SearchPanel } from './types'; -import { getFakeRequest } from './lib/get_fake_request'; import { getGenerateCsvParams } from './lib/get_csv_job'; /* @@ -42,21 +41,12 @@ export const runTaskFnFactory: RunTaskFnFactory = function e // jobID is only for "queued" jobs // Use the jobID as a logging tag or "immediate" const { jobParams } = jobPayload; - const jobLogger = logger.clone([jobId === null ? 'immediate' : jobId]); + const jobLogger = logger.clone(['immediate']); const generateCsv = createGenerateCsv(jobLogger); - const { isImmediate, panel, visType } = jobParams as JobParamsPanelCsv & { - panel: SearchPanel; - }; + const { panel, visType } = jobParams as JobParamsPanelCsv & { panel: SearchPanel }; jobLogger.debug(`Execute job generating [${visType}] csv`); - if (isImmediate && req) { - jobLogger.info(`Executing job from Immediate API using request context`); - } else { - jobLogger.info(`Executing job async using encrypted headers`); - req = await getFakeRequest(jobPayload, config.get('encryptionKey')!, jobLogger); - } - const savedObjectsClient = context.core.savedObjects.client; const uiConfig = await reporting.getUiSettingsServiceFactory(savedObjectsClient); diff --git a/x-pack/plugins/reporting/server/lib/create_queue.ts b/x-pack/plugins/reporting/server/lib/create_queue.ts deleted file mode 100644 index 2da3d8bd47ccb31..000000000000000 --- a/x-pack/plugins/reporting/server/lib/create_queue.ts +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { ReportingCore } from '../core'; -import { JobSource, TaskRunResult } from '../types'; -import { createWorkerFactory } from './create_worker'; -// @ts-ignore -import { Esqueue } from './esqueue'; -import { createTaggedLogger } from './esqueue/create_tagged_logger'; -import { LevelLogger } from './level_logger'; -import { ReportingStore } from './store'; - -interface ESQueueWorker { - on: (event: string, handler: any) => void; -} - -export interface ESQueueInstance { - registerWorker: ( - pluginId: string, - workerFn: GenericWorkerFn, - workerOptions: { - kibanaName: string; - kibanaId: string; - interval: number; - intervalErrorMultiplier: number; - } - ) => ESQueueWorker; -} - -// GenericWorkerFn is a generic for ImmediateExecuteFn | ESQueueWorkerExecuteFn, -type GenericWorkerFn = ( - jobSource: JobSource, - ...workerRestArgs: any[] -) => void | Promise; - -export async function createQueueFactory( - reporting: ReportingCore, - store: ReportingStore, - logger: LevelLogger -): Promise { - const config = reporting.getConfig(); - - // esqueue-related - const queueTimeout = config.get('queue', 'timeout'); - const isPollingEnabled = config.get('queue', 'pollEnabled'); - - const elasticsearch = reporting.getElasticsearchService(); - const queueOptions = { - timeout: queueTimeout, - client: elasticsearch.legacy.client, - logger: createTaggedLogger(logger, ['esqueue', 'queue-worker']), - }; - - const queue: ESQueueInstance = new Esqueue(store, queueOptions); - - if (isPollingEnabled) { - // create workers to poll the index for idle jobs waiting to be claimed and executed - const createWorker = createWorkerFactory(reporting, logger); - await createWorker(queue); - } else { - logger.info( - 'xpack.reporting.queue.pollEnabled is set to false. This Kibana instance ' + - 'will not poll for idle jobs to claim and execute. Make sure another ' + - 'Kibana instance with polling enabled is running in this cluster so ' + - 'reporting jobs can complete.', - ['create_queue'] - ); - } - - return queue; -} diff --git a/x-pack/plugins/reporting/server/lib/create_worker.test.ts b/x-pack/plugins/reporting/server/lib/create_worker.test.ts deleted file mode 100644 index 85188c07eeb2074..000000000000000 --- a/x-pack/plugins/reporting/server/lib/create_worker.test.ts +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import * as sinon from 'sinon'; -import { ReportingConfig, ReportingCore } from '../../server'; -import { createMockReportingCore } from '../test_helpers'; -import { createWorkerFactory } from './create_worker'; -// @ts-ignore -import { Esqueue } from './esqueue'; -// @ts-ignore -import { ClientMock } from './esqueue/__tests__/fixtures/legacy_elasticsearch'; -import { ExportTypesRegistry } from './export_types_registry'; - -const configGetStub = sinon.stub(); -configGetStub.withArgs('queue').returns({ - pollInterval: 3300, - pollIntervalErrorMultiplier: 10, -}); -configGetStub.withArgs('server', 'name').returns('test-server-123'); -configGetStub.withArgs('server', 'uuid').returns('g9ymiujthvy6v8yrh7567g6fwzgzftzfr'); - -const executeJobFactoryStub = sinon.stub(); -const getMockLogger = sinon.stub(); - -const getMockExportTypesRegistry = ( - exportTypes: any[] = [{ runTaskFnFactory: executeJobFactoryStub }] -) => - ({ - getAll: () => exportTypes, - } as ExportTypesRegistry); - -describe('Create Worker', () => { - let mockReporting: ReportingCore; - let mockConfig: ReportingConfig; - let queue: Esqueue; - let client: ClientMock; - - beforeEach(async () => { - mockConfig = { get: configGetStub, kbnConfig: { get: configGetStub } }; - mockReporting = await createMockReportingCore(mockConfig); - mockReporting.getExportTypesRegistry = () => getMockExportTypesRegistry(); - // @ts-ignore over-riding config manually - mockReporting.config = mockConfig; - client = new ClientMock(); - queue = new Esqueue('reporting-queue', { client }); - executeJobFactoryStub.reset(); - }); - - test('Creates a single Esqueue worker for Reporting', async () => { - const createWorker = createWorkerFactory(mockReporting, getMockLogger()); - const registerWorkerSpy = sinon.spy(queue, 'registerWorker'); - - await createWorker(queue); - - sinon.assert.callCount(executeJobFactoryStub, 1); - sinon.assert.callCount(registerWorkerSpy, 1); - - const { firstCall } = registerWorkerSpy; - const [workerName, workerFn, workerOpts] = firstCall.args; - - expect(workerName).toBe('reporting'); - expect(workerFn).toMatchInlineSnapshot(`[Function]`); - expect(workerOpts).toMatchInlineSnapshot(` -Object { - "interval": 3300, - "intervalErrorMultiplier": 10, - "kibanaId": "g9ymiujthvy6v8yrh7567g6fwzgzftzfr", - "kibanaName": "test-server-123", -} -`); - }); - - test('Creates a single Esqueue worker for Reporting, even if there are multiple export types', async () => { - const exportTypesRegistry = getMockExportTypesRegistry([ - { runTaskFnFactory: executeJobFactoryStub }, - { runTaskFnFactory: executeJobFactoryStub }, - { runTaskFnFactory: executeJobFactoryStub }, - { runTaskFnFactory: executeJobFactoryStub }, - { runTaskFnFactory: executeJobFactoryStub }, - ]); - mockReporting.getExportTypesRegistry = () => exportTypesRegistry; - const createWorker = createWorkerFactory(mockReporting, getMockLogger()); - const registerWorkerSpy = sinon.spy(queue, 'registerWorker'); - - await createWorker(queue); - - sinon.assert.callCount(executeJobFactoryStub, 5); - sinon.assert.callCount(registerWorkerSpy, 1); - - const { firstCall } = registerWorkerSpy; - const [workerName, workerFn, workerOpts] = firstCall.args; - - expect(workerName).toBe('reporting'); - expect(workerFn).toMatchInlineSnapshot(`[Function]`); - expect(workerOpts).toMatchInlineSnapshot(` -Object { - "interval": 3300, - "intervalErrorMultiplier": 10, - "kibanaId": "g9ymiujthvy6v8yrh7567g6fwzgzftzfr", - "kibanaName": "test-server-123", -} -`); - }); -}); diff --git a/x-pack/plugins/reporting/server/lib/create_worker.ts b/x-pack/plugins/reporting/server/lib/create_worker.ts deleted file mode 100644 index 837be1f44a09304..000000000000000 --- a/x-pack/plugins/reporting/server/lib/create_worker.ts +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { CancellationToken } from '../../common'; -import { PLUGIN_ID } from '../../common/constants'; -import { ReportingCore } from '../../server'; -import { LevelLogger } from '../../server/lib'; -import { ESQueueWorkerExecuteFn, ExportTypeDefinition, JobSource } from '../../server/types'; -import { ESQueueInstance } from './create_queue'; -// @ts-ignore untyped dependency -import { events as esqueueEvents } from './esqueue'; - -export function createWorkerFactory(reporting: ReportingCore, logger: LevelLogger) { - const config = reporting.getConfig(); - const queueConfig = config.get('queue'); - const kibanaName = config.kbnConfig.get('server', 'name'); - const kibanaId = config.kbnConfig.get('server', 'uuid'); - - // Once more document types are added, this will need to be passed in - return async function createWorker(queue: ESQueueInstance) { - // export type / execute job map - const jobExecutors: Map> = new Map(); - - for (const exportType of reporting.getExportTypesRegistry().getAll() as Array< - ExportTypeDefinition> - >) { - const jobExecutor = exportType.runTaskFnFactory(reporting, logger); - jobExecutors.set(exportType.jobType, jobExecutor); - } - - const workerFn = ( - jobSource: JobSource, - jobParams: ScheduledTaskParamsType, - cancellationToken: CancellationToken - ) => { - const { - _id: jobId, - _source: { jobtype: jobType }, - } = jobSource; - - if (!jobId) { - throw new Error(`Claimed job is missing an ID!: ${JSON.stringify(jobSource)}`); - } - - const jobTypeExecutor = jobExecutors.get(jobType); - if (!jobTypeExecutor) { - throw new Error(`Unable to find a job executor for the claimed job: [${jobId}]`); - } - - // pass the work to the jobExecutor - return jobTypeExecutor(jobId, jobParams, cancellationToken); - }; - - const workerOptions = { - kibanaName, - kibanaId, - interval: queueConfig.pollInterval, - intervalErrorMultiplier: queueConfig.pollIntervalErrorMultiplier, - }; - const worker = queue.registerWorker(PLUGIN_ID, workerFn, workerOptions); - - worker.on(esqueueEvents.EVENT_WORKER_COMPLETE, (res: any) => { - logger.debug(`Worker completed: (${res.job.id})`); - }); - worker.on(esqueueEvents.EVENT_WORKER_JOB_EXECUTION_ERROR, (res: any) => { - logger.debug(`Worker error: (${res.job.id})`); - }); - worker.on(esqueueEvents.EVENT_WORKER_JOB_TIMEOUT, (res: any) => { - logger.debug(`Job timeout exceeded: (${res.job.id})`); - }); - }; -} diff --git a/x-pack/plugins/reporting/server/lib/enqueue_job.ts b/x-pack/plugins/reporting/server/lib/enqueue_job.ts index d1554a03b9389a5..ea73c4a12b55c90 100644 --- a/x-pack/plugins/reporting/server/lib/enqueue_job.ts +++ b/x-pack/plugins/reporting/server/lib/enqueue_job.ts @@ -5,11 +5,11 @@ */ import { KibanaRequest, RequestHandlerContext } from 'src/core/server'; +import { ReportingCore } from '../'; import { AuthenticatedUser } from '../../../security/server'; -import { ESQueueCreateJobFn } from '../../server/types'; -import { ReportingCore } from '../core'; +import { ESQueueCreateJobFn } from '../types'; import { LevelLogger } from './'; -import { ReportingStore, Report } from './store'; +import { Report } from './store'; export type EnqueueJobFn = ( exportTypeId: string, @@ -21,13 +21,8 @@ export type EnqueueJobFn = ( export function enqueueJobFactory( reporting: ReportingCore, - store: ReportingStore, parentLogger: LevelLogger ): EnqueueJobFn { - const config = reporting.getConfig(); - const queueTimeout = config.get('queue', 'timeout'); - const browserType = config.get('capture', 'browser', 'type'); - const maxAttempts = config.get('capture', 'maxAttempts'); const logger = parentLogger.clone(['queue-job']); return async function enqueueJob( @@ -39,23 +34,34 @@ export function enqueueJobFactory( ) { type ScheduleTaskFnType = ESQueueCreateJobFn; - const username = user ? user.username : false; + const username: string | null = user ? user.username : null; const exportType = reporting.getExportTypesRegistry().getById(exportTypeId); if (exportType == null) { throw new Error(`Export type ${exportTypeId} does not exist in the registry!`); } + // get ready to store the report const scheduleTask = exportType.scheduleTaskFnFactory(reporting, logger) as ScheduleTaskFnType; const payload = await scheduleTask(jobParams, context, request); - const options = { - timeout: queueTimeout, - created_by: username, - browser_type: browserType, - max_attempts: maxAttempts, - }; + // store the pending report, puts it in the Reporting Management UI table + const { store } = await reporting.getPluginStartDeps(); + const report = await store.addReport(exportType.jobType, username, payload); - return await store.addReport(exportType.jobType, payload, options); + // get ready to schedule the task + const reportingTaskScheduler = await reporting.getTaskScheduler(); + + // schedule the task + const task = await reportingTaskScheduler({ + report: report.toReportTaskJSON(), + payload, + user: username, + headers: request.headers, + }); + + logger.info(`Scheduled task: ${task.id}`); + + return report; }; } diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/job.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/job.js deleted file mode 100644 index 9cc62800d439f18..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/job.js +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import events from 'events'; - -export class JobMock extends events.EventEmitter { - constructor(queue, index, type, payload, options = {}) { - super(); - - this.queue = queue; - this.index = index; - this.jobType = type; - this.payload = payload; - this.options = options; - } - - getProp(name) { - return this[name]; - } -} diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/legacy_elasticsearch.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/legacy_elasticsearch.js deleted file mode 100644 index ebda7ff955b11cd..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/legacy_elasticsearch.js +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { uniqueId, times, random } from 'lodash'; -import { errors as esErrors } from 'elasticsearch'; - -export function ClientMock() { - this.callAsInternalUser = (endpoint, params = {}, ...rest) => { - if (endpoint === 'indices.create') { - return Promise.resolve({ acknowledged: true }); - } - - if (endpoint === 'indices.exists') { - return Promise.resolve(false); - } - - if (endpoint === 'index') { - const shardCount = 2; - return Promise.resolve({ - _index: params.index || 'index', - _id: params.id || uniqueId('testDoc'), - _seq_no: 1, - _primary_term: 1, - _shards: { total: shardCount, successful: shardCount, failed: 0 }, - created: true, - }); - } - - if (endpoint === 'get') { - if (params === esErrors.NotFound) return esErrors.NotFound; - - const _source = { - jobtype: 'jobtype', - created_by: false, - - payload: { - id: 'sample-job-1', - now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)', - }, - - priority: 10, - timeout: 10000, - created_at: '2016-04-25T21:13:04.738Z', - attempts: 0, - max_attempts: 3, - status: 'pending', - ...(rest[0] || {}), - }; - - return Promise.resolve({ - _index: params.index || 'index', - _id: params.id || 'AVRPRLnlp7Ur1SZXfT-T', - _seq_no: params._seq_no || 1, - _primary_term: params._primary_term || 1, - found: true, - _source: _source, - }); - } - - if (endpoint === 'search') { - const [count = 5, source = {}] = rest; - const hits = times(count, () => { - return { - _index: params.index || 'index', - _id: uniqueId('documentId'), - _seq_no: random(1, 5), - _primar_term: random(1, 5), - _score: null, - _source: { - created_at: new Date().toString(), - number: random(0, count, true), - ...source, - }, - }; - }); - return Promise.resolve({ - took: random(0, 10), - timed_out: false, - _shards: { - total: 5, - successful: 5, - failed: 0, - }, - hits: { - total: count, - max_score: null, - hits: hits, - }, - }); - } - - if (endpoint === 'update') { - const shardCount = 2; - return Promise.resolve({ - _index: params.index || 'index', - _id: params.id || uniqueId('testDoc'), - _seq_no: params.if_seq_no + 1 || 2, - _primary_term: params.if_primary_term + 1 || 2, - _shards: { total: shardCount, successful: shardCount, failed: 0 }, - created: true, - }); - } - - return Promise.resolve(); - }; - - this.transport = {}; -} diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/queue.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/queue.js deleted file mode 100644 index 974cb4a5e2a6e1c..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/queue.js +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import events from 'events'; - -export class QueueMock extends events.EventEmitter { - constructor() { - super(); - } - - setClient(client) { - this.client = client; - } -} diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/worker.js deleted file mode 100644 index fe8a859ccb44552..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/fixtures/worker.js +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import events from 'events'; - -export class WorkerMock extends events.EventEmitter { - constructor(queue, type, workerFn, opts = {}) { - super(); - - this.queue = queue; - this.type = type; - this.workerFn = workerFn; - this.options = opts; - } - - getProp(name) { - return this[name]; - } -} diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/errors.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/errors.js deleted file mode 100644 index d41b29106bb9dd4..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/errors.js +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import expect from '@kbn/expect'; -import { WorkerTimeoutError, UnspecifiedWorkerError } from '../../helpers/errors'; - -describe('custom errors', function () { - describe('WorkerTimeoutError', function () { - it('should be function', () => { - expect(WorkerTimeoutError).to.be.a('function'); - }); - - it('should have a name', function () { - const err = new WorkerTimeoutError('timeout error'); - expect(err).to.have.property('name', 'WorkerTimeoutError'); - }); - - it('should take a jobId property', function () { - const err = new WorkerTimeoutError('timeout error', { jobId: 'il7hl34rqlo8ro' }); - expect(err).to.have.property('jobId', 'il7hl34rqlo8ro'); - }); - - it('should take a timeout property', function () { - const err = new WorkerTimeoutError('timeout error', { timeout: 15000 }); - expect(err).to.have.property('timeout', 15000); - }); - - it('should be stringifyable', function () { - const err = new WorkerTimeoutError('timeout error'); - expect(`${err}`).to.equal('WorkerTimeoutError: timeout error'); - }); - }); - - describe('UnspecifiedWorkerError', function () { - it('should be function', () => { - expect(UnspecifiedWorkerError).to.be.a('function'); - }); - - it('should have a name', function () { - const err = new UnspecifiedWorkerError('unspecified error'); - expect(err).to.have.property('name', 'UnspecifiedWorkerError'); - }); - - it('should take a jobId property', function () { - const err = new UnspecifiedWorkerError('unspecified error', { jobId: 'il7hl34rqlo8ro' }); - expect(err).to.have.property('jobId', 'il7hl34rqlo8ro'); - }); - - it('should be stringifyable', function () { - const err = new UnspecifiedWorkerError('unspecified error'); - expect(`${err}`).to.equal('UnspecifiedWorkerError: unspecified error'); - }); - }); -}); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/index.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/index.js deleted file mode 100644 index 7cdae152ad0d793..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/index.js +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import events from 'events'; -import expect from '@kbn/expect'; -import sinon from 'sinon'; -import proxyquire from 'proxyquire'; -import { noop, times } from 'lodash'; -import { constants } from '../constants'; -import { ClientMock } from './fixtures/legacy_elasticsearch'; -import { JobMock } from './fixtures/job'; -import { WorkerMock } from './fixtures/worker'; - -const { Esqueue } = proxyquire.noPreserveCache()('../index', { - './job': { Job: JobMock }, - './worker': { Worker: WorkerMock }, -}); - -describe('Esqueue class', function () { - let client; - - beforeEach(function () { - client = new ClientMock(); - }); - - it('should be an event emitter', function () { - const queue = new Esqueue('esqueue', { client }); - expect(queue).to.be.an(events.EventEmitter); - }); - - describe('Option validation', function () { - it('should throw without an index', function () { - const init = () => new Esqueue(); - expect(init).to.throwException(/must.+specify.+index/i); - }); - }); - - describe('Queue construction', function () { - it('should ping the ES server', function () { - const pingSpy = sinon.spy(client, 'callAsInternalUser').withArgs('ping'); - new Esqueue('esqueue', { client }); - sinon.assert.calledOnce(pingSpy); - }); - }); - - describe('Adding jobs', function () { - let indexName; - let jobType; - let payload; - let queue; - - beforeEach(function () { - indexName = 'esqueue-index'; - jobType = 'test-test'; - payload = { payload: true }; - queue = new Esqueue(indexName, { client }); - }); - - it('should throw with invalid dateSeparator setting', function () { - queue = new Esqueue(indexName, { client, dateSeparator: 'a' }); - const fn = () => queue.addJob(jobType, payload); - expect(fn).to.throwException(); - }); - - it('should pass queue instance, index name, type and payload', function () { - const job = queue.addJob(jobType, payload); - expect(job.getProp('queue')).to.equal(queue); - expect(job.getProp('index')).to.match(new RegExp(indexName)); - expect(job.getProp('jobType')).to.equal(jobType); - expect(job.getProp('payload')).to.equal(payload); - }); - - it('should pass default settings', function () { - const job = queue.addJob(jobType, payload); - const options = job.getProp('options'); - expect(options).to.have.property('timeout', constants.DEFAULT_SETTING_TIMEOUT); - }); - - it('should pass queue index settings', function () { - const indexSettings = { - index: { - number_of_shards: 1, - }, - }; - - queue = new Esqueue(indexName, { client, indexSettings }); - const job = queue.addJob(jobType, payload); - expect(job.getProp('options')).to.have.property('indexSettings', indexSettings); - }); - - it('should pass headers from options', function () { - const options = { - headers: { - authorization: 'Basic cXdlcnR5', - }, - }; - const job = queue.addJob(jobType, payload, options); - expect(job.getProp('options')).to.have.property('headers', options.headers); - }); - }); - - describe('Registering workers', function () { - let queue; - - beforeEach(function () { - queue = new Esqueue('esqueue', { client }); - }); - - it('should keep track of workers', function () { - expect(queue.getWorkers()).to.eql([]); - expect(queue.getWorkers()).to.have.length(0); - - queue.registerWorker('test', noop); - queue.registerWorker('test', noop); - queue.registerWorker('test2', noop); - expect(queue.getWorkers()).to.have.length(3); - }); - - it('should pass instance of queue, type, and worker function', function () { - const workerType = 'test-worker'; - const workerFn = () => true; - - const worker = queue.registerWorker(workerType, workerFn); - expect(worker.getProp('queue')).to.equal(queue); - expect(worker.getProp('type')).to.equal(workerType); - expect(worker.getProp('workerFn')).to.equal(workerFn); - }); - - it('should pass worker options', function () { - const workerOptions = { - size: 12, - }; - - queue = new Esqueue('esqueue', { client }); - const worker = queue.registerWorker('type', noop, workerOptions); - const options = worker.getProp('options'); - expect(options.size).to.equal(workerOptions.size); - }); - }); - - describe('Destroy', function () { - it('should destroy workers', function () { - const queue = new Esqueue('esqueue', { client }); - const stubs = times(3, () => { - return { destroy: sinon.stub() }; - }); - stubs.forEach((stub) => queue._workers.push(stub)); - expect(queue.getWorkers()).to.have.length(3); - - queue.destroy(); - stubs.forEach((stub) => sinon.assert.calledOnce(stub.destroy)); - expect(queue.getWorkers()).to.have.length(0); - }); - }); -}); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js deleted file mode 100644 index b31a39a6f90ccc4..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js +++ /dev/null @@ -1,1118 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import expect from '@kbn/expect'; -import sinon from 'sinon'; -import moment from 'moment'; -import { noop, random, get, find, identity } from 'lodash'; -import { ClientMock } from './fixtures/legacy_elasticsearch'; -import { QueueMock } from './fixtures/queue'; -import { formatJobObject, getUpdatedDocPath, Worker } from '../worker'; -import { constants } from '../constants'; - -const anchor = '2016-04-02T01:02:03.456'; // saturday -const defaults = { - timeout: 10000, - size: 1, - unknownMime: false, - contentBody: null, -}; - -const defaultWorkerOptions = { - interval: 3000, - intervalErrorMultiplier: 10, -}; - -describe('Worker class', function () { - // some of these tests might be a little slow, give them a little extra time - this.timeout(10000); - - let anchorMoment; - let clock; - let client; - let mockQueue; - let worker; - let worker2; - - // Allowing the Poller to poll requires intimate knowledge of the inner workings of the Poller. - // We have to ensure that the Promises internal to the `_poll` method are resolved to queue up - // the next setTimeout before incrementing the clock. - const allowPoll = async (interval) => { - clock.tick(interval); - await Promise.resolve(); - await Promise.resolve(); - }; - - beforeEach(function () { - client = new ClientMock(); - mockQueue = new QueueMock(); - mockQueue.setClient(client); - }); - - afterEach(function () { - [worker, worker2].forEach((actualWorker) => { - if (actualWorker) { - actualWorker.destroy(); - } - }); - }); - - describe('invalid construction', function () { - it('should throw without a type', function () { - const init = () => new Worker(mockQueue); - expect(init).to.throwException(/type.+string/i); - }); - - it('should throw without an invalid type', function () { - const init = () => new Worker(mockQueue, { string: false }); - expect(init).to.throwException(/type.+string/i); - }); - - it('should throw without a workerFn', function () { - const init = () => new Worker(mockQueue, 'test'); - expect(init).to.throwException(/workerFn.+function/i); - }); - - it('should throw with an invalid workerFn', function () { - const init = () => new Worker(mockQueue, 'test', { function: false }); - expect(init).to.throwException(/workerFn.+function/i); - }); - - it('should throw without an opts', function () { - const init = () => new Worker(mockQueue, 'test', noop); - expect(init).to.throwException(/opts.+object/i); - }); - - it('should throw with an invalid opts.interval', function () { - const init = () => new Worker(mockQueue, 'test', noop, {}); - expect(init).to.throwException(/opts\.interval.+number/i); - }); - - it('should throw with an invalid opts.intervalErrorMultiplier', function () { - const init = () => new Worker(mockQueue, 'test', noop, { interval: 1 }); - expect(init).to.throwException(/opts\.intervalErrorMultiplier.+number/i); - }); - }); - - describe('construction', function () { - it('should assign internal properties', function () { - const jobtype = 'testjob'; - const workerFn = noop; - worker = new Worker(mockQueue, jobtype, workerFn, defaultWorkerOptions); - expect(worker).to.have.property('id'); - expect(worker).to.have.property('queue', mockQueue); - expect(worker).to.have.property('jobtype', jobtype); - expect(worker).to.have.property('workerFn', workerFn); - }); - - it('should have a unique ID', function () { - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - expect(worker.id).to.be.a('string'); - - worker2 = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - expect(worker2.id).to.be.a('string'); - - expect(worker.id).to.not.equal(worker2.id); - }); - }); - - describe('event emitting', function () { - beforeEach(function () { - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - }); - - it('should trigger events on the queue instance', function (done) { - const eventName = 'test event'; - const payload1 = { - test: true, - deep: { object: 'ok' }, - }; - const payload2 = 'two'; - const payload3 = new Error('test error'); - - mockQueue.on(eventName, (...args) => { - try { - expect(args[0]).to.equal(payload1); - expect(args[1]).to.equal(payload2); - expect(args[2]).to.equal(payload3); - done(); - } catch (e) { - done(e); - } - }); - - worker.emit(eventName, payload1, payload2, payload3); - }); - }); - - describe('output formatting', function () { - let f; - - beforeEach(function () { - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - f = (output) => worker._formatOutput(output); - }); - - it('should handle primitives', function () { - const primitives = ['test', true, 1234, { one: 1 }, [5, 6, 7, 8]]; - - primitives.forEach((val) => { - expect(f(val)).to.have.property('content_type', defaults.unknownMime); - expect(f(val)).to.have.property('content', val); - }); - }); - - it('should accept content object without type', function () { - const output = { - content: 'test output', - }; - - expect(f(output)).to.have.property('content_type', defaults.unknownMime); - expect(f(output)).to.have.property('content', output.content); - }); - - it('should accept a content type', function () { - const output = { - content_type: 'test type', - content: 'test output', - }; - - expect(f(output)).to.have.property('content_type', output.content_type); - expect(f(output)).to.have.property('content', output.content); - }); - - it('should work with no input', function () { - expect(f()).to.have.property('content_type', defaults.unknownMime); - expect(f()).to.have.property('content', defaults.contentBody); - }); - }); - - describe('polling for jobs', function () { - beforeEach(() => { - anchorMoment = moment(anchor); - clock = sinon.useFakeTimers(anchorMoment.valueOf()); - }); - - afterEach(() => { - clock.restore(); - }); - - it('should start polling for jobs after interval', async function () { - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - const processPendingJobsStub = sinon - .stub(worker, '_processPendingJobs') - .callsFake(() => Promise.resolve()); - sinon.assert.notCalled(processPendingJobsStub); - await allowPoll(defaultWorkerOptions.interval); - sinon.assert.calledOnce(processPendingJobsStub); - }); - - it('should use interval option to control polling', async function () { - const interval = 567; - worker = new Worker(mockQueue, 'test', noop, { ...defaultWorkerOptions, interval }); - const processPendingJobsStub = sinon - .stub(worker, '_processPendingJobs') - .callsFake(() => Promise.resolve()); - - sinon.assert.notCalled(processPendingJobsStub); - await allowPoll(interval); - sinon.assert.calledOnce(processPendingJobsStub); - }); - - it('should not poll once destroyed', async function () { - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - - const processPendingJobsStub = sinon - .stub(worker, '_processPendingJobs') - .callsFake(() => Promise.resolve()); - - // move the clock a couple times, test for searches each time - sinon.assert.notCalled(processPendingJobsStub); - await allowPoll(defaultWorkerOptions.interval); - sinon.assert.calledOnce(processPendingJobsStub); - await allowPoll(defaultWorkerOptions.interval); - sinon.assert.calledTwice(processPendingJobsStub); - - // destroy the worker, move the clock, make sure another search doesn't happen - worker.destroy(); - await allowPoll(defaultWorkerOptions.interval); - sinon.assert.calledTwice(processPendingJobsStub); - - // manually call job poller, move the clock, make sure another search doesn't happen - worker._startJobPolling(); - await allowPoll(defaultWorkerOptions.interval); - sinon.assert.calledTwice(processPendingJobsStub); - }); - - it('should use error multiplier when processPendingJobs rejects the Promise', async function () { - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - - const processPendingJobsStub = sinon - .stub(worker, '_processPendingJobs') - .rejects(new Error('test error')); - - await allowPoll(defaultWorkerOptions.interval); - expect(processPendingJobsStub.callCount).to.be(1); - await allowPoll(defaultWorkerOptions.interval); - expect(processPendingJobsStub.callCount).to.be(1); - await allowPoll(defaultWorkerOptions.interval * defaultWorkerOptions.intervalErrorMultiplier); - expect(processPendingJobsStub.callCount).to.be(2); - }); - - it('should not use error multiplier when processPendingJobs resolved the Promise', async function () { - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - - const processPendingJobsStub = sinon - .stub(worker, '_processPendingJobs') - .callsFake(() => Promise.resolve()); - - await allowPoll(defaultWorkerOptions.interval); - expect(processPendingJobsStub.callCount).to.be(1); - await allowPoll(defaultWorkerOptions.interval); - expect(processPendingJobsStub.callCount).to.be(2); - }); - }); - - describe('query for pending jobs', function () { - let searchStub; - - function getSearchParams(jobtype = 'test', params = {}) { - worker = new Worker(mockQueue, jobtype, noop, { ...defaultWorkerOptions, ...params }); - worker._getPendingJobs(); - return searchStub.firstCall.args[1]; - } - - describe('error handling', function () { - it('should pass search errors', function (done) { - searchStub = sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('search') - .callsFake(() => Promise.reject()); - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - worker - ._getPendingJobs() - .then(() => done(new Error('should not resolve'))) - .catch(() => { - done(); - }); - }); - - describe('missing index', function () { - it('should swallow error', function (done) { - searchStub = sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('search') - .callsFake(() => Promise.reject({ status: 404 })); - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - worker - ._getPendingJobs() - .then(() => { - done(); - }) - .catch(() => done(new Error('should not reject'))); - }); - - it('should return an empty array', function (done) { - searchStub = sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('search') - .callsFake(() => Promise.reject({ status: 404 })); - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - worker - ._getPendingJobs() - .then((res) => { - try { - expect(res).to.be.an(Array); - expect(res).to.have.length(0); - done(); - } catch (e) { - done(e); - } - }) - .catch(() => done(new Error('should not reject'))); - }); - }); - }); - - describe('query body', function () { - const conditionPath = 'query.bool.filter.bool'; - const jobtype = 'test_jobtype'; - - beforeEach(() => { - searchStub = sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('search') - .callsFake(() => Promise.resolve({ hits: { hits: [] } })); - anchorMoment = moment(anchor); - clock = sinon.useFakeTimers(anchorMoment.valueOf()); - }); - - afterEach(() => { - clock.restore(); - }); - - it('should query with seq_no_primary_term', function () { - const { body } = getSearchParams(jobtype); - expect(body).to.have.property('seq_no_primary_term', true); - }); - - it('should filter unwanted source data', function () { - const excludedFields = ['output.content']; - const { body } = getSearchParams(jobtype); - expect(body).to.have.property('_source'); - expect(body._source).to.eql({ excludes: excludedFields }); - }); - - it('should search for pending or expired jobs', function () { - const { body } = getSearchParams(jobtype); - const conditions = get(body, conditionPath); - expect(conditions).to.have.property('should'); - - // this works because we are stopping the clock, so all times match - const nowTime = moment().toISOString(); - const pending = { term: { status: 'pending' } }; - const expired = { - bool: { - must: [ - { term: { status: 'processing' } }, - { range: { process_expiration: { lte: nowTime } } }, - ], - }, - }; - - const pendingMatch = find(conditions.should, pending); - expect(pendingMatch).to.not.be(undefined); - - const expiredMatch = find(conditions.should, expired); - expect(expiredMatch).to.not.be(undefined); - }); - - it('specify that there should be at least one match', function () { - const { body } = getSearchParams(jobtype); - const conditions = get(body, conditionPath); - expect(conditions).to.have.property('minimum_should_match', 1); - }); - - it('should use default size', function () { - const { body } = getSearchParams(jobtype); - expect(body).to.have.property('size', defaults.size); - }); - }); - }); - - describe('claiming a job', function () { - let params; - let job; - let updateSpy; - - beforeEach(function () { - anchorMoment = moment(anchor); - clock = sinon.useFakeTimers(anchorMoment.valueOf()); - - params = { - index: 'myIndex', - type: 'test', - id: 12345, - }; - return mockQueue.client.callAsInternalUser('get', params).then((jobDoc) => { - job = jobDoc; - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - updateSpy = sinon.spy(mockQueue.client, 'callAsInternalUser').withArgs('update'); - }); - }); - - afterEach(() => { - clock.restore(); - }); - - it('should use seqNo and primaryTerm on update', function () { - worker._claimJob(job); - const query = updateSpy.firstCall.args[1]; - expect(query).to.have.property('index', job._index); - expect(query).to.have.property('id', job._id); - expect(query).to.have.property('if_seq_no', job._seq_no); - expect(query).to.have.property('if_primary_term', job._primary_term); - }); - - it('should increment the job attempts', function () { - worker._claimJob(job); - const doc = updateSpy.firstCall.args[1].body.doc; - expect(doc).to.have.property('attempts', job._source.attempts + 1); - }); - - it('should update the job status', function () { - worker._claimJob(job); - const doc = updateSpy.firstCall.args[1].body.doc; - expect(doc).to.have.property('status', constants.JOB_STATUS_PROCESSING); - }); - - it('should set job expiration time', function () { - worker._claimJob(job); - const doc = updateSpy.firstCall.args[1].body.doc; - const expiration = anchorMoment.add(defaults.timeout).toISOString(); - expect(doc).to.have.property('process_expiration', expiration); - }); - - it('should fail job if max_attempts are hit', function () { - const failSpy = sinon.spy(worker, '_failJob'); - job._source.attempts = job._source.max_attempts; - worker._claimJob(job); - sinon.assert.calledOnce(failSpy); - }); - - it('should append error message if no existing content', function () { - const failSpy = sinon.spy(worker, '_failJob'); - job._source.attempts = job._source.max_attempts; - expect(job._source.output).to.be(undefined); - worker._claimJob(job); - const msg = failSpy.firstCall.args[1]; - expect(msg).to.contain('Max attempts reached'); - expect(msg).to.contain(job._source.max_attempts); - }); - - it('should not append message if existing output', function () { - const failSpy = sinon.spy(worker, '_failJob'); - job._source.attempts = job._source.max_attempts; - job._source.output = 'i have some output'; - worker._claimJob(job); - const msg = failSpy.firstCall.args[1]; - expect(msg).to.equal(false); - }); - - it('should reject the promise on conflict errors', function () { - mockQueue.client.callAsInternalUser.restore(); - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('update') - .returns(Promise.reject({ statusCode: 409 })); - return worker._claimJob(job).catch((err) => { - expect(err).to.eql({ statusCode: 409 }); - }); - }); - - it('should reject the promise on other errors', function () { - mockQueue.client.callAsInternalUser.restore(); - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('update') - .returns(Promise.reject({ statusCode: 401 })); - return worker._claimJob(job).catch((err) => { - expect(err).to.eql({ statusCode: 401 }); - }); - }); - }); - - describe('find a pending job to claim', function () { - const getMockJobs = (status = 'pending') => [ - { - _index: 'myIndex', - _id: 12345, - _seq_no: 3, - _primary_term: 3, - found: true, - _source: { - jobtype: 'jobtype', - created_by: false, - payload: { id: 'sample-job-1', now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)' }, - priority: 10, - timeout: 10000, - created_at: '2016-04-25T21:13:04.738Z', - attempts: 0, - max_attempts: 3, - status, - }, - }, - ]; - - beforeEach(function () { - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - }); - - afterEach(() => { - mockQueue.client.callAsInternalUser.restore(); - }); - - it('should emit for errors from claiming job', function (done) { - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('update') - .rejects({ statusCode: 401 }); - - worker.once(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) { - try { - expect(err).to.have.property('error'); - expect(err).to.have.property('job'); - expect(err).to.have.property('worker'); - expect(err.error).to.have.property('statusCode', 401); - done(); - } catch (e) { - done(e); - } - }); - - worker._claimPendingJobs(getMockJobs()).catch(() => {}); - }); - - it('should reject the promise if an error claiming the job', function () { - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('update') - .rejects({ statusCode: 409 }); - return worker._claimPendingJobs(getMockJobs()).catch((err) => { - expect(err).to.eql({ statusCode: 409 }); - }); - }); - - it('should get the pending job', function () { - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('update') - .resolves({ test: 'cool' }); - sinon.stub(worker, '_performJob').callsFake(identity); - return worker._claimPendingJobs(getMockJobs()).then((claimedJob) => { - expect(claimedJob._index).to.be('myIndex'); - expect(claimedJob._source.jobtype).to.be('jobtype'); - expect(claimedJob._source.status).to.be('processing'); - expect(claimedJob.test).to.be('cool'); - worker._performJob.restore(); - }); - }); - }); - - describe('failing a job', function () { - let job; - let updateSpy; - - beforeEach(function () { - anchorMoment = moment(anchor); - clock = sinon.useFakeTimers(anchorMoment.valueOf()); - - return mockQueue.client.callAsInternalUser('get').then((jobDoc) => { - job = jobDoc; - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - updateSpy = sinon.spy(mockQueue.client, 'callAsInternalUser').withArgs('update'); - }); - }); - - afterEach(() => { - clock.restore(); - }); - - it('should use _seq_no and _primary_term on update', function () { - worker._failJob(job); - const query = updateSpy.firstCall.args[1]; - expect(query).to.have.property('index', job._index); - expect(query).to.have.property('id', job._id); - expect(query).to.have.property('if_seq_no', job._seq_no); - expect(query).to.have.property('if_primary_term', job._primary_term); - }); - - it('should set status to failed', function () { - worker._failJob(job); - const doc = updateSpy.firstCall.args[1].body.doc; - expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED); - }); - - it('should append error message if supplied', function () { - const msg = 'test message'; - worker._failJob(job, msg); - const doc = updateSpy.firstCall.args[1].body.doc; - expect(doc).to.have.property('output'); - expect(doc.output).to.have.property('content', msg); - }); - - it('should return true on conflict errors', function () { - mockQueue.client.callAsInternalUser.restore(); - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('update') - .rejects({ statusCode: 409 }); - return worker._failJob(job).then((res) => expect(res).to.equal(true)); - }); - - it('should return false on other document update errors', function () { - mockQueue.client.callAsInternalUser.restore(); - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('update') - .rejects({ statusCode: 401 }); - return worker._failJob(job).then((res) => expect(res).to.equal(false)); - }); - - it('should set completed time and status to failure', function () { - const startTime = moment().valueOf(); - const msg = 'test message'; - clock.tick(100); - - worker._failJob(job, msg); - const doc = updateSpy.firstCall.args[1].body.doc; - expect(doc).to.have.property('output'); - expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED); - expect(doc).to.have.property('completed_at'); - const completedTimestamp = moment(doc.completed_at).valueOf(); - expect(completedTimestamp).to.be.greaterThan(startTime); - }); - - it('should emit worker failure event', function (done) { - worker.on(constants.EVENT_WORKER_JOB_FAIL, (err) => { - try { - expect(err).to.have.property('output'); - expect(err).to.have.property('job'); - expect(err).to.have.property('worker'); - done(); - } catch (e) { - done(e); - } - }); - - return worker._failJob(job); - }); - - it('should emit on other document update errors', function (done) { - mockQueue.client.callAsInternalUser.restore(); - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('update') - .rejects({ statusCode: 401 }); - - worker.on(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, function (err) { - try { - expect(err).to.have.property('error'); - expect(err).to.have.property('job'); - expect(err).to.have.property('worker'); - expect(err.error).to.have.property('statusCode', 401); - done(); - } catch (e) { - done(e); - } - }); - worker._failJob(job); - }); - }); - - describe('performing a job', function () { - let job; - let payload; - let updateSpy; - - beforeEach(function () { - payload = { - value: random(0, 100, true), - }; - - return mockQueue.client.callAsInternalUser('get', {}, { payload }).then((jobDoc) => { - job = jobDoc; - updateSpy = sinon.spy(mockQueue.client, 'callAsInternalUser').withArgs('update'); - }); - }); - - describe('worker success', function () { - it('should call the workerFn with the payload', function (done) { - const workerFn = function (jobPayload) { - expect(jobPayload).to.eql(payload); - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - - worker._performJob(job).then(() => done()); - }); - - it('should update the job with the workerFn output', function () { - const workerFn = function (job, jobPayload) { - expect(jobPayload).to.eql(payload); - return payload; - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - - return worker._performJob(job).then(() => { - sinon.assert.calledOnce(updateSpy); - const query = updateSpy.firstCall.args[1]; - - expect(query).to.have.property('index', job._index); - expect(query).to.have.property('id', job._id); - expect(query).to.have.property('if_seq_no', job._seq_no); - expect(query).to.have.property('if_primary_term', job._primary_term); - expect(query.body.doc).to.have.property('output'); - expect(query.body.doc.output).to.have.property('content_type', false); - expect(query.body.doc.output).to.have.property('content', payload); - }); - }); - - it('should update the job status and completed time', function () { - const startTime = moment().valueOf(); - const workerFn = function (job, jobPayload) { - expect(jobPayload).to.eql(payload); - return new Promise(function (resolve) { - setTimeout(() => resolve(payload), 10); - }); - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - - return worker._performJob(job).then(() => { - sinon.assert.calledOnce(updateSpy); - const doc = updateSpy.firstCall.args[1].body.doc; - expect(doc).to.have.property('status', constants.JOB_STATUS_COMPLETED); - expect(doc).to.have.property('completed_at'); - const completedTimestamp = moment(doc.completed_at).valueOf(); - expect(completedTimestamp).to.be.greaterThan(startTime); - }); - }); - - it('handle warnings in the output by reflecting a warning status', () => { - const workerFn = () => { - return Promise.resolve({ - ...payload, - warnings: [`Don't run with scissors!`], - }); - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - - return worker - ._performJob({ - test: true, - ...job, - }) - .then(() => { - sinon.assert.calledOnce(updateSpy); - const doc = updateSpy.firstCall.args[1].body.doc; - expect(doc).to.have.property('status', constants.JOB_STATUS_WARNINGS); - }); - }); - - it('should emit completion event', function (done) { - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - - worker.once(constants.EVENT_WORKER_COMPLETE, (workerJob) => { - try { - expect(workerJob).to.not.have.property('_source'); - - expect(workerJob).to.have.property('job'); - expect(workerJob.job).to.have.property('id'); - expect(workerJob.job).to.have.property('index'); - - expect(workerJob).to.have.property('output'); - expect(workerJob.output).to.have.property('content'); - expect(workerJob.output).to.have.property('content_type'); - - done(); - } catch (e) { - done(e); - } - }); - - worker._performJob(job); - }); - }); - - describe('worker failure', function () { - it('should append error output to job', function () { - const workerFn = function () { - throw new Error('test error'); - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - const failStub = sinon.stub(worker, '_failJob'); - - return worker._performJob(job).then(() => { - sinon.assert.calledOnce(failStub); - sinon.assert.calledWith(failStub, job, 'Error: test error'); - }); - }); - - it('should handle async errors', function () { - const workerFn = function () { - return new Promise((resolve, reject) => { - reject(new Error('test error')); - }); - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - const failStub = sinon.stub(worker, '_failJob'); - - return worker._performJob(job).then(() => { - sinon.assert.calledOnce(failStub); - sinon.assert.calledWith(failStub, job, 'Error: test error'); - }); - }); - - it('should handle rejecting with strings', function () { - const errorMessage = 'this is a string error'; - const workerFn = function () { - return new Promise((resolve, reject) => { - reject(errorMessage); - }); - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - const failStub = sinon.stub(worker, '_failJob'); - - return worker._performJob(job).then(() => { - sinon.assert.calledOnce(failStub); - sinon.assert.calledWith(failStub, job, errorMessage); - }); - }); - - it('should handle empty rejection', function (done) { - const workerFn = function () { - return new Promise((resolve, reject) => { - reject(); - }); - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - - worker.once(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => { - try { - expect(err).to.have.property('error'); - expect(err).to.have.property('job'); - expect(err).to.have.property('worker'); - expect(err.error).to.have.property('name', 'UnspecifiedWorkerError'); - done(); - } catch (e) { - done(e); - } - }); - - worker._performJob(job); - }); - }); - }); - - describe('job failures', function () { - function getFailStub(workerWithFailure) { - return sinon.stub(workerWithFailure, '_failJob').resolves(); - } - - describe('saving output failure', () => { - it('should mark the job as failed if saving to ES fails', async () => { - const job = { - _id: 'shouldSucced', - _source: { - timeout: 1000, - payload: 'test', - }, - }; - - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('update') - .rejects({ statusCode: 413 }); - - const workerFn = function (jobPayload) { - return new Promise(function (resolve) { - setTimeout(() => resolve(jobPayload), 10); - }); - }; - const worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - const failStub = getFailStub(worker); - - await worker._performJob(job); - worker.destroy(); - - sinon.assert.called(failStub); - }); - }); - - describe('search failure', function () { - it('causes _processPendingJobs to reject the Promise', function () { - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('search') - .rejects(new Error('test error')); - worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); - return worker._processPendingJobs().then( - () => { - expect().fail('expected rejected Promise'); - }, - (err) => { - expect(err).to.be.an(Error); - } - ); - }); - }); - - describe('timeout', function () { - let failStub; - let job; - let cancellationCallback; - - beforeEach(function () { - const timeout = 20; - cancellationCallback = function () {}; - - const workerFn = function (job, payload, cancellationToken) { - cancellationToken.on(cancellationCallback); - return new Promise(function (resolve) { - setTimeout(() => { - resolve(); - }, timeout * 2); - }); - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - failStub = getFailStub(worker); - - job = { - _id: 'testTimeoutJob', - _source: { - timeout: timeout, - payload: 'test', - }, - }; - }); - - it('should not fail job', function () { - // fire of the job worker - return worker._performJob(job).then(() => { - sinon.assert.notCalled(failStub); - }); - }); - - it('should emit timeout if not completed in time', function (done) { - worker.once(constants.EVENT_WORKER_JOB_TIMEOUT, (err) => { - try { - expect(err).to.have.property('error'); - expect(err).to.have.property('job'); - expect(err).to.have.property('worker'); - expect(err.error).to.have.property('name', 'WorkerTimeoutError'); - done(); - } catch (e) { - done(e); - } - }); - - // fire of the job worker - worker._performJob(job); - }); - - it('should call cancellation token callback if not completed in time', function (done) { - let called = false; - - cancellationCallback = () => { - called = true; - }; - - worker.once(constants.EVENT_WORKER_JOB_TIMEOUT, () => { - try { - expect(called).to.be(true); - done(); - } catch (err) { - done(err); - } - }); - - // fire of the job worker - worker._performJob(job); - }); - }); - - describe('worker failure', function () { - let failStub; - - const timeout = 20; - const job = { - _id: 'testTimeoutJob', - _source: { - timeout: timeout, - payload: 'test', - }, - }; - - beforeEach(function () { - sinon - .stub(mockQueue.client, 'callAsInternalUser') - .withArgs('search') - .callsFake(() => Promise.resolve({ hits: { hits: [] } })); - }); - - describe('workerFn rejects promise', function () { - beforeEach(function () { - const workerFn = function () { - return new Promise(function (resolve, reject) { - setTimeout(() => { - reject(); - }, timeout / 2); - }); - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - failStub = getFailStub(worker); - }); - - it('should fail the job', function () { - return worker._performJob(job).then(() => { - sinon.assert.calledOnce(failStub); - }); - }); - - it('should emit worker execution error', function (done) { - worker.on(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => { - try { - expect(err).to.have.property('error'); - expect(err).to.have.property('job'); - expect(err).to.have.property('worker'); - done(); - } catch (e) { - done(e); - } - }); - - // fire of the job worker - worker._performJob(job); - }); - }); - - describe('workerFn throws error', function () { - beforeEach(function () { - const workerFn = function () { - throw new Error('test throw'); - }; - worker = new Worker(mockQueue, 'test', workerFn, defaultWorkerOptions); - - failStub = getFailStub(worker); - }); - - it('should fail the job', function () { - return worker._performJob(job).then(() => { - sinon.assert.calledOnce(failStub); - }); - }); - - it('should emit worker execution error', function (done) { - worker.on(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => { - try { - expect(err).to.have.property('error'); - expect(err).to.have.property('job'); - expect(err).to.have.property('worker'); - done(); - } catch (e) { - done(e); - } - }); - - // fire of the job worker - worker._performJob(job); - }); - }); - }); - }); -}); - -describe('Format Job Object', () => { - it('pulls index and ID', function () { - const jobMock = { - _index: 'foo', - _id: 'booId', - }; - expect(formatJobObject(jobMock)).eql({ - index: 'foo', - id: 'booId', - }); - }); -}); - -describe('Get Doc Path from ES Response', () => { - it('returns a formatted string after response of an update', function () { - const responseMock = { - _index: 'foo', - _id: 'booId', - }; - expect(getUpdatedDocPath(responseMock)).equal('/foo/booId'); - }); -}); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/constants/default_settings.js b/x-pack/plugins/reporting/server/lib/esqueue/constants/default_settings.js deleted file mode 100644 index 3e23f3611b870a2..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/constants/default_settings.js +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -export const defaultSettings = { - DEFAULT_SETTING_TIMEOUT: 10000, - DEFAULT_SETTING_DATE_SEPARATOR: '-', - DEFAULT_SETTING_INTERVAL: 'week', - DEFAULT_SETTING_INDEX_SETTINGS: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - }, - DEFAULT_WORKER_CHECK_SIZE: 1, -}; diff --git a/x-pack/plugins/reporting/server/lib/esqueue/constants/events.js b/x-pack/plugins/reporting/server/lib/esqueue/constants/events.js deleted file mode 100644 index d7ec62cc7c2877f..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/constants/events.js +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -export const events = { - EVENT_QUEUE_ERROR: 'queue:error', - EVENT_JOB_ERROR: 'job:error', - EVENT_JOB_CREATED: 'job:created', - EVENT_JOB_CREATE_ERROR: 'job:creation error', - EVENT_WORKER_COMPLETE: 'worker:job complete', - EVENT_WORKER_JOB_CLAIM_ERROR: 'worker:claim job error', - EVENT_WORKER_JOB_SEARCH_ERROR: 'worker:pending jobs error', - EVENT_WORKER_JOB_UPDATE_ERROR: 'worker:update job error', - EVENT_WORKER_JOB_FAIL: 'worker:job failed', - EVENT_WORKER_JOB_FAIL_ERROR: 'worker:failed job update error', - EVENT_WORKER_JOB_EXECUTION_ERROR: 'worker:job execution error', - EVENT_WORKER_JOB_TIMEOUT: 'worker:job timeout', -}; diff --git a/x-pack/plugins/reporting/server/lib/esqueue/constants/index.js b/x-pack/plugins/reporting/server/lib/esqueue/constants/index.js deleted file mode 100644 index 5fcff3531851a61..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/constants/index.js +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { events } from './events'; -import { statuses } from './statuses'; -import { defaultSettings } from './default_settings'; - -export const constants = { - ...events, - ...statuses, - ...defaultSettings, -}; diff --git a/x-pack/plugins/reporting/server/lib/esqueue/helpers/create_index.js b/x-pack/plugins/reporting/server/lib/esqueue/helpers/create_index.js deleted file mode 100644 index c0ce7548e2e1afc..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/helpers/create_index.js +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { constants } from '../constants'; - -const schema = { - meta: { - // We are indexing these properties with both text and keyword fields because that's what will be auto generated - // when an index already exists. This schema is only used when a reporting index doesn't exist. This way existing - // reporting indexes and new reporting indexes will look the same and the data can be queried in the same - // manner. - properties: { - /** - * Type of object that is triggering this report. Should be either search, visualization or dashboard. - * Used for job listing and telemetry stats only. - */ - objectType: { - type: 'text', - fields: { - keyword: { - type: 'keyword', - ignore_above: 256, - }, - }, - }, - /** - * Can be either preserve_layout, print or none (in the case of csv export). - * Used for phone home stats only. - */ - layout: { - type: 'text', - fields: { - keyword: { - type: 'keyword', - ignore_above: 256, - }, - }, - }, - }, - }, - browser_type: { type: 'keyword' }, - jobtype: { type: 'keyword' }, - payload: { type: 'object', enabled: false }, - priority: { type: 'byte' }, - timeout: { type: 'long' }, - process_expiration: { type: 'date' }, - created_by: { type: 'keyword' }, - created_at: { type: 'date' }, - started_at: { type: 'date' }, - completed_at: { type: 'date' }, - attempts: { type: 'short' }, - max_attempts: { type: 'short' }, - kibana_name: { type: 'keyword' }, - kibana_id: { type: 'keyword' }, - status: { type: 'keyword' }, - output: { - type: 'object', - properties: { - content_type: { type: 'keyword' }, - size: { type: 'long' }, - content: { type: 'object', enabled: false }, - }, - }, -}; - -export function createIndex(client, indexName, indexSettings = {}) { - const body = { - settings: { - ...constants.DEFAULT_SETTING_INDEX_SETTINGS, - ...indexSettings, - }, - mappings: { - properties: schema, - }, - }; - - return client - .callAsInternalUser('indices.exists', { - index: indexName, - }) - .then((exists) => { - if (!exists) { - return client - .callAsInternalUser('indices.create', { - index: indexName, - body: body, - }) - .then(() => true) - .catch((err) => { - /* FIXME creating the index will fail if there were multiple jobs staged in parallel. - * Each staged job checks `client.indices.exists` and could each get `false` as a response. - * Only the first job in line can successfully create it though. - * The problem might only happen in automated tests, where the indices are deleted after each test run. - * This catch block is in place to not fail a job if the job runner hits this race condition. - * Unfortunately we don't have a logger in scope to log a warning. - */ - const isIndexExistsError = - err && - err.body && - err.body.error && - err.body.error.type === 'resource_already_exists_exception'; - if (isIndexExistsError) { - return true; - } - - throw err; - }); - } - return exists; - }); -} diff --git a/x-pack/plugins/reporting/server/lib/esqueue/helpers/errors.js b/x-pack/plugins/reporting/server/lib/esqueue/helpers/errors.js deleted file mode 100644 index 6cb7aa529be593d..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/helpers/errors.js +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -export function WorkerTimeoutError(message, props = {}) { - this.name = 'WorkerTimeoutError'; - this.message = message; - this.timeout = props.timeout; - this.jobId = props.jobId; - - if ('captureStackTrace' in Error) Error.captureStackTrace(this, WorkerTimeoutError); - else this.stack = new Error().stack; -} -WorkerTimeoutError.prototype = Object.create(Error.prototype); - -export function UnspecifiedWorkerError(message, props = {}) { - this.name = 'UnspecifiedWorkerError'; - this.message = message; - this.jobId = props.jobId; - - if ('captureStackTrace' in Error) Error.captureStackTrace(this, UnspecifiedWorkerError); - else this.stack = new Error().stack; -} -UnspecifiedWorkerError.prototype = Object.create(Error.prototype); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/index.js b/x-pack/plugins/reporting/server/lib/esqueue/index.js deleted file mode 100644 index 0fbcb54c673dd39..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/index.js +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { EventEmitter } from 'events'; -import { Worker } from './worker'; -import { constants } from './constants'; -import { omit } from 'lodash'; - -export { events } from './constants/events'; - -export class Esqueue extends EventEmitter { - constructor(store, options = {}) { - super(); - this.store = store; // for updating jobs in ES - this.index = this.store.indexPrefix; // for polling for pending jobs - this.settings = { - interval: constants.DEFAULT_SETTING_INTERVAL, - timeout: constants.DEFAULT_SETTING_TIMEOUT, - dateSeparator: constants.DEFAULT_SETTING_DATE_SEPARATOR, - ...omit(options, ['client']), - }; - this.client = options.client; - this._logger = options.logger || function () {}; - this._workers = []; - this._initTasks().catch((err) => this.emit(constants.EVENT_QUEUE_ERROR, err)); - } - - _initTasks() { - const initTasks = [this.client.callAsInternalUser('ping')]; - - return Promise.all(initTasks).catch((err) => { - this._logger(['initTasks', 'error'], err); - throw err; - }); - } - - registerWorker(type, workerFn, opts) { - const worker = new Worker(this, type, workerFn, { ...opts, logger: this._logger }); - this._workers.push(worker); - return worker; - } - - getWorkers() { - return this._workers.map((fn) => fn); - } - - destroy() { - const workers = this._workers.filter((worker) => worker.destroy()); - this._workers = workers; - } -} diff --git a/x-pack/plugins/reporting/server/lib/esqueue/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/worker.js deleted file mode 100644 index 469bafd6946122e..000000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/worker.js +++ /dev/null @@ -1,463 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import events from 'events'; -import moment from 'moment'; -import Puid from 'puid'; -import { CancellationToken, Poller } from '../../../common'; -import { constants } from './constants'; -import { UnspecifiedWorkerError, WorkerTimeoutError } from './helpers/errors'; - -const puid = new Puid(); - -export function formatJobObject(job) { - return { - index: job._index, - id: job._id, - }; -} - -export function getUpdatedDocPath(response) { - const { _index: ind, _id: id } = response; - return `/${ind}/${id}`; -} - -const MAX_PARTIAL_ERROR_LENGTH = 1000; // 1000 of beginning, 1000 of end -const ERROR_PARTIAL_SEPARATOR = '...'; -const MAX_ERROR_LENGTH = MAX_PARTIAL_ERROR_LENGTH * 2 + ERROR_PARTIAL_SEPARATOR.length; - -function getLogger(opts, id, logLevel) { - return (msg, err) => { - /* - * This does not get the logger instance from queue.registerWorker in the createWorker function. - * The logger instance in the Equeue lib comes from createTaggedLogger, so logLevel tags are passed differently - */ - const logger = opts.logger || function () {}; - const message = `${id} - ${msg}`; - const tags = [logLevel]; - - if (err) { - // The error message string could be very long if it contains the request - // body of a request that was too large for Elasticsearch. - // This takes a partial version of the error message without scanning - // every character of the string, which would block Node. - const errString = `${message}: ${err.stack ? err.stack : err}`; - const errLength = errString.length; - const subStr = String.prototype.substring.bind(errString); - if (errLength > MAX_ERROR_LENGTH) { - const partialError = - subStr(0, MAX_PARTIAL_ERROR_LENGTH) + - ERROR_PARTIAL_SEPARATOR + - subStr(errLength - MAX_PARTIAL_ERROR_LENGTH); - - logger(partialError, tags); - logger( - `A partial version of the entire error message was logged. ` + - `The entire error message length is: ${errLength} characters.`, - tags - ); - } else { - logger(errString, tags); - } - return; - } - - logger(message, tags); - }; -} - -export class Worker extends events.EventEmitter { - constructor(queue, type, workerFn, opts) { - if (typeof type !== 'string') throw new Error('type must be a string'); - if (typeof workerFn !== 'function') throw new Error('workerFn must be a function'); - if (typeof opts !== 'object') throw new Error('opts must be an object'); - if (typeof opts.interval !== 'number') throw new Error('opts.interval must be a number'); - if (typeof opts.intervalErrorMultiplier !== 'number') - throw new Error('opts.intervalErrorMultiplier must be a number'); - - super(); - - this.id = puid.generate(); - this.kibanaId = opts.kibanaId; - this.kibanaName = opts.kibanaName; - this.queue = queue; - this._client = this.queue.client; - this.jobtype = type; - this.workerFn = workerFn; - - this.debug = getLogger(opts, this.id, 'debug'); - this.warn = getLogger(opts, this.id, 'warning'); - this.error = getLogger(opts, this.id, 'error'); - this.info = getLogger(opts, this.id, 'info'); - - this._running = true; - this.debug(`Created worker for ${this.jobtype} jobs`); - - this._poller = new Poller({ - functionToPoll: () => { - return this._processPendingJobs(); - }, - pollFrequencyInMillis: opts.interval, - trailing: true, - continuePollingOnError: true, - pollFrequencyErrorMultiplier: opts.intervalErrorMultiplier, - }); - this._startJobPolling(); - } - - destroy() { - this._running = false; - this._stopJobPolling(); - } - - toJSON() { - return { - id: this.id, - index: this.queue.index, - jobType: this.jobType, - }; - } - - emit(name, ...args) { - super.emit(name, ...args); - this.queue.emit(name, ...args); - } - - _formatErrorParams(err, job) { - const response = { - error: err, - worker: this.toJSON(), - }; - - if (job) response.job = formatJobObject(job); - return response; - } - - _claimJob(job) { - const m = moment(); - const startTime = m.toISOString(); - const expirationTime = m.add(job._source.timeout).toISOString(); - const attempts = job._source.attempts + 1; - - if (attempts > job._source.max_attempts) { - const msg = !job._source.output - ? `Max attempts reached (${job._source.max_attempts})` - : false; - return this._failJob(job, msg).then(() => false); - } - - const doc = { - attempts: attempts, - started_at: startTime, - process_expiration: expirationTime, - status: constants.JOB_STATUS_PROCESSING, - kibana_id: this.kibanaId, - kibana_name: this.kibanaName, - }; - - return this.queue.store - .updateReport({ - index: job._index, - id: job._id, - if_seq_no: job._seq_no, - if_primary_term: job._primary_term, - body: { doc }, - }) - .then((response) => { - this.info(`Job marked as claimed: ${getUpdatedDocPath(response)}`); - const updatedJob = { - ...job, - ...response, - }; - updatedJob._source = { - ...job._source, - ...doc, - }; - return updatedJob; - }); - } - - _failJob(job, output = false) { - this.warn(`Failing job ${job._id}`); - - const completedTime = moment().toISOString(); - const docOutput = this._formatOutput(output); - const doc = { - status: constants.JOB_STATUS_FAILED, - completed_at: completedTime, - output: docOutput, - }; - - this.emit(constants.EVENT_WORKER_JOB_FAIL, { - job: formatJobObject(job), - worker: this.toJSON(), - output: docOutput, - }); - - return this.queue.store - .updateReport({ - index: job._index, - id: job._id, - if_seq_no: job._seq_no, - if_primary_term: job._primary_term, - body: { doc }, - }) - .then((response) => { - this.info(`Job marked as failed: ${getUpdatedDocPath(response)}`); - }) - .catch((err) => { - if (err.statusCode === 409) return true; - this.error(`_failJob failed to update job ${job._id}`, err); - this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job)); - return false; - }); - } - - _formatOutput(output) { - const unknownMime = false; - const defaultOutput = null; - const docOutput = {}; - - if (typeof output === 'object' && output.content) { - docOutput.content = output.content; - docOutput.content_type = output.content_type || unknownMime; - docOutput.max_size_reached = output.max_size_reached; - docOutput.csv_contains_formulas = output.csv_contains_formulas; - docOutput.size = output.size; - docOutput.warnings = - output.warnings && output.warnings.length > 0 ? output.warnings : undefined; - } else { - docOutput.content = output || defaultOutput; - docOutput.content_type = unknownMime; - } - - return docOutput; - } - - _performJob(job) { - this.info(`Starting job`); - - const workerOutput = new Promise((resolve, reject) => { - // run the worker's workerFn - let isResolved = false; - const cancellationToken = new CancellationToken(); - const jobSource = job._source; - - Promise.resolve(this.workerFn.call(null, job, jobSource.payload, cancellationToken)) - .then((res) => { - // job execution was successful - if (res && res.warnings && res.warnings.length > 0) { - this.warn(`Job execution completed with warnings`); - } else { - this.info(`Job execution completed successfully`); - } - - isResolved = true; - resolve(res); - }) - .catch((err) => { - isResolved = true; - reject(err); - }); - - // fail if workerFn doesn't finish before timeout - const { timeout } = jobSource; - setTimeout(() => { - if (isResolved) return; - - cancellationToken.cancel(); - this.warn(`Timeout processing job ${job._id}`); - reject( - new WorkerTimeoutError(`Worker timed out, timeout = ${timeout}`, { - jobId: job._id, - timeout, - }) - ); - }, timeout); - }); - - return workerOutput.then( - (output) => { - const completedTime = moment().toISOString(); - const docOutput = this._formatOutput(output); - - const status = - output && output.warnings && output.warnings.length > 0 - ? constants.JOB_STATUS_WARNINGS - : constants.JOB_STATUS_COMPLETED; - const doc = { - status, - completed_at: completedTime, - output: docOutput, - }; - - return this.queue.store - .updateReport({ - index: job._index, - id: job._id, - if_seq_no: job._seq_no, - if_primary_term: job._primary_term, - body: { doc }, - }) - .then((response) => { - const eventOutput = { - job: formatJobObject(job), - output: docOutput, - }; - this.emit(constants.EVENT_WORKER_COMPLETE, eventOutput); - - this.info(`Job data saved successfully: ${getUpdatedDocPath(response)}`); - }) - .catch((err) => { - if (err.statusCode === 409) return false; - this.error(`Failure saving job output ${job._id}`, err); - this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job)); - return this._failJob(job, err.message ? err.message : false); - }); - }, - (jobErr) => { - if (!jobErr) { - jobErr = new UnspecifiedWorkerError('Unspecified worker error', { - jobId: job._id, - }); - } - - // job execution failed - if (jobErr.name === 'WorkerTimeoutError') { - this.warn(`Timeout on job ${job._id}`); - this.emit(constants.EVENT_WORKER_JOB_TIMEOUT, this._formatErrorParams(jobErr, job)); - return; - - // append the jobId to the error - } else { - try { - Object.assign(jobErr, { jobId: job._id }); - } catch (e) { - // do nothing if jobId can not be appended - } - } - - this.error(`Failure occurred on job ${job._id}`, jobErr); - this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job)); - return this._failJob(job, jobErr.toString ? jobErr.toString() : false); - } - ); - } - - _startJobPolling() { - if (!this._running) { - return; - } - - this._poller.start(); - } - - _stopJobPolling() { - this._poller.stop(); - } - - _processPendingJobs() { - return this._getPendingJobs().then((jobs) => { - return this._claimPendingJobs(jobs); - }); - } - - _claimPendingJobs(jobs) { - if (!jobs || jobs.length === 0) return; - - let claimed = false; - - // claim a single job, stopping after first successful claim - return jobs - .reduce((chain, job) => { - return chain.then((claimedJob) => { - // short-circuit the promise chain if a job has been claimed - if (claimed) return claimedJob; - - return this._claimJob(job) - .then((claimResult) => { - claimed = true; - return claimResult; - }) - .catch((err) => { - if (err.statusCode === 409) { - this.warn( - `_claimPendingJobs encountered a version conflict on updating pending job ${job._id}`, - err - ); - return; // continue reducing and looking for a different job to claim - } - this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job)); - return Promise.reject(err); - }); - }); - }, Promise.resolve()) - .then((claimedJob) => { - if (!claimedJob) { - this.debug(`Found no claimable jobs out of ${jobs.length} total`); - return; - } - return this._performJob(claimedJob); - }) - .catch((err) => { - this.error('Error claiming jobs', err); - return Promise.reject(err); - }); - } - - _getPendingJobs() { - const nowTime = moment().toISOString(); - const query = { - seq_no_primary_term: true, - _source: { - excludes: ['output.content'], - }, - query: { - bool: { - filter: { - bool: { - minimum_should_match: 1, - should: [ - { term: { status: 'pending' } }, - { - bool: { - must: [ - { term: { status: 'processing' } }, - { range: { process_expiration: { lte: nowTime } } }, - ], - }, - }, - ], - }, - }, - }, - }, - sort: [{ priority: { order: 'asc' } }, { created_at: { order: 'asc' } }], - size: constants.DEFAULT_WORKER_CHECK_SIZE, - }; - - return this._client - .callAsInternalUser('search', { - index: `${this.queue.index}-*`, - body: query, - }) - .then((results) => { - const jobs = results.hits.hits; - if (jobs.length > 0) { - this.debug(`${jobs.length} outstanding jobs returned`); - } - return jobs; - }) - .catch((err) => { - // ignore missing indices errors - if (err && err.status === 404) return []; - - this.error('job querying failed', err); - this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err)); - throw err; - }); - } -} diff --git a/x-pack/plugins/reporting/server/lib/index.ts b/x-pack/plugins/reporting/server/lib/index.ts index e4adb1188e3fc4a..15e6770ec388352 100644 --- a/x-pack/plugins/reporting/server/lib/index.ts +++ b/x-pack/plugins/reporting/server/lib/index.ts @@ -5,11 +5,11 @@ */ export { checkLicense } from './check_license'; -export { createQueueFactory } from './create_queue'; export { cryptoFactory } from './crypto'; -export { enqueueJobFactory } from './enqueue_job'; -export { getExportTypesRegistry } from './export_types_registry'; +export { ExportTypesRegistry, getExportTypesRegistry } from './export_types_registry'; export { LevelLogger } from './level_logger'; +export { statuses } from './statuses'; export { ReportingStore } from './store'; +export { ReportingTask } from './task'; export { startTrace } from './trace'; export { runValidations } from './validate'; diff --git a/x-pack/plugins/reporting/server/lib/esqueue/constants/statuses.ts b/x-pack/plugins/reporting/server/lib/statuses.ts similarity index 100% rename from x-pack/plugins/reporting/server/lib/esqueue/constants/statuses.ts rename to x-pack/plugins/reporting/server/lib/statuses.ts diff --git a/x-pack/plugins/reporting/server/lib/store/index.ts b/x-pack/plugins/reporting/server/lib/store/index.ts index a88d36d3fdf9ae3..210870121df71bf 100644 --- a/x-pack/plugins/reporting/server/lib/store/index.ts +++ b/x-pack/plugins/reporting/server/lib/store/index.ts @@ -4,5 +4,5 @@ * you may not use this file except in compliance with the Elastic License. */ -export { Report } from './report'; +export { Report, ReportTaskJSON } from './report'; export { ReportingStore } from './store'; diff --git a/x-pack/plugins/reporting/server/lib/store/mapping.ts b/x-pack/plugins/reporting/server/lib/store/mapping.ts index a819923e2f1054c..d08b928cdca4bec 100644 --- a/x-pack/plugins/reporting/server/lib/store/mapping.ts +++ b/x-pack/plugins/reporting/server/lib/store/mapping.ts @@ -45,7 +45,7 @@ export const mapping = { priority: { type: 'byte' }, timeout: { type: 'long' }, process_expiration: { type: 'date' }, - created_by: { type: 'keyword' }, + created_by: { type: 'keyword' }, // `null` if security is disabled created_at: { type: 'date' }, started_at: { type: 'date' }, completed_at: { type: 'date' }, diff --git a/x-pack/plugins/reporting/server/lib/store/report.test.ts b/x-pack/plugins/reporting/server/lib/store/report.test.ts index 83444494e61d330..743ddd5ae5f0ccd 100644 --- a/x-pack/plugins/reporting/server/lib/store/report.test.ts +++ b/x-pack/plugins/reporting/server/lib/store/report.test.ts @@ -8,8 +8,8 @@ import { Report } from './report'; describe('Class Report', () => { it('constructs Report instance', () => { - const opts = { - index: '.reporting-test-index-12345', + const report = new Report({ + _index: '.reporting-test-index-12345', jobtype: 'test-report', created_by: 'created_by_test_string', browser_type: 'browser_type_test_string', @@ -17,11 +17,33 @@ describe('Class Report', () => { payload: { payload_test_field: 1 }, timeout: 30000, priority: 1, - }; - const report = new Report(opts); - expect(report.toJSON()).toMatchObject({ - _primary_term: undefined, - _seq_no: undefined, + }); + + expect(report.toEsDocsJSON()).toMatchObject({ + _index: '.reporting-test-index-12345', + _source: { + attempts: 0, + browser_type: 'browser_type_test_string', + completed_at: undefined, + created_at: undefined, + created_by: 'created_by_test_string', + jobtype: 'test-report', + max_attempts: 50, + meta: undefined, + payload: { + payload_test_field: 1, + }, + priority: 1, + started_at: undefined, + status: 'pending', + timeout: 30000, + }, + }); + expect(report.toReportTaskJSON()).toMatchObject({ + index: '.reporting-test-index-12345', + jobtype: 'test-report', + }); + expect(report.toApiJSON()).toMatchObject({ browser_type: 'browser_type_test_string', created_by: 'created_by_test_string', jobtype: 'test-report', @@ -33,12 +55,12 @@ describe('Class Report', () => { timeout: 30000, }); - expect(report.id).toBeDefined(); + expect(report._id).toBeDefined(); }); - it('updateWithDoc method syncs takes fields to sync ES metadata', () => { - const opts = { - index: '.reporting-test-index-12345', + it('updateWithEsDoc method syncs takes fields to sync ES metadata', () => { + const report = new Report({ + _index: '.reporting-test-index-12345', jobtype: 'test-report', created_by: 'created_by_test_string', browser_type: 'browser_type_test_string', @@ -46,8 +68,7 @@ describe('Class Report', () => { payload: { payload_test_field: 1 }, timeout: 30000, priority: 1, - }; - const report = new Report(opts); + }); const metadata = { _index: '.reporting-test-update', @@ -55,23 +76,65 @@ describe('Class Report', () => { _primary_term: 77, _seq_no: 99, }; - report.updateWithDoc(metadata); + report.updateWithEsDoc(metadata); - expect(report.toJSON()).toMatchObject({ - index: '.reporting-test-update', - _primary_term: 77, - _seq_no: 99, - browser_type: 'browser_type_test_string', - created_by: 'created_by_test_string', - jobtype: 'test-report', - max_attempts: 50, - payload: { - payload_test_field: 1, - }, - priority: 1, - timeout: 30000, - }); + expect(report.toEsDocsJSON()).toMatchInlineSnapshot(` + Object { + "_id": "12342p9o387549o2345", + "_index": ".reporting-test-update", + "_source": Object { + "attempts": 0, + "browser_type": "browser_type_test_string", + "completed_at": undefined, + "created_at": undefined, + "created_by": "created_by_test_string", + "jobtype": "test-report", + "max_attempts": 50, + "meta": undefined, + "payload": Object { + "payload_test_field": 1, + }, + "priority": 1, + "started_at": undefined, + "status": "pending", + "timeout": 30000, + }, + } + `); + expect(report.toReportTaskJSON()).toMatchInlineSnapshot(` + Object { + "id": "12342p9o387549o2345", + "index": ".reporting-test-update", + "jobtype": "test-report", + } + `); + expect(report.toApiJSON()).toMatchInlineSnapshot(` + Object { + "attempts": 0, + "browser_type": "browser_type_test_string", + "completed_at": undefined, + "created_at": undefined, + "created_by": "created_by_test_string", + "id": "12342p9o387549o2345", + "index": ".reporting-test-update", + "jobtype": "test-report", + "max_attempts": 50, + "meta": undefined, + "payload": Object { + "payload_test_field": 1, + }, + "priority": 1, + "started_at": undefined, + "status": "pending", + "timeout": 30000, + } + `); + }); - expect(report._id).toBe('12342p9o387549o2345'); + it('throws error if converted to task JSON before being synced with ES storage', () => { + const report = new Report({}); + expect(() => report.toReportTaskJSON()).toThrowErrorMatchingInlineSnapshot( + `"Report object is not synced with ES!"` + ); }); }); diff --git a/x-pack/plugins/reporting/server/lib/store/report.ts b/x-pack/plugins/reporting/server/lib/store/report.ts index cc9967e64b6ebcc..0a457da5d0c17b3 100644 --- a/x-pack/plugins/reporting/server/lib/store/report.ts +++ b/x-pack/plugins/reporting/server/lib/store/report.ts @@ -7,79 +7,167 @@ // @ts-ignore no module definition import Puid from 'puid'; -interface Payload { - id?: string; - index: string; +/* + * The document created by Reporting to store in the .reporting index + */ +interface ReportingDocument { + _id: string; + _index: string; + _seq_no: unknown; + _primary_term: unknown; jobtype: string; - created_by: string | boolean; + created_by: string | null; payload: unknown; + meta: unknown; browser_type: string; - priority: number; max_attempts: number; timeout: number; + + status: string; + attempts: number; + output?: unknown; + started_at?: string; + completed_at?: string; + created_at?: string; + priority?: number; + process_expiration?: string; +} + +/* + * The document created by Reporting to store as task parameters for Task + * Manager to reference the report in .reporting + */ +export interface ReportTaskJSON { + id: string; + index: string; + jobtype: string; } const puid = new Puid(); export class Report { + public _index?: string; + public _id: string; + public _primary_term?: unknown; // set by ES + public _seq_no: unknown; // set by ES + public readonly jobtype: string; - public readonly created_by: string | boolean; + public readonly created_at?: string; + public readonly created_by?: string | null; public readonly payload: unknown; - public readonly browser_type: string; - public readonly id: string; + public readonly meta: unknown; + public readonly browser_type?: string; - public readonly priority: number; - // queue stuff, to be removed with Task Manager integration - public readonly max_attempts: number; - public readonly timeout: number; - - public _index: string; - public _id?: string; // set by ES - public _primary_term?: unknown; // set by ES - public _seq_no: unknown; // set by ES + public readonly status: string; + public readonly attempts: number; + public readonly output?: unknown; + public readonly started_at?: string; + public readonly completed_at?: string; + public readonly process_expiration?: string; + public readonly priority?: number; + public readonly max_attempts?: number; + public readonly timeout?: number; /* - * Create an unsaved report + * Create an unsaved report, or an object to track a reporting document in ES */ - constructor(opts: Payload) { - this.jobtype = opts.jobtype; + constructor(opts: Partial) { + this._id = opts._id != null ? opts._id : puid.generate(); + this._index = opts._index; + this._primary_term = opts._primary_term; + this._seq_no = opts._seq_no; + + this.jobtype = opts.jobtype!; + this.created_at = opts.created_at; this.created_by = opts.created_by; + this.process_expiration = opts.process_expiration; this.payload = opts.payload; + this.meta = opts.meta; this.browser_type = opts.browser_type; this.priority = opts.priority; this.max_attempts = opts.max_attempts; this.timeout = opts.timeout; - this.id = puid.generate(); - this._index = opts.index; + this.status = opts.status || 'pending'; // FIXME constants + this.attempts = opts.attempts || 0; + this.output = opts.output || null; } /* * Update the report with "live" storage metadata */ - updateWithDoc(doc: Partial) { - if (doc._index) { - this._index = doc._index; // can not be undefined + updateWithEsDoc(doc: Partial) { + if (doc._index == null || doc._id == null) { + throw new Error(`Report object from ES has missing fields!`); } this._id = doc._id; + this._index = doc._index; this._primary_term = doc._primary_term; this._seq_no = doc._seq_no; } - toJSON() { + /* + * Data structure for writing to Elasticsearch index + */ + toEsDocsJSON() { + return { + _id: this._id, + _index: this._index, + _source: { + jobtype: this.jobtype, + created_at: this.created_at, + created_by: this.created_by, + payload: this.payload, + meta: this.meta, + timeout: this.timeout, + max_attempts: this.max_attempts, + priority: this.priority, + browser_type: this.browser_type, + status: this.status, + attempts: this.attempts, + started_at: this.started_at, + completed_at: this.completed_at, + }, + }; + } + + /* + * Task parameters for finding the report in the store, finding its export + * type and getting its run function later + */ + toReportTaskJSON(): ReportTaskJSON { + if (!this._id || !this._index) { + throw new Error(`Report object is not synced with ES!`); + } + + return { + id: this._id, + index: this._index, + jobtype: this.jobtype, + }; + } + + /* + * Data structure for API responses + */ + toApiJSON() { return { - id: this.id, + id: this._id, index: this._index, - _seq_no: this._seq_no, - _primary_term: this._primary_term, jobtype: this.jobtype, + created_at: this.created_at, created_by: this.created_by, payload: this.payload, + meta: this.meta, timeout: this.timeout, max_attempts: this.max_attempts, priority: this.priority, browser_type: this.browser_type, + status: this.status, + attempts: this.attempts, + started_at: this.started_at, + completed_at: this.completed_at, }; } } diff --git a/x-pack/plugins/reporting/server/lib/store/store.test.ts b/x-pack/plugins/reporting/server/lib/store/store.test.ts index 4868a1dfdd8f3ab..559f11a7a15623b 100644 --- a/x-pack/plugins/reporting/server/lib/store/store.test.ts +++ b/x-pack/plugins/reporting/server/lib/store/store.test.ts @@ -5,11 +5,12 @@ */ import sinon from 'sinon'; +import { ElasticsearchServiceSetup } from 'src/core/server'; import { ReportingConfig, ReportingCore } from '../..'; import { createMockReportingCore } from '../../test_helpers'; import { createMockLevelLogger } from '../../test_helpers/create_mock_levellogger'; -import { ReportingStore } from './store'; -import { ElasticsearchServiceSetup } from 'src/core/server'; +import { ReportingRecord, ReportingStore } from './store'; +import { Report } from './report'; const getMockConfig = (mockConfigGet: sinon.SinonStub) => ({ get: mockConfigGet, @@ -31,11 +32,13 @@ describe('ReportingStore', () => { mockConfig = getMockConfig(mockConfigGet); mockCore = await createMockReportingCore(mockConfig); + callClusterStub.reset(); callClusterStub.withArgs('indices.exists').resolves({}); callClusterStub.withArgs('indices.create').resolves({}); - callClusterStub.withArgs('index').resolves({}); + callClusterStub.withArgs('index').resolves({ _id: 'stub-id', _index: 'stub-index' }); callClusterStub.withArgs('indices.refresh').resolves({}); callClusterStub.withArgs('update').resolves({}); + callClusterStub.withArgs('get').resolves({}); mockCore.getElasticsearchService = () => (mockElasticsearch as unknown) as ElasticsearchServiceSetup; @@ -46,24 +49,20 @@ describe('ReportingStore', () => { const store = new ReportingStore(mockCore, mockLogger); const reportType = 'unknowntype'; const reportPayload = {}; - const reportOptions = { - timeout: 10000, - created_by: 'created_by_string', - browser_type: 'browser_type_string', - max_attempts: 1, - }; - await expect( - store.addReport(reportType, reportPayload, reportOptions) - ).resolves.toMatchObject({ + await expect(store.addReport(reportType, 'username1', reportPayload)).resolves.toMatchObject({ _primary_term: undefined, _seq_no: undefined, - browser_type: 'browser_type_string', - created_by: 'created_by_string', + attempts: 0, + browser_type: undefined, + completed_at: undefined, + created_by: 'username1', jobtype: 'unknowntype', - max_attempts: 1, + max_attempts: undefined, payload: {}, priority: 10, - timeout: 10000, + started_at: undefined, + status: 'pending', + timeout: undefined, }); }); @@ -77,34 +76,22 @@ describe('ReportingStore', () => { const store = new ReportingStore(mockCore, mockLogger); const reportType = 'unknowntype'; const reportPayload = {}; - const reportOptions = { - timeout: 10000, - created_by: 'created_by_string', - browser_type: 'browser_type_string', - max_attempts: 1, - }; - expect( - store.addReport(reportType, reportPayload, reportOptions) - ).rejects.toMatchInlineSnapshot(`[Error: Invalid index interval: centurially]`); + expect(store.addReport(reportType, 'user1', reportPayload)).rejects.toMatchInlineSnapshot( + `[Error: Invalid index interval: centurially]` + ); }); it('handles error creating the index', async () => { // setup callClusterStub.withArgs('indices.exists').resolves(false); - callClusterStub.withArgs('indices.create').rejects(new Error('error')); + callClusterStub.withArgs('indices.create').rejects(new Error('horrible error')); const store = new ReportingStore(mockCore, mockLogger); const reportType = 'unknowntype'; const reportPayload = {}; - const reportOptions = { - timeout: 10000, - created_by: 'created_by_string', - browser_type: 'browser_type_string', - max_attempts: 1, - }; await expect( - store.addReport(reportType, reportPayload, reportOptions) - ).rejects.toMatchInlineSnapshot(`[Error: error]`); + store.addReport(reportType, 'user1', reportPayload) + ).rejects.toMatchInlineSnapshot(`[Error: horrible error]`); }); /* Creating the index will fail, if there were multiple jobs staged in @@ -116,20 +103,14 @@ describe('ReportingStore', () => { it('ignores index creation error if the index already exists and continues adding the report', async () => { // setup callClusterStub.withArgs('indices.exists').resolves(false); - callClusterStub.withArgs('indices.create').rejects(new Error('error')); + callClusterStub.withArgs('indices.create').rejects(new Error('devastating error')); const store = new ReportingStore(mockCore, mockLogger); const reportType = 'unknowntype'; const reportPayload = {}; - const reportOptions = { - timeout: 10000, - created_by: 'created_by_string', - browser_type: 'browser_type_string', - max_attempts: 1, - }; await expect( - store.addReport(reportType, reportPayload, reportOptions) - ).rejects.toMatchInlineSnapshot(`[Error: error]`); + store.addReport(reportType, 'user1', reportPayload) + ).rejects.toMatchInlineSnapshot(`[Error: devastating error]`); }); it('skips creating the index if already exists', async () => { @@ -142,25 +123,251 @@ describe('ReportingStore', () => { const store = new ReportingStore(mockCore, mockLogger); const reportType = 'unknowntype'; const reportPayload = {}; - const reportOptions = { - timeout: 10000, - created_by: 'created_by_string', - browser_type: 'browser_type_string', - max_attempts: 1, - }; - await expect( - store.addReport(reportType, reportPayload, reportOptions) - ).resolves.toMatchObject({ + await expect(store.addReport(reportType, 'user1', reportPayload)).resolves.toMatchObject({ _primary_term: undefined, _seq_no: undefined, - browser_type: 'browser_type_string', - created_by: 'created_by_string', + attempts: 0, + browser_type: undefined, + completed_at: undefined, + created_by: 'user1', jobtype: 'unknowntype', - max_attempts: 1, + max_attempts: undefined, + payload: {}, + priority: 10, + started_at: undefined, + status: 'pending', + timeout: undefined, + }); + }); + + it('allows username string to be `null`', async () => { + // setup + callClusterStub.withArgs('indices.exists').resolves(false); + callClusterStub + .withArgs('indices.create') + .rejects(new Error('resource_already_exists_exception')); // will be triggered but ignored + + const store = new ReportingStore(mockCore, mockLogger); + const reportType = 'unknowntype'; + const reportPayload = {}; + await expect(store.addReport(reportType, null, reportPayload)).resolves.toMatchObject({ + _primary_term: undefined, + _seq_no: undefined, + attempts: 0, + browser_type: undefined, + completed_at: undefined, + created_by: null, + jobtype: 'unknowntype', + max_attempts: undefined, payload: {}, priority: 10, - timeout: 10000, + started_at: undefined, + status: 'pending', + timeout: undefined, }); }); }); + + it('findReport gets a report from ES and returns a Report object', async () => { + // setup + const mockReport: ReportingRecord = { + _id: '1234-foo-78', + _index: '.reporting-test-17409', + _primary_term: 'primary_term string', + _seq_no: 'seq_no string', + _source: { + created_at: 'some time', + created_by: 'some security person', + jobtype: 'csv', + status: 'pending', + meta: {}, + payload: {}, + browser_type: 'browser type string', + attempts: 0, + max_attempts: 1, + priority: 10, + timeout: 30000, + }, + }; + callClusterStub.withArgs('get').resolves(mockReport); + const store = new ReportingStore(mockCore, mockLogger); + + expect(await store.findReport({} as any)).toMatchInlineSnapshot(` + Report { + "_id": "1234-foo-78", + "_index": ".reporting-test-17409", + "_primary_term": "primary_term string", + "_seq_no": "seq_no string", + "attempts": 0, + "browser_type": "browser type string", + "completed_at": undefined, + "created_at": "some time", + "created_by": "some security person", + "jobtype": "csv", + "max_attempts": 1, + "meta": Object {}, + "output": null, + "payload": Object {}, + "priority": 10, + "process_expiration": undefined, + "started_at": undefined, + "status": "pending", + "timeout": 30000, + } + `); + }); + + it('setReportClaimed sets the status of a record to processing', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const report = new Report({ + _id: 'id-of-processing', + _index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + browser_type: 'browser_type_test_string', + max_attempts: 50, + payload: { payload_test_field: 1 }, + timeout: 30000, + priority: 1, + }); + + await store.setReportClaimed(report, { testDoc: 'test' } as any); + + const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update'); + expect(updateCall && updateCall.args).toMatchInlineSnapshot(` + Array [ + "update", + Object { + "body": Object { + "doc": Object { + "status": "processing", + "testDoc": "test", + }, + }, + "id": "id-of-processing", + "if_primary_term": undefined, + "if_seq_no": undefined, + "index": ".reporting-test-index-12345", + }, + ] + `); + }); + + it('setReportFailed sets the status of a record to failed', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const report = new Report({ + _id: 'id-of-failure', + _index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + browser_type: 'browser_type_test_string', + max_attempts: 50, + payload: { payload_test_field: 1 }, + timeout: 30000, + priority: 1, + }); + + await store.setReportFailed(report, { errors: 'yes' } as any); + + const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update'); + expect(updateCall && updateCall.args).toMatchInlineSnapshot(` + Array [ + "update", + Object { + "body": Object { + "doc": Object { + "errors": "yes", + "status": "failed", + }, + }, + "id": "id-of-failure", + "if_primary_term": undefined, + "if_seq_no": undefined, + "index": ".reporting-test-index-12345", + }, + ] + `); + }); + + it('setReportCompleted sets the status of a record to completed', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const report = new Report({ + _id: 'vastly-great-report-id', + _index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + browser_type: 'browser_type_test_string', + max_attempts: 50, + payload: { payload_test_field: 1 }, + timeout: 30000, + priority: 1, + }); + + await store.setReportCompleted(report, { certainly_completed: 'yes' } as any); + + const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update'); + expect(updateCall && updateCall.args).toMatchInlineSnapshot(` + Array [ + "update", + Object { + "body": Object { + "doc": Object { + "certainly_completed": "yes", + "status": "completed", + }, + }, + "id": "vastly-great-report-id", + "if_primary_term": undefined, + "if_seq_no": undefined, + "index": ".reporting-test-index-12345", + }, + ] + `); + }); + + it('setReportCompleted sets the status of a record to completed_with_warnings', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const report = new Report({ + _id: 'vastly-great-report-id', + _index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + browser_type: 'browser_type_test_string', + max_attempts: 50, + payload: { payload_test_field: 1 }, + timeout: 30000, + priority: 1, + }); + + await store.setReportCompleted(report, { + certainly_completed: 'pretty_much', + output: { + warnings: [`those pants don't go with that shirt`], + }, + } as any); + + const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update'); + expect(updateCall && updateCall.args).toMatchInlineSnapshot(` + Array [ + "update", + Object { + "body": Object { + "doc": Object { + "certainly_completed": "pretty_much", + "output": Object { + "warnings": Array [ + "those pants don't go with that shirt", + ], + }, + "status": "completed_with_warnings", + }, + }, + "id": "vastly-great-report-id", + "if_primary_term": undefined, + "if_seq_no": undefined, + "index": ".reporting-test-index-12345", + }, + ] + `); + }); }); diff --git a/x-pack/plugins/reporting/server/lib/store/store.ts b/x-pack/plugins/reporting/server/lib/store/store.ts index 0f1ed83b7176714..b90c170b7483000 100644 --- a/x-pack/plugins/reporting/server/lib/store/store.ts +++ b/x-pack/plugins/reporting/server/lib/store/store.ts @@ -5,36 +5,46 @@ */ import { ElasticsearchServiceSetup } from 'src/core/server'; -import { LevelLogger } from '../'; +import { LevelLogger, statuses } from '../'; import { ReportingCore } from '../../'; import { indexTimestamp } from './index_timestamp'; import { LayoutInstance } from '../layouts'; import { mapping } from './mapping'; -import { Report } from './report'; - -export const statuses = { - JOB_STATUS_PENDING: 'pending', - JOB_STATUS_PROCESSING: 'processing', - JOB_STATUS_COMPLETED: 'completed', - JOB_STATUS_WARNINGS: 'completed_with_warnings', - JOB_STATUS_FAILED: 'failed', - JOB_STATUS_CANCELLED: 'cancelled', -}; +import { Report, ReportTaskJSON } from './report'; + +export interface ReportingRecord { + _id: string; + _index: string; + _primary_term: unknown; // catch race conditions over read & update + _seq_no: unknown; // catch race conditions over read & update + _source: { + created_at: string; + created_by: string | null; + jobtype: string; + status: string; + meta: unknown; + payload: unknown; + browser_type: string; + priority?: number; // FIXME: unused + process_expiration?: string; // FIXME: unused + timeout: number; // syncs with scheduled task timeout + attempts: number; // syncs with scheduled task timeout + max_attempts: number; // syncs with scheduled task timeout + }; +} -interface AddReportOpts { +interface JobSettings { timeout: number; - created_by: string | boolean; browser_type: string; max_attempts: number; + priority: number; } -interface UpdateQuery { - index: string; - id: string; - if_seq_no: unknown; - if_primary_term: unknown; - body: { doc: Partial }; -} +const checkReportIsEditable = (report: Report) => { + if (!report._id || !report._index) { + throw new Error(`Report object is not synced with ES!`); + } +}; /* * A class to give an interface to historical reports in the reporting.index @@ -43,9 +53,9 @@ interface UpdateQuery { * - interface for downloading the report */ export class ReportingStore { - public readonly indexPrefix: string; - public readonly indexInterval: string; - + private readonly indexPrefix: string; + private readonly indexInterval: string; + private readonly jobSettings: JobSettings; private client: ElasticsearchServiceSetup['legacy']['client']; private logger: LevelLogger; @@ -56,12 +66,18 @@ export class ReportingStore { this.client = elasticsearch.legacy.client; this.indexPrefix = config.get('index'); this.indexInterval = config.get('queue', 'indexInterval'); + this.jobSettings = { + timeout: config.get('queue', 'timeout'), + browser_type: config.get('capture', 'browser', 'type'), + max_attempts: config.get('capture', 'maxAttempts'), + priority: 10, // unused + }; this.logger = logger; } private async createIndex(indexName: string) { - return this.client + return await this.client .callAsInternalUser('indices.exists', { index: indexName, }) @@ -95,24 +111,28 @@ export class ReportingStore { return; } + this.logger.error(err); throw err; }); }); } - private async saveReport(report: Report) { - const payload = report.payload as { objectType: string; layout: LayoutInstance }; + /* + * Called from addReport, which handles any errors + */ + private async indexReport(report: Report) { + const params = report.payload as { objectType: string; layout: LayoutInstance }; const indexParams = { index: report._index, - id: report.id, + id: report._id, body: { jobtype: report.jobtype, meta: { // We are copying these values out of payload because these fields are indexed and can be aggregated on // for tracking stats, while payload contents are not. - objectType: payload.objectType, - layout: payload.layout ? payload.layout.id : 'none', + objectType: params.objectType, + layout: params.layout ? params.layout.id : 'none', }, payload: report.payload, created_by: report.created_by, @@ -125,45 +145,152 @@ export class ReportingStore { browser_type: report.browser_type, }, }; - return this.client.callAsInternalUser('index', indexParams); + return await this.client.callAsInternalUser('index', indexParams); } + /* + * Called from addReport, which handles any errors + */ private async refreshIndex(index: string) { - return this.client.callAsInternalUser('indices.refresh', { index }); + return await this.client.callAsInternalUser('indices.refresh', { index }); } - public async addReport(type: string, payload: unknown, options: AddReportOpts): Promise { + public async addReport(type: string, username: string | null, payload: unknown): Promise { const timestamp = indexTimestamp(this.indexInterval); const index = `${this.indexPrefix}-${timestamp}`; await this.createIndex(index); const report = new Report({ - index, + _index: index, payload, jobtype: type, - created_by: options.created_by, - browser_type: options.browser_type, - max_attempts: options.max_attempts, - timeout: options.timeout, - priority: 10, // unused + created_by: username, + ...this.jobSettings, }); - const doc = await this.saveReport(report); - report.updateWithDoc(doc); + try { + const doc = await this.indexReport(report); + report.updateWithEsDoc(doc); - await this.refreshIndex(index); - this.logger.info(`Successfully queued pending job: ${report._index}/${report.id}`); + await this.refreshIndex(index); + this.logger.info(`Successfully stored pending job: ${report._index}/${report._id}`); - return report; + return report; + } catch (err) { + this.logger.error(`Error in addReport!`); + this.logger.error(err); + throw err; + } } - public async updateReport(query: UpdateQuery): Promise { - return this.client.callAsInternalUser('update', { - index: query.index, - id: query.id, - if_seq_no: query.if_seq_no, - if_primary_term: query.if_primary_term, - body: { doc: query.body.doc }, - }); + /* + * Search for a report from task data and return back the report + */ + public async findReport(taskJson: ReportTaskJSON): Promise { + try { + const document = await this.client.callAsInternalUser('get', { + index: taskJson.index, + id: taskJson.id, + }); + + return new Report({ + _id: document._id, + _index: document._index, + _seq_no: document._seq_no, + _primary_term: document._primary_term, + jobtype: document._source.jobtype, + attempts: document._source.attempts, + browser_type: document._source.browser_type, + created_at: document._source.created_at, + created_by: document._source.created_by, + max_attempts: document._source.max_attempts, + meta: document._source.meta, + payload: document._source.payload, + process_expiration: document._source.process_expiration, + status: document._source.status, + timeout: document._source.timeout, + priority: document._source.priority, + }); + } catch (err) { + this.logger.error('Error in findReport! ' + JSON.stringify({ report: taskJson })); + this.logger.error(err); + throw err; + } + } + + public async setReportClaimed(report: Report, stats: Partial): Promise { + const doc = { + ...stats, + status: statuses.JOB_STATUS_PROCESSING, + }; + + try { + checkReportIsEditable(report); + if (!report._id || !report._index) { + throw new Error(`Report object is not synced with ES!`); + } + + return await this.client.callAsInternalUser('update', { + id: report._id, + index: report._index, + if_seq_no: report._seq_no, + if_primary_term: report._primary_term, // TODO these fields are to handle race condition + body: { doc }, + }); + } catch (err) { + this.logger.error('Error in setting report processing status!'); + this.logger.error(err); + throw err; + } + } + + public async setReportFailed(report: Report, stats: Partial): Promise { + const doc = { + ...stats, + status: statuses.JOB_STATUS_FAILED, + }; + + try { + checkReportIsEditable(report); + + return await this.client.callAsInternalUser('update', { + id: report._id, + index: report._index, + if_seq_no: report._seq_no, + if_primary_term: report._primary_term, // TODO these fields are to handle race condition + body: { doc }, + }); + } catch (err) { + this.logger.error('Error in setting report failed status!'); + this.logger.error(err); + throw err; + } + } + + public async setReportCompleted(report: Report, stats: Partial): Promise { + const { output } = stats as { output: any }; + const status = + output && output.warnings && output.warnings.length > 0 + ? statuses.JOB_STATUS_WARNINGS + : statuses.JOB_STATUS_COMPLETED; + const doc = { + ...stats, + status, + }; + try { + checkReportIsEditable(report); + + return await this.client.callAsInternalUser('update', { + id: report._id, + index: report._index, + if_seq_no: report._seq_no, + if_primary_term: report._primary_term, // TODO these fields are to handle race condition + body: { doc }, + }); + } catch (err) { + this.logger.error('Error in setting report complete status!'); + this.logger.error(err); + throw err; + } } } diff --git a/x-pack/plugins/reporting/server/lib/task.ts b/x-pack/plugins/reporting/server/lib/task.ts new file mode 100644 index 000000000000000..8d2497a2fc0aa55 --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/task.ts @@ -0,0 +1,289 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import moment from 'moment'; +import { ReportingCore } from '../'; +import { + ConcreteTaskInstance, + RunContext, + TaskManagerSetupContract, + TaskManagerStartContract, +} from '../../../task_manager/server'; +import { CancellationToken } from '../../common'; +import { ESQueueWorkerExecuteFn, TaskRunResult } from '../types'; +import { LevelLogger } from './'; +import { ReportingStore, ReportTaskJSON } from './store'; + +const REPORTING_QUEUE_JOB_TYPE = 'reporting-queue-job'; + +export interface TaskParams { + report: ReportTaskJSON; + payload: JobParamsType; + user: string | null; // user is optional as security may be disabled + headers: any; +} + +export type SchedulingFn = ( + params: TaskParams +) => Promise; + +interface ReportingTaskInstanceParams { + taskType: string; + state: object; + params: TaskParams; +} + +/* + * The error message string could be very long if it contains the request body + * of a request that was too large for Elasticsearch. This takes a partial + * version of the error message without scanning every character of the string, + * which would block Node. + */ +type LogLevel = 'info' | 'error'; +const MAX_PARTIAL_ERROR_LENGTH = 1000; // 1000 of beginning, 1000 of end +const ERROR_PARTIAL_SEPARATOR = '...'; +const MAX_ERROR_LENGTH = MAX_PARTIAL_ERROR_LENGTH * 2 + ERROR_PARTIAL_SEPARATOR.length; + +const logRunner = (logger: LevelLogger, logLevel: LogLevel, message: string, err?: Error) => { + if (err) { + const errString = `${message}: ${err.stack ? err.stack : err}`; + const errLength = errString.length; + const subStr = String.prototype.substring.bind(errString); + if (errLength > MAX_ERROR_LENGTH) { + const partialError = + subStr(0, MAX_PARTIAL_ERROR_LENGTH) + + ERROR_PARTIAL_SEPARATOR + + subStr(errLength - MAX_PARTIAL_ERROR_LENGTH); + + logger.error(partialError); + logger.error( + `A partial version of the entire error message was logged. ` + + `The entire error message length is: ${errLength} characters.` + ); + } else { + logger[logLevel](errString); + } + return; + } + + logger[logLevel](message); +}; + +function isOutput(output: TaskRunResult | Error): output is TaskRunResult { + return typeof output === 'object' && (output as TaskRunResult).content != null; +} + +export class ReportingTask { + private logger: LevelLogger; + private taskExecutors?: Map>; + private kibanaId?: string; + private kibanaName?: string; + store?: ReportingStore; + + constructor(private reporting: ReportingCore, parentLogger: LevelLogger) { + this.logger = parentLogger.clone(['taskRun']); + } + + /* + * Call each run task function factory of each export type, because + * everything is a function factory :) + * To be called from plugin start + * + * Asynchronously track the store + */ + public async init() { + const { reporting } = this; + + const exportTypesRegistry = reporting.getExportTypesRegistry(); + const executors = new Map>(); + for (const exportType of exportTypesRegistry.getAll()) { + const jobExecutor = exportType.runTaskFnFactory(reporting, this.logger); + // The task will run the function with the job type as a param. + // This allows us to retrieve the specific export type runFn when called to run an export + executors.set(exportType.jobType, jobExecutor); + } + + this.taskExecutors = executors; + const { store } = await this.reporting.getPluginStartDeps(); + this.store = store; + + const config = reporting.getConfig(); + this.kibanaId = config.kbnConfig.get('server', 'uuid'); + this.kibanaName = config.kbnConfig.get('server', 'name'); + } + + getStore() { + if (!this.store) { + throw new Error('Store is not yet initialized!'); + } + return this.store; + } + + private async _claimJob(job: TaskParams) { + const store = this.getStore(); + const report = await store.findReport(job.report); + const m = moment(); + const startTime = m.toISOString(); + const expirationTime = m.add(report.timeout).toISOString(); + const attempts = report.attempts + 1; + + const stats = { + attempts, + started_at: startTime, + process_expiration: expirationTime, + kibana_id: this.kibanaId, + kibana_name: this.kibanaName, + }; + + logRunner(this.logger, 'info', `Claiming ${report.jobtype} job ${report._id}`); + await store.setReportClaimed(report, stats); + } + + private async _failJob(job: TaskParams, error?: Error) { + const { report: taskJson } = job; + const message = `Failing ${taskJson.jobtype} job ${taskJson.id}`; + + // log the error + let docOutput; + if (error) { + logRunner(this.logger, 'error', message, error); + docOutput = this._formatOutput(error); + } else { + logRunner(this.logger, 'error', message); + } + + // update the report in the store + const store = this.getStore(); + const report = await store.findReport(taskJson); + const completedTime = moment().toISOString(); + const doc = { + completed_at: completedTime, + output: docOutput, + }; + await store.setReportFailed(report, doc); + } + + _formatOutput(output: TaskRunResult | Error) { + const docOutput = {} as TaskRunResult; + const unknownMime = null; + + if (isOutput(output)) { + docOutput.content = output.content; + docOutput.content_type = output.content_type || unknownMime; + docOutput.max_size_reached = output.max_size_reached; + docOutput.csv_contains_formulas = output.csv_contains_formulas; + docOutput.size = output.size; + docOutput.warnings = + output.warnings && output.warnings.length > 0 ? output.warnings : undefined; + } else { + const defaultOutput = null; + docOutput.content = output.toString() || defaultOutput; + docOutput.content_type = unknownMime; + } + + return docOutput; + } + + private async _performJob(job: TaskParams) { + if (!this.taskExecutors) { + throw new Error(`Task run function factories have not been called yet!`); + } + + const { report, payload } = job; + logRunner(this.logger, 'info', `Starting job ${report.jobtype} job ${report.id}`); + + // get the run_task function + const runner = this.taskExecutors.get(report.jobtype); + if (!runner) { + throw new Error(`No defined task runner function for ${report.jobtype}!`); + } + + // return the run output + const cancellationToken = new CancellationToken(); // FIXME: allow to be cancellable + return await runner(report.id, payload, cancellationToken); + } + + private async _completeJob(job: TaskParams, output: TaskRunResult) { + const { report: taskJson } = job; + logRunner(this.logger, 'info', `Saving ${taskJson.jobtype} job ${taskJson.id}`); + const completedTime = moment().toISOString(); + const docOutput = this._formatOutput(output); + + const store = this.getStore(); + const doc = { + completed_at: completedTime, + output: docOutput, + }; + const report = await store.findReport(taskJson); + await store.setReportCompleted(report, doc); + } + + public register(taskManager: TaskManagerSetupContract) { + taskManager.registerTaskDefinitions({ + [REPORTING_QUEUE_JOB_TYPE]: { + type: REPORTING_QUEUE_JOB_TYPE, + title: 'Reporting job queue', + createTaskRunner: this.createTaskRunner, + }, + }); + } + + /* + * Reporting job scheduler + */ + public scheduleTaskFactory(taskManager: TaskManagerStartContract): SchedulingFn { + const scheduleTask = async ({ + report, + payload, + user, + headers, + }: TaskParams) => { + const taskInstance: ReportingTaskInstanceParams = { + taskType: REPORTING_QUEUE_JOB_TYPE, + state: {}, + params: { report, payload, user, headers }, + }; + + return await taskManager.schedule(taskInstance); + }; + + return scheduleTask; + } + + /* + * Reporting job worker + */ + private createTaskRunner = (context: RunContext) => ({ + run: async () => { + // find the job in the store and set status to processing + const job = context.taskInstance.params as TaskParams; + + try { + await this._claimJob(job); + } catch (err) { + this.logger.error(`error in claimJob! ${job.report.id}`); + this.logger.error(err); + throw err; + } + + let output: TaskRunResult; + try { + // perform job + output = await this._performJob(job); + // update the report in the store + await this._completeJob(job, output); + } catch (error) { + await this._failJob(job, error); + } + + // return back to Task Manager + // TODO return errors + return { state: {} }; + }, + // TODO: use the cancellation tokens that are created for each job + cancel: () => Promise.reject(), + }); +} diff --git a/x-pack/plugins/reporting/server/plugin.test.ts b/x-pack/plugins/reporting/server/plugin.test.ts index 420fa8347cdebb9..191738bbc1727a1 100644 --- a/x-pack/plugins/reporting/server/plugin.test.ts +++ b/x-pack/plugins/reporting/server/plugin.test.ts @@ -15,8 +15,13 @@ jest.mock('./browsers/install', () => ({ })); import { coreMock } from 'src/core/server/mocks'; +import { UsageCollectionSetup } from 'src/plugins/usage_collection/server'; +import { LicensingPluginSetup } from '../../licensing/server'; +import { SecurityPluginSetup } from '../../security/server'; +import { TaskManagerSetupContract } from '../../task_manager/server'; import { ReportingPlugin } from './plugin'; import { createMockConfigSchema } from './test_helpers'; +import { ReportingSetupDeps, ReportingStartDeps } from './types'; const sleep = (time: number) => new Promise((r) => setTimeout(r, time)); @@ -25,21 +30,25 @@ describe('Reporting Plugin', () => { let initContext: any; let coreSetup: any; let coreStart: any; - let pluginSetup: any; - let pluginStart: any; + let pluginSetup: ReportingSetupDeps; + let pluginStart: ReportingStartDeps; beforeEach(async () => { configSchema = createMockConfigSchema(); initContext = coreMock.createPluginInitializerContext(configSchema); - coreSetup = await coreMock.createSetup(configSchema); - coreStart = await coreMock.createStart(); - pluginSetup = ({ - licensing: {}, - usageCollection: { + coreSetup = coreMock.createSetup(configSchema); + coreStart = coreMock.createStart(); + + pluginSetup = { + licensing: {} as LicensingPluginSetup, + usageCollection: ({ makeUsageCollector: jest.fn(), registerCollector: jest.fn(), - }, - security: { + } as unknown) as UsageCollectionSetup, + taskManager: ({ + registerTaskDefinitions: jest.fn(), + } as unknown) as TaskManagerSetupContract, + security: ({ authc: { getCurrentUser: () => ({ id: '123', @@ -47,13 +56,10 @@ describe('Reporting Plugin', () => { username: 'Tom Riddle', }), }, - }, - } as unknown) as any; - pluginStart = ({ - data: { - fieldFormats: {}, - }, - } as unknown) as any; + } as unknown) as SecurityPluginSetup, + }; + + pluginStart = { data: { fieldFormats: {} } } as ReportingStartDeps; }); it('has a sync setup process', () => { diff --git a/x-pack/plugins/reporting/server/plugin.ts b/x-pack/plugins/reporting/server/plugin.ts index cedc9dc14a2376b..9cfdc389428fb2b 100644 --- a/x-pack/plugins/reporting/server/plugin.ts +++ b/x-pack/plugins/reporting/server/plugin.ts @@ -8,13 +8,7 @@ import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from 'src/core import { ReportingCore } from './'; import { initializeBrowserDriverFactory } from './browsers'; import { buildConfig, ReportingConfigType } from './config'; -import { - createQueueFactory, - enqueueJobFactory, - LevelLogger, - runValidations, - ReportingStore, -} from './lib'; +import { LevelLogger, ReportingStore, ReportingTask, runValidations } from './lib'; import { registerRoutes } from './routes'; import { setFieldFormats } from './services'; import { ReportingSetup, ReportingSetupDeps, ReportingStart, ReportingStartDeps } from './types'; @@ -49,11 +43,12 @@ export class ReportingPlugin }); const { elasticsearch, http } = core; - const { licensing, security } = plugins; + const { licensing, security, taskManager } = plugins; const { initializerContext: initContext, reportingCore } = this; const router = http.createRouter(); const basePath = http.basePath.get; + const task = new ReportingTask(this.reportingCore, this.logger); reportingCore.pluginSetup({ elasticsearch, @@ -61,10 +56,12 @@ export class ReportingPlugin basePath, router, security, + task, }); registerReportingUsageCollector(reportingCore, plugins); registerRoutes(reportingCore, this.logger); + task.register(taskManager); // async background setup (async () => { @@ -84,7 +81,9 @@ export class ReportingPlugin setFieldFormats(plugins.data.fieldFormats); const { logger, reportingCore } = this; - const { elasticsearch } = reportingCore.getPluginSetupDeps(); + const { elasticsearch, task } = reportingCore.getPluginSetupDeps(); + + task.init(); // async background start (async () => { @@ -93,15 +92,12 @@ export class ReportingPlugin const browserDriverFactory = await initializeBrowserDriverFactory(config, logger); const store = new ReportingStore(reportingCore, logger); - const esqueue = await createQueueFactory(reportingCore, store, logger); // starts polling for pending jobs - const enqueueJob = enqueueJobFactory(reportingCore, store, logger); // called from generation routes reportingCore.pluginStart({ browserDriverFactory, savedObjects: core.savedObjects, uiSettings: core.uiSettings, - esqueue, - enqueueJob, + scheduleTask: task.scheduleTaskFactory(plugins.taskManager), store, }); diff --git a/x-pack/plugins/reporting/server/routes/generation.test.ts b/x-pack/plugins/reporting/server/routes/generation.test.ts index c73c443d2390bb5..2cea5f05aaba6e0 100644 --- a/x-pack/plugins/reporting/server/routes/generation.test.ts +++ b/x-pack/plugins/reporting/server/routes/generation.test.ts @@ -138,8 +138,7 @@ describe('POST /api/reporting/generate', () => { }); it('returns 500 if job handler throws an error', async () => { - // throw an error from enqueueJob - core.getEnqueueJob = jest.fn().mockRejectedValue('Sorry, this tests says no'); + callClusterStub.withArgs('index').rejects('silly'); registerJobGenerationRoutes(core, mockLogger); @@ -163,12 +162,26 @@ describe('POST /api/reporting/generate', () => { .send({ jobParams: `abc` }) .expect(200) .then(({ body }) => { - expect(body).toMatchObject({ - job: { - id: expect.any(String), - }, - path: expect.any(String), - }); + expect(body).toMatchInlineSnapshot(` + Object { + "job": Object { + "attempts": 0, + "created_by": "Tom Riddle", + "id": "foo", + "index": "foo-index", + "jobtype": "printable_pdf", + "payload": Object { + "scheduleParamsTest": Object { + "test1": "yes", + }, + }, + "priority": 10, + "status": "pending", + "timeout": 10000, + }, + "path": "undefined/api/reporting/jobs/download/foo", + } + `); }); }); }); diff --git a/x-pack/plugins/reporting/server/routes/generation.ts b/x-pack/plugins/reporting/server/routes/generation.ts index 017e875931ae2c7..d990d33f1e1a1c2 100644 --- a/x-pack/plugins/reporting/server/routes/generation.ts +++ b/x-pack/plugins/reporting/server/routes/generation.ts @@ -10,6 +10,7 @@ import { kibanaResponseFactory } from 'src/core/server'; import { ReportingCore } from '../'; import { API_BASE_URL } from '../../common/constants'; import { LevelLogger as Logger } from '../lib'; +import { enqueueJobFactory } from '../lib/enqueue_job'; import { registerGenerateFromJobParams } from './generate_from_jobparams'; import { registerGenerateCsvFromSavedObjectImmediate } from './generate_from_savedobject_immediate'; import { HandlerFunction } from './types'; @@ -43,11 +44,10 @@ export function registerJobGenerationRoutes(reporting: ReportingCore, logger: Lo } try { - const enqueueJob = await reporting.getEnqueueJob(); - const job = await enqueueJob(exportTypeId, jobParams, user, context, req); + const enqueueJob = enqueueJobFactory(reporting, logger); + const report = await enqueueJob(exportTypeId, jobParams, user, context, req); - // return the queue's job information - const jobJson = job.toJSON(); + // return task manager's task information and the download URL const downloadBaseUrl = getDownloadBaseUrl(reporting); return res.ok({ @@ -55,8 +55,8 @@ export function registerJobGenerationRoutes(reporting: ReportingCore, logger: Lo 'content-type': 'application/json', }, body: { - path: `${downloadBaseUrl}/${jobJson.id}`, - job: jobJson, + path: `${downloadBaseUrl}/${report._id}`, + job: report.toApiJSON(), }, }); } catch (err) { diff --git a/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts b/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts index 93f79bfd892b995..2eaf073cf4f435d 100644 --- a/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts +++ b/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts @@ -8,8 +8,7 @@ import contentDisposition from 'content-disposition'; import * as _ from 'lodash'; import { CSV_JOB_TYPE } from '../../../common/constants'; -import { statuses } from '../../lib/esqueue/constants/statuses'; -import { ExportTypesRegistry } from '../../lib/export_types_registry'; +import { statuses, ExportTypesRegistry } from '../../lib'; import { ExportTypeDefinition, JobSource, TaskRunResult } from '../../types'; type ExportTypeType = ExportTypeDefinition; @@ -18,11 +17,11 @@ interface ErrorFromPayload { message: string; } -// A camelCase version of TaskRunResult +// interface of the API result interface Payload { statusCode: number; content: string | Buffer | ErrorFromPayload; - contentType: string; + contentType: string | null; headers: Record; } diff --git a/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts b/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts index 95b06aa39f07e4e..b6b842dc0a09030 100644 --- a/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts +++ b/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts @@ -7,11 +7,10 @@ jest.mock('../routes'); jest.mock('../usage'); jest.mock('../browsers'); -jest.mock('../lib/create_queue'); -jest.mock('../lib/enqueue_job'); jest.mock('../lib/validate'); import * as Rx from 'rxjs'; +import { ConcreteTaskInstance, TaskManagerStartContract } from '../../../task_manager/server'; import { ReportingConfig, ReportingCore } from '../'; import { chromium, @@ -19,10 +18,9 @@ import { initializeBrowserDriverFactory, } from '../browsers'; import { ReportingInternalSetup, ReportingInternalStart } from '../core'; +import { ReportingStore, ReportingTask } from '../lib'; import { ReportingStartDeps } from '../types'; -import { ReportingStore } from '../lib'; import { createMockLevelLogger } from './create_mock_levellogger'; -import { Report } from '../lib/store'; (initializeBrowserDriverFactory as jest.Mock< Promise @@ -30,13 +28,18 @@ import { Report } from '../lib/store'; (chromium as any).createDriverFactory.mockImplementation(() => ({})); -const createMockPluginSetup = (setupMock?: any): ReportingInternalSetup => { +const createMockPluginSetup = ( + mockReportingCore: ReportingCore, + setupMock?: any +): ReportingInternalSetup => { + const logger = createMockLevelLogger(); return { elasticsearch: setupMock.elasticsearch || { legacy: { client: {} } }, - basePath: setupMock.basePath, + basePath: setupMock.basePath || '/all-about-that-basepath', router: setupMock.router, security: setupMock.security, licensing: { license$: Rx.of({ isAvailable: true, isActive: true, type: 'basic' }) } as any, + task: new ReportingTask(mockReportingCore, logger), }; }; @@ -48,11 +51,10 @@ const createMockPluginStart = ( const store = new ReportingStore(mockReportingCore, logger); return { browserDriverFactory: startMock.browserDriverFactory, - enqueueJob: startMock.enqueueJob || jest.fn().mockResolvedValue(new Report({} as any)), - esqueue: startMock.esqueue, savedObjects: startMock.savedObjects || { getScopedClient: jest.fn() }, uiSettings: startMock.uiSettings || { asScopedToClient: () => ({ get: jest.fn() }) }, store, + scheduleTask: async () => ({} as ConcreteTaskInstance), }; }; @@ -65,6 +67,7 @@ export const createMockConfigSchema = (overrides?: any) => ({ export const createMockStartDeps = (startMock?: any): ReportingStartDeps => ({ data: startMock.data, + taskManager: {} as TaskManagerStartContract, }); export const createMockReportingCore = async ( @@ -72,15 +75,14 @@ export const createMockReportingCore = async ( setupDepsMock: ReportingInternalSetup | undefined = undefined, startDepsMock: ReportingInternalStart | undefined = undefined ) => { - if (!setupDepsMock) { - setupDepsMock = createMockPluginSetup({}); - } - const mockReportingCore = { getConfig: () => config, getElasticsearchService: () => setupDepsMock?.elasticsearch, } as ReportingCore; + if (!setupDepsMock) { + setupDepsMock = createMockPluginSetup(mockReportingCore, {}); + } if (!startDepsMock) { startDepsMock = createMockPluginStart(mockReportingCore, {}); } diff --git a/x-pack/plugins/reporting/server/types.ts b/x-pack/plugins/reporting/server/types.ts index ff597b53ea0b0b9..dc6061ec33fe0c0 100644 --- a/x-pack/plugins/reporting/server/types.ts +++ b/x-pack/plugins/reporting/server/types.ts @@ -12,6 +12,7 @@ import { UsageCollectionSetup } from 'src/plugins/usage_collection/server'; import { CancellationToken } from '../../../plugins/reporting/common'; import { LicensingPluginSetup } from '../../licensing/server'; import { SecurityPluginSetup } from '../../security/server'; +import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server'; import { JobStatus } from '../common/types'; import { ReportingConfigType } from './config'; import { ReportingCore } from './core'; @@ -54,12 +55,14 @@ export interface TimeRangeParams { max?: Date | string | number | null; } +// the "raw" data coming from the client, unencrypted export interface JobParamPostPayload { timerange?: TimeRangeParams; } +// the pre-processed, encrypted data ready for storage export interface ScheduledTaskParams { - headers?: string; // serialized encrypted headers + headers: string; // serialized encrypted headers jobParams: JobParamsType; title: string; type: string; @@ -77,10 +80,10 @@ export interface JobSource { } export interface TaskRunResult { - content_type: string; + content_type: string | null; content: string | null; - size: number; csv_contains_formulas?: boolean; + size: number; max_size_reached?: boolean; warnings?: string[]; } @@ -160,11 +163,13 @@ export type ScreenshotsObservableFn = ({ export interface ReportingSetupDeps { licensing: LicensingPluginSetup; security?: SecurityPluginSetup; + taskManager: TaskManagerSetupContract; usageCollection?: UsageCollectionSetup; } export interface ReportingStartDeps { data: DataPluginStart; + taskManager: TaskManagerStartContract; } export type ReportingStart = object; @@ -177,12 +182,14 @@ export type ReportingSetup = object; export type CaptureConfig = ReportingConfigType['capture']; export type ScrollConfig = ReportingConfigType['csv']['scroll']; +// rename me export type ESQueueCreateJobFn = ( jobParams: JobParamsType, context: RequestHandlerContext, request: KibanaRequest ) => Promise; +// rename me export type ESQueueWorkerExecuteFn = ( jobId: string, job: ScheduledTaskParamsType,