From dc813c351fe111c895e85a188372ad31625d8c8c Mon Sep 17 00:00:00 2001 From: Tim Sullivan Date: Tue, 2 Jan 2024 16:00:53 -0700 Subject: [PATCH] Handle content stream errors in report pre-deletion (#173792) Re-addresses https://github.com/elastic/kibana/issues/171363 The bug was still evident, especially when using network throttling to add slight lag to the request turnaround times. This PR adds more handling of errors that could be thrown slightly prior to deleting the report document, when we try to clear all chunks of the report using the content stream.
Before https://github.com/elastic/kibana/assets/908371/c27fe314-0f93-42b4-8076-99a1e30b8d2f
After https://github.com/elastic/kibana/assets/908371/4c1f5edd-73f1-4ca4-a40a-f900ca5f9c78
### Checklist - [x] Unit tests --- .../routes/common/jobs/get_job_routes.ts | 44 ++- .../management/integration_tests/jobs.test.ts | 262 ++++++++++-------- .../public/integration_tests/jobs.test.ts | 210 +++++++------- 3 files changed, 295 insertions(+), 221 deletions(-) diff --git a/x-pack/plugins/reporting/server/routes/common/jobs/get_job_routes.ts b/x-pack/plugins/reporting/server/routes/common/jobs/get_job_routes.ts index a88abae999be05..ca25b13990dc81 100644 --- a/x-pack/plugins/reporting/server/routes/common/jobs/get_job_routes.ts +++ b/x-pack/plugins/reporting/server/routes/common/jobs/get_job_routes.ts @@ -81,17 +81,43 @@ export const commonJobsRouteHandlerFactory = (reporting: ReportingCore) => { return jobManagementPreRouting(reporting, res, docId, user, counters, async (doc) => { const docIndex = doc.index; const stream = await getContentStream(reporting, { id: docId, index: docIndex }); - /** @note Overwriting existing content with an empty buffer to remove all the chunks. */ - await new Promise((resolve) => { - stream.end('', 'utf8', () => { - resolve(); - }); + const reportingSetup = reporting.getPluginSetupDeps(); + const logger = reportingSetup.logger.get('delete-report'); + + // An "error" event is emitted if an error is + // passed to the `stream.end` callback from + // the _final method of the ContentStream. + // This event must be handled. + stream.on('error', (err) => { + logger.error(err); }); - await jobsQuery.delete(docIndex, docId); - return res.ok({ - body: { deleted: true }, - }); + try { + // Overwriting existing content with an + // empty buffer to remove all the chunks. + await new Promise((resolve, reject) => { + stream.end('', 'utf8', (error?: Error) => { + if (error) { + // handle error that could be thrown + // from the _write method of the ContentStream + reject(error); + } else { + resolve(); + } + }); + }); + + await jobsQuery.delete(docIndex, docId); + + return res.ok({ + body: { deleted: true }, + }); + } catch (error) { + logger.error(error); + return res.customError({ + statusCode: 500, + }); + } }); }; diff --git a/x-pack/plugins/reporting/server/routes/internal/management/integration_tests/jobs.test.ts b/x-pack/plugins/reporting/server/routes/internal/management/integration_tests/jobs.test.ts index decc2300026e35..6fe086f33faa5f 100644 --- a/x-pack/plugins/reporting/server/routes/internal/management/integration_tests/jobs.test.ts +++ b/x-pack/plugins/reporting/server/routes/internal/management/integration_tests/jobs.test.ts @@ -36,7 +36,7 @@ import { registerJobInfoRoutesInternal as registerJobInfoRoutes } from '../jobs' type SetupServerReturn = Awaited>; -describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { +describe(`Reporting Job Management Routes: Internal`, () => { const reportingSymbol = Symbol('reporting'); let server: SetupServerReturn['server']; let usageCounter: IUsageCounter; @@ -144,148 +144,148 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { await server.stop(); }); - it('fails on malformed download IDs', async () => { - mockEsClient.search.mockResponseOnce(getHits()); - registerJobInfoRoutes(core); + describe('download report', () => { + it('fails on malformed download IDs', async () => { + mockEsClient.search.mockResponseOnce(getHits()); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`) - .expect(400) - .then(({ body }) => - expect(body.message).toMatchInlineSnapshot( - '"[request params.docId]: value has length [1] but it must have a minimum length of [3]."' - ) - ); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`) + .expect(400) + .then(({ body }) => + expect(body.message).toMatchInlineSnapshot( + '"[request params.docId]: value has length [1] but it must have a minimum length of [3]."' + ) + ); + }); - it('fails on unauthenticated users', async () => { - mockStartDeps = await createMockPluginStart( - { - licensing: { - ...licensingMock.createStart(), - license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }), + it('fails on unauthenticated users', async () => { + mockStartDeps = await createMockPluginStart( + { + licensing: { + ...licensingMock.createStart(), + license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }), + }, + security: { authc: { getCurrentUser: () => undefined } }, }, - security: { authc: { getCurrentUser: () => undefined } }, - }, - mockConfigSchema - ); - core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps); - registerJobInfoRoutes(core); + mockConfigSchema + ); + core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`) - .expect(401) - .then(({ body }) => - expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`) - ); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`) + .expect(401) + .then(({ body }) => + expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`) + ); + }); - it('returns 404 if job not found', async () => { - mockEsClient.search.mockResponseOnce(getHits()); - registerJobInfoRoutes(core); + it('returns 404 if job not found', async () => { + mockEsClient.search.mockResponseOnce(getHits()); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) - .expect(404); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) + .expect(404); + }); - it('returns a 403 if not a valid job type', async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: 'invalidJobType', - payload: { title: 'invalid!' }, - }) - ); - registerJobInfoRoutes(core); + it('returns a 403 if not a valid job type', async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: 'invalidJobType', + payload: { title: 'invalid!' }, + }) + ); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) - .expect(403); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) + .expect(403); + }); - it(`returns job's info`, async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: mockJobTypeBase64Encoded, - payload: {}, // payload is irrelevant - }) - ); + it(`returns job's info`, async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: mockJobTypeBase64Encoded, + payload: {}, // payload is irrelevant + }) + ); - registerJobInfoRoutes(core); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`) - .expect(200); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`) + .expect(200); + }); - it(`returns 403 if a user cannot view a job's info`, async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: 'customForbiddenJobType', - payload: {}, // payload is irrelevant - }) - ); + it(`returns 403 if a user cannot view a job's info`, async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: 'customForbiddenJobType', + payload: {}, // payload is irrelevant + }) + ); - registerJobInfoRoutes(core); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`) - .expect(403); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`) + .expect(403); + }); - it('when a job is incomplete', async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: mockJobTypeUnencoded, - status: 'pending', - payload: { title: 'incomplete!' }, - }) - ); - registerJobInfoRoutes(core); - - await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) - .expect(503) - .expect('Content-Type', 'text/plain; charset=utf-8') - .expect('Retry-After', '30') - .then(({ text }) => expect(text).toEqual('pending')); - }); + it('when a job is incomplete', async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: mockJobTypeUnencoded, + status: 'pending', + payload: { title: 'incomplete!' }, + }) + ); + registerJobInfoRoutes(core); - it('when a job fails', async () => { - mockEsClient.search.mockResponse( - getHits({ - jobtype: mockJobTypeUnencoded, - status: 'failed', - output: { content: 'job failure message' }, - payload: { title: 'failing job!' }, - }) - ); - registerJobInfoRoutes(core); - - await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) - .expect(500) - .expect('Content-Type', 'application/json; charset=utf-8') - .then(({ body }) => - expect(body.message).toEqual('Reporting generation failed: job failure message') + await server.start(); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) + .expect(503) + .expect('Content-Type', 'text/plain; charset=utf-8') + .expect('Retry-After', '30') + .then(({ text }) => expect(text).toEqual('pending')); + }); + + it('when a job fails', async () => { + mockEsClient.search.mockResponse( + getHits({ + jobtype: mockJobTypeUnencoded, + status: 'failed', + output: { content: 'job failure message' }, + payload: { title: 'failing job!' }, + }) ); - }); + registerJobInfoRoutes(core); + + await server.start(); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) + .expect(500) + .expect('Content-Type', 'application/json; charset=utf-8') + .then(({ body }) => + expect(body.message).toEqual('Reporting generation failed: job failure message') + ); + }); - describe('successful downloads', () => { it('when a known job-type is complete', async () => { mockEsClient.search.mockResponseOnce(getCompleteHits()); registerJobInfoRoutes(core); @@ -483,4 +483,28 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { }); }); }); + + describe('delete report', () => { + it('handles content stream errors', async () => { + stream = new Readable({ + read() { + this.push('test'); + this.push(null); + }, + }) as typeof stream; + stream.end = jest.fn().mockImplementation((_name, _encoding, callback) => { + callback(new Error('An error occurred in ending the content stream')); + }); + + (getContentStream as jest.MockedFunction).mockResolvedValue(stream); + mockEsClient.search.mockResponseOnce(getCompleteHits()); + registerJobInfoRoutes(core); + + await server.start(); + await supertest(httpSetup.server.listener) + .delete(`${INTERNAL_ROUTES.JOBS.DELETE_PREFIX}/dank`) + .expect(500) + .expect('Content-Type', 'application/json; charset=utf-8'); + }); + }); }); diff --git a/x-pack/plugins/reporting/server/routes/public/integration_tests/jobs.test.ts b/x-pack/plugins/reporting/server/routes/public/integration_tests/jobs.test.ts index b301b1546dabfd..1d2ef78cf60e3d 100644 --- a/x-pack/plugins/reporting/server/routes/public/integration_tests/jobs.test.ts +++ b/x-pack/plugins/reporting/server/routes/public/integration_tests/jobs.test.ts @@ -36,7 +36,7 @@ import { registerJobInfoRoutesPublic } from '../jobs'; type SetupServerReturn = Awaited>; -describe(`GET ${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { +describe(`Reporting Job Management Routes: Public`, () => { const reportingSymbol = Symbol('reporting'); let server: SetupServerReturn['server']; let usageCounter: IUsageCounter; @@ -135,114 +135,114 @@ describe(`GET ${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { await server.stop(); }); - it('fails on malformed download IDs', async () => { - mockEsClient.search.mockResponseOnce(getHits()); - registerJobInfoRoutesPublic(core); + describe('download report', () => { + it('fails on malformed download IDs', async () => { + mockEsClient.search.mockResponseOnce(getHits()); + registerJobInfoRoutesPublic(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`) - .expect(400) - .then(({ body }) => - expect(body.message).toMatchInlineSnapshot( - '"[request params.docId]: value has length [1] but it must have a minimum length of [3]."' - ) - ); - }); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`) + .expect(400) + .then(({ body }) => + expect(body.message).toMatchInlineSnapshot( + '"[request params.docId]: value has length [1] but it must have a minimum length of [3]."' + ) + ); + }); - it('fails on unauthenticated users', async () => { - mockStartDeps = await createMockPluginStart( - { - licensing: { - ...licensingMock.createStart(), - license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }), + it('fails on unauthenticated users', async () => { + mockStartDeps = await createMockPluginStart( + { + licensing: { + ...licensingMock.createStart(), + license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }), + }, + security: { authc: { getCurrentUser: () => undefined } }, }, - security: { authc: { getCurrentUser: () => undefined } }, - }, - mockConfigSchema - ); - core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps); - registerJobInfoRoutesPublic(core); + mockConfigSchema + ); + core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps); + registerJobInfoRoutesPublic(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`) - .expect(401) - .then(({ body }) => - expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`) - ); - }); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`) + .expect(401) + .then(({ body }) => + expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`) + ); + }); + + it('returns 404 if job not found', async () => { + mockEsClient.search.mockResponseOnce(getHits()); + registerJobInfoRoutesPublic(core); - it('returns 404 if job not found', async () => { - mockEsClient.search.mockResponseOnce(getHits()); - registerJobInfoRoutesPublic(core); + await server.start(); - await server.start(); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) + .expect(404); + }); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) - .expect(404); - }); + it('returns a 403 if not a valid job type', async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: 'invalidJobType', + payload: { title: 'invalid!' }, + }) + ); + registerJobInfoRoutesPublic(core); - it('returns a 403 if not a valid job type', async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: 'invalidJobType', - payload: { title: 'invalid!' }, - }) - ); - registerJobInfoRoutesPublic(core); + await server.start(); - await server.start(); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) + .expect(403); + }); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) - .expect(403); - }); + it('when a job is incomplete', async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: 'unencodedJobType', + status: 'pending', + payload: { title: 'incomplete!' }, + }) + ); + registerJobInfoRoutesPublic(core); - it('when a job is incomplete', async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: 'unencodedJobType', - status: 'pending', - payload: { title: 'incomplete!' }, - }) - ); - registerJobInfoRoutesPublic(core); - - await server.start(); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) - .expect(503) - .expect('Content-Type', 'text/plain; charset=utf-8') - .expect('Retry-After', '30') - .then(({ text }) => expect(text).toEqual('pending')); - }); + await server.start(); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) + .expect(503) + .expect('Content-Type', 'text/plain; charset=utf-8') + .expect('Retry-After', '30') + .then(({ text }) => expect(text).toEqual('pending')); + }); - it('when a job fails', async () => { - mockEsClient.search.mockResponse( - getHits({ - jobtype: 'unencodedJobType', - status: 'failed', - output: { content: 'job failure message' }, - payload: { title: 'failing job!' }, - }) - ); - registerJobInfoRoutesPublic(core); - - await server.start(); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) - .expect(500) - .expect('Content-Type', 'application/json; charset=utf-8') - .then(({ body }) => - expect(body.message).toEqual('Reporting generation failed: job failure message') + it('when a job fails', async () => { + mockEsClient.search.mockResponse( + getHits({ + jobtype: 'unencodedJobType', + status: 'failed', + output: { content: 'job failure message' }, + payload: { title: 'failing job!' }, + }) ); - }); + registerJobInfoRoutesPublic(core); + + await server.start(); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) + .expect(500) + .expect('Content-Type', 'application/json; charset=utf-8') + .then(({ body }) => + expect(body.message).toEqual('Reporting generation failed: job failure message') + ); + }); - describe('successful downloads', () => { it('when a known job-type is complete', async () => { mockEsClient.search.mockResponseOnce(getCompleteHits()); registerJobInfoRoutesPublic(core); @@ -292,4 +292,28 @@ describe(`GET ${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { }); }); }); + + describe('delete report', () => { + it('handles content stream errors', async () => { + stream = new Readable({ + read() { + this.push('test'); + this.push(null); + }, + }) as typeof stream; + stream.end = jest.fn().mockImplementation((_name, _encoding, callback) => { + callback(new Error('An error occurred in ending the content stream')); + }); + + (getContentStream as jest.MockedFunction).mockResolvedValue(stream); + mockEsClient.search.mockResponseOnce(getCompleteHits()); + registerJobInfoRoutesPublic(core); + + await server.start(); + await supertest(httpSetup.server.listener) + .delete('/api/reporting/jobs/delete/denk') + .expect(500) + .expect('Content-Type', 'application/json; charset=utf-8'); + }); + }); });