[Response Ops][Task Manager] Adding integration test to ensure no `WorkloadAggregator` errors when there are unrecognized task types. (#193479)

Fixes elastic/kibana-team#1036

## Summary

Adding an integration test as an RCA action for an incident where unrecognized
task types were causing issues generating the workload portion of the
task manager health report.

## To verify

Add this line to your code so that an error is thrown when there are
unrecognized task types while generating the health report:

```
--- a/x-pack/plugins/task_manager/server/task_type_dictionary.ts
+++ b/x-pack/plugins/task_manager/server/task_type_dictionary.ts
@@ -128,6 +128,7 @@ export class TaskTypeDictionary {
   }

   public get(type: string): TaskDefinition | undefined {
+    this.ensureHas(type);
     return this.definitions.get(type);
   }
```
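For context, here is a minimal sketch of why that one-line patch surfaces the failure. The class and method bodies below only approximate the real `TaskTypeDictionary` (the error message and types are assumptions); the point is that `ensureHas` makes `get` throw for unregistered types instead of returning `undefined`, which is what the workload aggregation path then trips over.

```
// Hedged sketch, not the actual Kibana implementation.
class TaskTypeDictionarySketch {
  private definitions = new Map<string, { title: string }>();

  public ensureHas(type: string) {
    if (!this.definitions.has(type)) {
      // Assumed error message; the real dictionary's wording may differ.
      throw new Error(`Unsupported task type "${type}"`);
    }
  }

  public get(type: string) {
    this.ensureHas(type); // the line added by the verification patch
    return this.definitions.get(type);
  }
}

// With no types registered, get() now throws instead of returning undefined:
try {
  new TaskTypeDictionarySketch().get('sampleTaskRemovedType');
} catch (e) {
  console.log((e as Error).message); // Unsupported task type "sampleTaskRemovedType"
}
```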

Run the integration test `node scripts/jest_integration.js
x-pack/plugins/task_manager/server/integration_tests/removed_types.test.ts`
and see that it fails because a `WorkloadAggregator` error is logged.

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
(cherry picked from commit 01eae15)
ymao1 committed Sep 25, 2024
1 parent c7b153c commit 8c12a26
Showing 13 changed files with 164 additions and 17 deletions.
```
@@ -84,7 +84,7 @@ describe('EphemeralTaskLifecycle', () => {
   },
   worker_utilization_running_average_window: 5,
   metrics_reset_interval: 3000,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
```
```
@@ -83,7 +83,7 @@ describe('managed configuration', () => {
   },
   worker_utilization_running_average_window: 5,
   metrics_reset_interval: 3000,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
@@ -205,7 +205,7 @@ describe('managed configuration', () => {
   },
   worker_utilization_running_average_window: 5,
   metrics_reset_interval: 3000,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
```
New file `x-pack/plugins/task_manager/server/integration_tests/removed_types.test.ts` (147 additions):

```
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { v4 as uuidV4 } from 'uuid';
import { ElasticsearchClient } from '@kbn/core/server';
import { TaskManagerPlugin, TaskManagerStartContract } from '../plugin';
import { injectTask, retry, setupTestServers } from './lib';
import { TestElasticsearchUtils, TestKibanaUtils } from '@kbn/core-test-helpers-kbn-server';
import { ConcreteTaskInstance, TaskStatus } from '../task';
import { CreateWorkloadAggregatorOpts } from '../monitoring/workload_statistics';

const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start');

const { createWorkloadAggregator: createWorkloadAggregatorMock } = jest.requireMock(
  '../monitoring/workload_statistics'
);
jest.mock('../monitoring/workload_statistics', () => {
  const actual = jest.requireActual('../monitoring/workload_statistics');
  return {
    ...actual,
    createWorkloadAggregator: jest.fn().mockImplementation((opts) => {
      return new actual.createWorkloadAggregator(opts);
    }),
  };
});

describe('unrecognized task types', () => {
  let esServer: TestElasticsearchUtils;
  let kibanaServer: TestKibanaUtils;
  let taskManagerPlugin: TaskManagerStartContract;
  let createWorkloadAggregatorOpts: CreateWorkloadAggregatorOpts;

  const taskIdsToRemove: string[] = [];

  beforeAll(async () => {
    const setupResult = await setupTestServers({
      xpack: {
        task_manager: {
          monitored_aggregated_stats_refresh_rate: 5000,
        },
      },
    });
    esServer = setupResult.esServer;
    kibanaServer = setupResult.kibanaServer;

    expect(taskManagerStartSpy).toHaveBeenCalledTimes(1);
    taskManagerPlugin = taskManagerStartSpy.mock.results[0].value;

    expect(createWorkloadAggregatorMock).toHaveBeenCalledTimes(1);
    createWorkloadAggregatorOpts = createWorkloadAggregatorMock.mock.calls[0][0];
  });

  afterAll(async () => {
    if (kibanaServer) {
      await kibanaServer.stop();
    }
    if (esServer) {
      await esServer.stop();
    }
  });

  beforeEach(async () => {
    jest.clearAllMocks();
  });

  afterEach(async () => {
    while (taskIdsToRemove.length > 0) {
      const id = taskIdsToRemove.pop();
      await taskManagerPlugin.removeIfExists(id!);
    }
  });

  test('should be no workload aggregator errors when there are removed task types', async () => {
    const errorLogSpy = jest.spyOn(createWorkloadAggregatorOpts.logger, 'error');
    const removeTypeId = uuidV4();
    await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
      id: removeTypeId,
      taskType: 'sampleTaskRemovedType',
      params: {},
      state: { foo: 'test' },
      stateVersion: 1,
      runAt: new Date(),
      enabled: true,
      scheduledAt: new Date(),
      attempts: 0,
      status: TaskStatus.Idle,
      startedAt: null,
      retryAt: null,
      ownerId: null,
    });
    const notRegisteredTypeId = uuidV4();
    await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
      id: notRegisteredTypeId,
      taskType: 'sampleTaskNotRegisteredType',
      params: {},
      state: { foo: 'test' },
      stateVersion: 1,
      runAt: new Date(),
      enabled: true,
      scheduledAt: new Date(),
      attempts: 0,
      status: TaskStatus.Idle,
      startedAt: null,
      retryAt: null,
      ownerId: null,
    });

    taskIdsToRemove.push(removeTypeId);
    taskIdsToRemove.push(notRegisteredTypeId);

    await retry(async () => {
      const task = await getTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser);
      expect(task?._source?.task?.status).toBe('unrecognized');
    });

    // monitored_aggregated_stats_refresh_rate is set to the minimum of 5 seconds
    // so we want to wait that long to let it refresh
    await new Promise((r) => setTimeout(r, 5100));

    expect(errorLogSpy).not.toHaveBeenCalled();
  });
});

async function getTask(esClient: ElasticsearchClient) {
  const response = await esClient.search<{ task: ConcreteTaskInstance }>({
    index: '.kibana_task_manager',
    body: {
      query: {
        bool: {
          filter: [
            {
              term: {
                'task.taskType': 'sampleTaskRemovedType',
              },
            },
          ],
        },
      },
    },
  });

  return response.hits.hits[0];
}
```
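The test polls with a `retry` helper imported from `./lib`. As a point of reference only, a minimal sketch of such a polling helper is shown below, assuming it simply re-invokes the async callback until it stops throwing; the real helper's name, signature, and defaults may differ.

```
// Assumed shape of a retry/poll helper: re-run the async callback until it
// resolves, waiting between attempts, and rethrow the last error on exhaustion.
async function retrySketch<T>(
  cb: () => Promise<T>,
  attempts: number = 10,
  delayMs: number = 500
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await cb();
    } catch (err) {
      lastError = err;
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw lastError;
}
```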
```
@@ -56,7 +56,7 @@ const config = {
   },
   worker_utilization_running_average_window: 5,
   metrics_reset_interval: 3000,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
@@ -78,7 +78,7 @@ const getStatsWithTimestamp = ({
   timestamp,
   value: {
     capacity: { config: 10, as_cost: 20, as_workers: 10 },
-    claim_strategy: 'default',
+    claim_strategy: 'update_by_query',
     request_capacity: 1000,
     monitored_aggregated_stats_refresh_rate: 5000,
     monitored_stats_running_average_window: 50,
```
```
@@ -436,7 +436,7 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
   status: HealthStatus.OK,
   value: {
     capacity: { config: 10, as_cost: 20, as_workers: 10 },
-    claim_strategy: 'default',
+    claim_strategy: 'update_by_query',
     poll_interval: 3000,
     request_capacity: 1000,
     monitored_aggregated_stats_refresh_rate: 5000,
```
```
@@ -74,7 +74,7 @@ const config: TaskManagerConfig = {
   },
   version_conflict_threshold: 80,
   worker_utilization_running_average_window: 5,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
```
```
@@ -950,7 +950,7 @@ function mockStats(
   timestamp: new Date().toISOString(),
   value: {
     capacity: { config: 10, as_cost: 20, as_workers: 10 },
-    claim_strategy: 'default',
+    claim_strategy: 'update_by_query',
     poll_interval: 0,
     request_capacity: 1000,
     monitored_aggregated_stats_refresh_rate: 5000,
```
```
@@ -52,7 +52,7 @@ describe('Configuration Statistics Aggregator', () => {
   },
   worker_utilization_running_average_window: 5,
   metrics_reset_interval: 3000,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
@@ -75,7 +75,7 @@ describe('Configuration Statistics Aggregator', () => {
     as_workers: 10,
     as_cost: 20,
   },
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   poll_interval: 6000000,
   request_capacity: 1000,
   monitored_aggregated_stats_refresh_rate: 5000,
@@ -94,7 +94,7 @@ describe('Configuration Statistics Aggregator', () => {
     as_workers: 8,
     as_cost: 16,
   },
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   poll_interval: 6000000,
   request_capacity: 1000,
   monitored_aggregated_stats_refresh_rate: 5000,
@@ -113,7 +113,7 @@ describe('Configuration Statistics Aggregator', () => {
     as_workers: 8,
     as_cost: 16,
   },
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   poll_interval: 3000,
   request_capacity: 1000,
   monitored_aggregated_stats_refresh_rate: 5000,
```
```
@@ -117,7 +117,7 @@ type ScheduledIntervals = ScheduleDensityResult['histogram']['buckets'][0];
 // Set an upper bound just in case a customer sets a really high refresh rate
 const MAX_SCHEDULE_DENSITY_BUCKETS = 50;

-interface CreateWorkloadAggregatorOpts {
+export interface CreateWorkloadAggregatorOpts {
   taskStore: TaskStore;
   elasticsearchAndSOAvailability$: Observable<boolean>;
   refreshInterval: number;
```
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/plugin.test.ts

```
@@ -83,7 +83,7 @@ const pluginInitializerContextParams = {
   },
   worker_utilization_running_average_window: 5,
   metrics_reset_interval: 3000,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
```
```
@@ -87,7 +87,7 @@ describe('TaskPollingLifecycle', () => {
   },
   worker_utilization_running_average_window: 5,
   metrics_reset_interval: 3000,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
```
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/routes/health.test.ts

```
@@ -824,7 +824,7 @@ function mockHealthStats(overrides = {}) {
   timestamp: new Date().toISOString(),
   value: {
     capacity: { config: 10, as_cost: 20, as_workers: 10 },
-    claim_strategy: 'default',
+    claim_strategy: 'update_by_query',
     poll_interval: 3000,
     request_capacity: 1000,
     monitored_aggregated_stats_refresh_rate: 5000,
```
```
@@ -175,7 +175,7 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
   status: HealthStatus.OK,
   value: {
     capacity: { config: 10, as_cost: 20, as_workers: 10 },
-    claim_strategy: 'default',
+    claim_strategy: 'update_by_query',
     poll_interval: 3000,
     request_capacity: 1000,
     monitored_aggregated_stats_refresh_rate: 5000,
```
