diff --git a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts
index 100555e9ead4cb..31c873554ee77c 100644
--- a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts
+++ b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts
@@ -84,7 +84,7 @@ describe('EphemeralTaskLifecycle', () => {
       },
       worker_utilization_running_average_window: 5,
       metrics_reset_interval: 3000,
-      claim_strategy: 'default',
+      claim_strategy: 'update_by_query',
       request_timeouts: {
         update_by_query: 1000,
       },
diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts
index 7e626c58538203..92d97eea7c6b20 100644
--- a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts
+++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts
@@ -83,7 +83,7 @@ describe('managed configuration', () => {
       },
       worker_utilization_running_average_window: 5,
       metrics_reset_interval: 3000,
-      claim_strategy: 'default',
+      claim_strategy: 'update_by_query',
       request_timeouts: {
         update_by_query: 1000,
       },
@@ -205,7 +205,7 @@ describe('managed configuration', () => {
       },
       worker_utilization_running_average_window: 5,
       metrics_reset_interval: 3000,
-      claim_strategy: 'default',
+      claim_strategy: 'update_by_query',
       request_timeouts: {
         update_by_query: 1000,
       },
diff --git a/x-pack/plugins/task_manager/server/integration_tests/removed_types.test.ts b/x-pack/plugins/task_manager/server/integration_tests/removed_types.test.ts
new file mode 100644
index 00000000000000..69bf717b95fc6c
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/integration_tests/removed_types.test.ts
@@ -0,0 +1,147 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { v4 as uuidV4 } from 'uuid';
+import { ElasticsearchClient } from '@kbn/core/server';
+import { TaskManagerPlugin, TaskManagerStartContract } from '../plugin';
+import { injectTask, retry, setupTestServers } from './lib';
+import { TestElasticsearchUtils, TestKibanaUtils } from '@kbn/core-test-helpers-kbn-server';
+import { ConcreteTaskInstance, TaskStatus } from '../task';
+import { CreateWorkloadAggregatorOpts } from '../monitoring/workload_statistics';
+
+const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start');
+
+const { createWorkloadAggregator: createWorkloadAggregatorMock } = jest.requireMock(
+  '../monitoring/workload_statistics'
+);
+jest.mock('../monitoring/workload_statistics', () => {
+  const actual = jest.requireActual('../monitoring/workload_statistics');
+  return {
+    ...actual,
+    createWorkloadAggregator: jest.fn().mockImplementation((opts) => {
+      return new actual.createWorkloadAggregator(opts);
+    }),
+  };
+});
+
+describe('unrecognized task types', () => {
+  let esServer: TestElasticsearchUtils;
+  let kibanaServer: TestKibanaUtils;
+  let taskManagerPlugin: TaskManagerStartContract;
+  let createWorkloadAggregatorOpts: CreateWorkloadAggregatorOpts;
+
+  const taskIdsToRemove: string[] = [];
+
+  beforeAll(async () => {
+    const setupResult = await setupTestServers({
+      xpack: {
+        task_manager: {
+          monitored_aggregated_stats_refresh_rate: 5000,
+        },
+      },
+    });
+    esServer = setupResult.esServer;
+    kibanaServer = setupResult.kibanaServer;
+
+    expect(taskManagerStartSpy).toHaveBeenCalledTimes(1);
+    taskManagerPlugin = taskManagerStartSpy.mock.results[0].value;
+
+    expect(createWorkloadAggregatorMock).toHaveBeenCalledTimes(1);
+    createWorkloadAggregatorOpts = createWorkloadAggregatorMock.mock.calls[0][0];
+  });
+
+  afterAll(async () => {
+    if (kibanaServer) {
+      await kibanaServer.stop();
+    }
+    if (esServer) {
+      await esServer.stop();
+    }
+  });
+
+  beforeEach(async () => {
+    jest.clearAllMocks();
+  });
+
+  afterEach(async () => {
+    while (taskIdsToRemove.length > 0) {
+      const id = taskIdsToRemove.pop();
+      await taskManagerPlugin.removeIfExists(id!);
+    }
+  });
+
+  test('should be no workload aggregator errors when there are removed task types', async () => {
+    const errorLogSpy = jest.spyOn(createWorkloadAggregatorOpts.logger, 'error');
+    const removeTypeId = uuidV4();
+    await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
+      id: removeTypeId,
+      taskType: 'sampleTaskRemovedType',
+      params: {},
+      state: { foo: 'test' },
+      stateVersion: 1,
+      runAt: new Date(),
+      enabled: true,
+      scheduledAt: new Date(),
+      attempts: 0,
+      status: TaskStatus.Idle,
+      startedAt: null,
+      retryAt: null,
+      ownerId: null,
+    });
+    const notRegisteredTypeId = uuidV4();
+    await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
+      id: notRegisteredTypeId,
+      taskType: 'sampleTaskNotRegisteredType',
+      params: {},
+      state: { foo: 'test' },
+      stateVersion: 1,
+      runAt: new Date(),
+      enabled: true,
+      scheduledAt: new Date(),
+      attempts: 0,
+      status: TaskStatus.Idle,
+      startedAt: null,
+      retryAt: null,
+      ownerId: null,
+    });
+
+    taskIdsToRemove.push(removeTypeId);
+    taskIdsToRemove.push(notRegisteredTypeId);
+
+    await retry(async () => {
+      const task = await getTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser);
+      expect(task?._source?.task?.status).toBe('unrecognized');
+    });
+
+    // monitored_aggregated_stats_refresh_rate is set to the minimum of 5 seconds
+    // so we want to wait that long to let it refresh
+    await new Promise((r) => setTimeout(r, 5100));
+
+    expect(errorLogSpy).not.toHaveBeenCalled();
+  });
+});
+
+async function getTask(esClient: ElasticsearchClient) {
+  const response = await esClient.search<{ task: ConcreteTaskInstance }>({
+    index: '.kibana_task_manager',
+    body: {
+      query: {
+        bool: {
+          filter: [
+            {
+              term: {
+                'task.taskType': 'sampleTaskRemovedType',
+              },
+            },
+          ],
+        },
+      },
+    },
+  });
+
+  return response.hits.hits[0];
+}
diff --git a/x-pack/plugins/task_manager/server/lib/calculate_health_status.test.ts b/x-pack/plugins/task_manager/server/lib/calculate_health_status.test.ts
index 3943e94bdb8b32..24e2f510f949c7 100644
--- a/x-pack/plugins/task_manager/server/lib/calculate_health_status.test.ts
+++ b/x-pack/plugins/task_manager/server/lib/calculate_health_status.test.ts
@@ -56,7 +56,7 @@ const config = {
   },
   worker_utilization_running_average_window: 5,
   metrics_reset_interval: 3000,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
@@ -78,7 +78,7 @@ const getStatsWithTimestamp = ({
       timestamp,
       value: {
         capacity: { config: 10, as_cost: 20, as_workers: 10 },
-        claim_strategy: 'default',
+        claim_strategy: 'update_by_query',
         request_capacity: 1000,
         monitored_aggregated_stats_refresh_rate: 5000,
         monitored_stats_running_average_window: 50,
diff --git a/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts b/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts
index a39568df5fdd21..7df06865d30ed5 100644
--- a/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts
+++ b/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts
@@ -436,7 +436,7 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
       status: HealthStatus.OK,
       value: {
         capacity: { config: 10, as_cost: 20, as_workers: 10 },
-        claim_strategy: 'default',
+        claim_strategy: 'update_by_query',
         poll_interval: 3000,
         request_capacity: 1000,
         monitored_aggregated_stats_refresh_rate: 5000,
diff --git a/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts b/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts
index b39e1b31687310..6b768a9f4d4e97 100644
--- a/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts
+++ b/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts
@@ -74,7 +74,7 @@ const config: TaskManagerConfig = {
   },
   version_conflict_threshold: 80,
   worker_utilization_running_average_window: 5,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
diff --git a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts
index 9791ac805e500d..94b11171f9e040 100644
--- a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts
@@ -950,7 +950,7 @@ function mockStats(
       timestamp: new Date().toISOString(),
       value: {
         capacity: { config: 10, as_cost: 20, as_workers: 10 },
-        claim_strategy: 'default',
+        claim_strategy: 'update_by_query',
         poll_interval: 0,
         request_capacity: 1000,
         monitored_aggregated_stats_refresh_rate: 5000,
diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts
index a169c9dad8fe5f..2be1930786fa8d 100644
--- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts
@@ -52,7 +52,7 @@ describe('Configuration Statistics Aggregator', () => {
       },
       worker_utilization_running_average_window: 5,
       metrics_reset_interval: 3000,
-      claim_strategy: 'default',
+      claim_strategy: 'update_by_query',
       request_timeouts: {
         update_by_query: 1000,
       },
@@ -75,7 +75,7 @@ describe('Configuration Statistics Aggregator', () => {
            as_workers: 10,
            as_cost: 20,
          },
-         claim_strategy: 'default',
+         claim_strategy: 'update_by_query',
          poll_interval: 6000000,
          request_capacity: 1000,
          monitored_aggregated_stats_refresh_rate: 5000,
@@ -94,7 +94,7 @@ describe('Configuration Statistics Aggregator', () => {
            as_workers: 8,
            as_cost: 16,
          },
-         claim_strategy: 'default',
+         claim_strategy: 'update_by_query',
          poll_interval: 6000000,
          request_capacity: 1000,
          monitored_aggregated_stats_refresh_rate: 5000,
@@ -113,7 +113,7 @@ describe('Configuration Statistics Aggregator', () => {
            as_workers: 8,
            as_cost: 16,
          },
-         claim_strategy: 'default',
+         claim_strategy: 'update_by_query',
          poll_interval: 3000,
          request_capacity: 1000,
          monitored_aggregated_stats_refresh_rate: 5000,
diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts
index b62ca8e8169e91..92cca0bf9a4f90 100644
--- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts
@@ -117,7 +117,7 @@ type ScheduledIntervals = ScheduleDensityResult['histogram']['buckets'][0];
 // Set an upper bound just in case a customer sets a really high refresh rate
 const MAX_SCHEDULE_DENSITY_BUCKETS = 50;
 
-interface CreateWorkloadAggregatorOpts {
+export interface CreateWorkloadAggregatorOpts {
   taskStore: TaskStore;
   elasticsearchAndSOAvailability$: Observable<boolean>;
   refreshInterval: number;
diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts
index 96269f58158df1..a7d452f76d6e2a 100644
--- a/x-pack/plugins/task_manager/server/plugin.test.ts
+++ b/x-pack/plugins/task_manager/server/plugin.test.ts
@@ -83,7 +83,7 @@ const pluginInitializerContextParams = {
   },
   worker_utilization_running_average_window: 5,
   metrics_reset_interval: 3000,
-  claim_strategy: 'default',
+  claim_strategy: 'update_by_query',
   request_timeouts: {
     update_by_query: 1000,
   },
diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
index e0b23c3a4d95e7..04c28dd11735d3 100644
--- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
+++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
@@ -87,7 +87,7 @@ describe('TaskPollingLifecycle', () => {
       },
       worker_utilization_running_average_window: 5,
       metrics_reset_interval: 3000,
-      claim_strategy: 'default',
+      claim_strategy: 'update_by_query',
       request_timeouts: {
         update_by_query: 1000,
       },
diff --git a/x-pack/plugins/task_manager/server/routes/health.test.ts b/x-pack/plugins/task_manager/server/routes/health.test.ts
index 9c08c5b5fb4c42..e76d218911dc1f 100644
--- a/x-pack/plugins/task_manager/server/routes/health.test.ts
+++ b/x-pack/plugins/task_manager/server/routes/health.test.ts
@@ -824,7 +824,7 @@ function mockHealthStats(overrides = {}) {
       timestamp: new Date().toISOString(),
      value: {
        capacity: { config: 10, as_cost: 20, as_workers: 10 },
-       claim_strategy: 'default',
+       claim_strategy: 'update_by_query',
        poll_interval: 3000,
        request_capacity: 1000,
       monitored_aggregated_stats_refresh_rate: 5000,
diff --git a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts
index 067a32c8a486d6..db511677439ba9 100644
--- a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts
+++ b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts
@@ -175,7 +175,7 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
       status: HealthStatus.OK,
       value: {
         capacity: { config: 10, as_cost: 20, as_workers: 10 },
-        claim_strategy: 'default',
+        claim_strategy: 'update_by_query',
         poll_interval: 3000,
         request_capacity: 1000,
         monitored_aggregated_stats_refresh_rate: 5000,