From abec231aee9fdca0d543cbe10537d8eaca90bdda Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Mon, 28 Sep 2020 14:50:10 +0100 Subject: [PATCH] added task runtime stats to health endpoint --- .../task_manager/server/config.test.ts | 1 + x-pack/plugins/task_manager/server/config.ts | 7 + .../task_manager/server/monitoring/index.ts | 27 ++- .../monitoring_stats_stream.test.ts | 155 ++++++++++++++ .../monitoring/monitoring_stats_stream.ts | 127 ++++++++++++ .../runtime_statistics_aggregator.ts | 10 +- .../monitoring/task_run_statistics.test.ts | 193 ++++++++++++++++++ .../server/monitoring/task_run_statistics.ts | 166 +++++++++++++++ .../monitoring/workload_statistics.test.ts | 2 +- .../server/monitoring/workload_statistics.ts | 24 ++- x-pack/plugins/task_manager/server/plugin.ts | 6 +- .../task_manager/server/routes/health.test.ts | 144 ++----------- .../task_manager/server/routes/health.ts | 97 +++------ .../task_manager/server/task_events.ts | 20 +- .../task_manager/server/task_manager.mock.ts | 14 +- .../task_manager/server/task_manager.test.ts | 1 + .../task_manager/server/task_manager.ts | 51 +++-- .../test_suites/task_manager/health_route.ts | 35 +++- 18 files changed, 837 insertions(+), 243 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts create mode 100644 x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts create mode 100644 x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts create mode 100644 x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 2eb132185ff703..f0c19376389916 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -16,6 +16,7 @@ describe('config validation', () => { "max_poll_inactivity_cycles": 10, "max_workers": 10, "monitored_aggregated_stats_refresh_rate": 60000, + "monitored_stats_running_average_window": 50, "poll_interval": 3000, "request_capacity": 1000, } diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index 1b79c17220f4e5..a530cb2d44f4cf 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -9,6 +9,7 @@ import { schema, TypeOf } from '@kbn/config-schema'; export const DEFAULT_MAX_WORKERS = 10; export const DEFAULT_POLL_INTERVAL = 3000; export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10; +export const DEFAULT_MONITORING_STATS_RUNNING_AVERGAE_WINDOW = 50; // Refresh "pull based" monitored stats at a default rate of once a minute export const DEFAULT_MONITORING_REFRESH_RATE = 60 * 1000; @@ -57,6 +58,12 @@ export const configSchema = schema.object({ /* don't run monitored stat aggregations any faster than once every 5 seconds */ min: 5000, }), + /* The size of the running average window for monitored stats. */ + monitored_stats_running_average_window: schema.number({ + defaultValue: DEFAULT_MONITORING_STATS_RUNNING_AVERGAE_WINDOW, + max: 100, + min: 10, + }), }); export type TaskManagerConfig = TypeOf; diff --git a/x-pack/plugins/task_manager/server/monitoring/index.ts b/x-pack/plugins/task_manager/server/monitoring/index.ts index 347731752d8521..ef447d6ef0620d 100644 --- a/x-pack/plugins/task_manager/server/monitoring/index.ts +++ b/x-pack/plugins/task_manager/server/monitoring/index.ts @@ -3,22 +3,29 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ + import { Logger } from 'src/core/server'; +import { Observable } from 'rxjs'; import { TaskManager } from '../task_manager'; -import { AggregatedStatProvider } from './runtime_statistics_aggregator'; -import { createWorkloadAggregator } from './workload_statistics'; import { TaskManagerConfig } from '../config'; +import { + MonitoringStats, + createAggregators, + createMonitoringStatsStream, +} from './monitoring_stats_stream'; -export { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator'; +export { + MonitoringStats, + RawMonitoringStats, + summarizeMonitoringStats, + createAggregators, + createMonitoringStatsStream, +} from './monitoring_stats_stream'; -export function createAggregatedStatsStream( +export function createMonitoringStats( taskManager: TaskManager, config: TaskManagerConfig, logger: Logger -): AggregatedStatProvider { - return createWorkloadAggregator( - taskManager, - config.monitored_aggregated_stats_refresh_rate, - logger - ); +): Observable { + return createMonitoringStatsStream(createAggregators(taskManager, config, logger), config); } diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts new file mode 100644 index 00000000000000..063947f2ecad7b --- /dev/null +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts @@ -0,0 +1,155 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { TaskManagerConfig } from '../config'; +import { of, Subject } from 'rxjs'; +import { take, bufferCount } from 'rxjs/operators'; +import { createMonitoringStatsStream, AggregatedStat } from './monitoring_stats_stream'; +import { JsonValue } from 'src/plugins/kibana_utils/common'; + +beforeEach(() => { + jest.resetAllMocks(); +}); + +describe('createMonitoringStatsStream', () => { + const configuration: TaskManagerConfig = { + enabled: true, + max_workers: 10, + index: 'foo', + max_attempts: 9, + poll_interval: 6000000, + max_poll_inactivity_cycles: 10, + request_capacity: 1000, + monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_running_average_window: 50, + }; + + it('returns the initial config used to configure Task Manager', async () => { + return new Promise((resolve) => { + createMonitoringStatsStream(of(), configuration) + .pipe(take(1)) + .subscribe((firstValue) => { + expect(firstValue.stats).toMatchObject({ + configuration: { + value: { + max_workers: 10, + poll_interval: 6000000, + max_poll_inactivity_cycles: 10, + request_capacity: 1000, + monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_running_average_window: 50, + }, + }, + }); + resolve(); + }); + }); + }); + + it('incrementally updates the stats returned by the endpoint', async () => { + const aggregatedStats$ = new Subject(); + + return new Promise((resolve) => { + createMonitoringStatsStream(aggregatedStats$, configuration) + .pipe(take(3), bufferCount(3)) + .subscribe(([initialValue, secondValue, thirdValue]) => { + expect(initialValue.stats).toMatchObject({ + lastUpdate: expect.any(String), + stats: { + configuration: { + value: { + max_workers: 10, + poll_interval: 6000000, + max_poll_inactivity_cycles: 10, + request_capacity: 1000, + monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_running_average_window: 50, + }, + }, + }, + }); + + expect(secondValue).toMatchObject({ + lastUpdate: expect.any(String), + stats: { + newAggregatedStat: { + timestamp: expect.any(String), + value: { + some: { + complex: { + value: 123, + }, + }, + }, + }, + configuration: { + timestamp: expect.any(String), + value: { + max_workers: 10, + poll_interval: 6000000, + max_poll_inactivity_cycles: 10, + request_capacity: 1000, + monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_running_average_window: 50, + }, + }, + }, + }); + + expect(thirdValue).toMatchObject({ + lastUpdate: expect.any(String), + stats: { + newAggregatedStat: { + timestamp: expect.any(String), + value: { + some: { + updated: { + value: 456, + }, + }, + }, + }, + configuration: { + timestamp: expect.any(String), + value: { + max_workers: 10, + poll_interval: 6000000, + max_poll_inactivity_cycles: 10, + request_capacity: 1000, + monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_running_average_window: 50, + }, + }, + }, + }); + }); + + aggregatedStats$.next({ + key: 'newAggregatedStat', + value: { + some: { + complex: { + value: 123, + }, + }, + } as JsonValue, + }); + + aggregatedStats$.next({ + key: 'newAggregatedStat', + value: { + some: { + updated: { + value: 456, + }, + }, + } as JsonValue, + }); + + resolve(); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts new file mode 100644 index 00000000000000..03fa889fb732d3 --- /dev/null +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { merge, of, Observable } from 'rxjs'; +import { map, scan } from 'rxjs/operators'; +import { set } from '@elastic/safer-lodash-set'; +import { pick } from 'lodash'; +import { Logger } from 'src/core/server'; +import { JsonObject } from 'src/plugins/kibana_utils/common'; +import { TaskManager } from '../task_manager'; +import { createWorkloadAggregator, WorkloadStat } from './workload_statistics'; +import { createTaskRunAggregator, summarizeTaskRunStat, TaskRunStat } from './task_run_statistics'; +import { TaskManagerConfig } from '../config'; +import { AggregatedStatProvider } from './runtime_statistics_aggregator'; + +export { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator'; + +const CONFIG_FIELDS_TO_EXPOSE = [ + 'max_workers', + 'poll_interval', + 'request_capacity', + 'max_poll_inactivity_cycles', + 'monitored_aggregated_stats_refresh_rate', + 'monitored_stats_running_average_window', +] as const; + +type ConfigStat = Pick; + +export interface MonitoringStats { + lastUpdate: string; + stats: { + configuration: { + timestamp: string; + value: ConfigStat; + }; + workload?: { + timestamp: string; + value: WorkloadStat; + }; + runtime?: { + timestamp: string; + value: TaskRunStat; + }; + }; +} + +interface MonitoredStat { + timestamp: string; + value: JsonObject; +} + +export interface RawMonitoringStats { + lastUpdate: string; + stats: Record; +} + +export function createAggregators( + taskManager: TaskManager, + config: TaskManagerConfig, + logger: Logger +): AggregatedStatProvider { + return merge( + createTaskRunAggregator(taskManager, config.monitored_stats_running_average_window, logger), + createWorkloadAggregator(taskManager, config.monitored_aggregated_stats_refresh_rate, logger) + ); +} + +export function createMonitoringStatsStream( + provider$: AggregatedStatProvider, + config: TaskManagerConfig +): Observable { + const initialStats = initializeStats(new Date().toISOString(), config); + return merge( + // emit the initial stats + of(initialStats), + // emit updated stats whenever a provider updates a specific key on the stats + provider$.pipe( + map(({ key, value }) => { + return { + value: { timestamp: new Date().toISOString(), value }, + key, + }; + }), + scan((monitoringStats: MonitoringStats, { key, value }) => { + // incrementally merge stats as they come in + set(monitoringStats.stats, key, value); + monitoringStats.lastUpdate = new Date().toISOString(); + return monitoringStats; + }, initialStats) + ) + ); +} + +export function summarizeMonitoringStats({ + lastUpdate, + stats: { runtime, ...otherStats }, +}: MonitoringStats): RawMonitoringStats { + return { + lastUpdate, + stats: { + ...((otherStats as unknown) as RawMonitoringStats['stats']), + ...(runtime + ? { + runtime: { + ...runtime, + value: summarizeTaskRunStat(runtime.value), + }, + } + : {}), + }, + }; +} + +const initializeStats = ( + initialisationTimestamp: string, + config: TaskManagerConfig +): MonitoringStats => ({ + lastUpdate: initialisationTimestamp, + stats: { + configuration: { + timestamp: initialisationTimestamp, + value: pick(config, ...CONFIG_FIELDS_TO_EXPOSE) as ConfigStat, + }, + }, +}); diff --git a/x-pack/plugins/task_manager/server/monitoring/runtime_statistics_aggregator.ts b/x-pack/plugins/task_manager/server/monitoring/runtime_statistics_aggregator.ts index f895bf2b02e6a7..bd2b3845f2526d 100644 --- a/x-pack/plugins/task_manager/server/monitoring/runtime_statistics_aggregator.ts +++ b/x-pack/plugins/task_manager/server/monitoring/runtime_statistics_aggregator.ts @@ -4,11 +4,13 @@ * you may not use this file except in compliance with the Elastic License. */ import { Observable } from 'rxjs'; -import { JsonObject, JsonValue } from 'src/plugins/kibana_utils/common'; +import { JsonValue } from 'src/plugins/kibana_utils/common'; -export interface AggregatedStat { +export interface AggregatedStat { key: string; - value: JsonObject | JsonValue; + value: Stat; } -export type AggregatedStatProvider = Observable; +export type AggregatedStatProvider = Observable< + AggregatedStat +>; diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts new file mode 100644 index 00000000000000..365b8962146dce --- /dev/null +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts @@ -0,0 +1,193 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import uuid from 'uuid'; +import { Subject } from 'rxjs'; +import stats from 'stats-lite'; +import sinon from 'sinon'; +import { take, tap, bufferCount, startWith, map } from 'rxjs/operators'; + +import { ConcreteTaskInstance, TaskStatus } from '../task'; +import { asTaskRunEvent, asTaskPollingCycleEvent } from '../task_events'; +import { asOk } from '../lib/result_type'; +import { TaskLifecycleEvent } from '../task_manager'; +import { + createTaskRunAggregator, + summarizeTaskRunStat, + TaskRunStat, + SummarizedTaskRunStat, +} from './task_run_statistics'; +import { taskManagerMock } from '../task_manager.mock'; +import { mockLogger } from '../test_utils'; +import { AggregatedStat } from './runtime_statistics_aggregator'; +import { FillPoolResult } from '../lib/fill_pool'; + +describe('Task Run Statistics', () => { + let fakeTimer: sinon.SinonFakeTimers; + + beforeAll(() => { + fakeTimer = sinon.useFakeTimers(); + }); + + afterAll(() => fakeTimer.restore()); + + test('returns a running average of task drift', async () => { + const runAtDrift = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const taskManager = taskManagerMock.create({ + events: new Subject().pipe( + startWith( + ...runAtDrift.map((drift) => mockTaskRunEvent({ runAt: runAtMillisecondsAgo(drift) })) + ) + ), + }); + + const runningAverageWindowSize = 5; + const taskRunAggregator = createTaskRunAggregator( + taskManager, + runningAverageWindowSize, + mockLogger() + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.drift).toMatchObject({ + mean: stats.mean(window), + median: stats.median(window), + mode: stats.mode(window), + }); + } + + return new Promise((resolve) => { + taskRunAggregator + .pipe( + map(({ key, value }: AggregatedStat) => ({ + key, + value: summarizeTaskRunStat(value), + })), + take(runAtDrift.length), + bufferCount(runAtDrift.length) + ) + .subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], runAtDrift.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], runAtDrift.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], runAtDrift.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], runAtDrift.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], runAtDrift.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], runAtDrift.slice(1, 6)); + expectWindowEqualsUpdate(taskStats[6], runAtDrift.slice(2, 7)); + expectWindowEqualsUpdate(taskStats[7], runAtDrift.slice(3, 8)); + resolve(); + }); + }); + }); + + test('returns polling stats', async () => { + const expectedTimestamp: string[] = []; + const taskManager = taskManagerMock.create({ + events: new Subject().pipe( + startWith( + asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed)), + asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed)), + asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed)), + asTaskPollingCycleEvent(asOk(FillPoolResult.PoolFilled)), + asTaskPollingCycleEvent(asOk(FillPoolResult.PoolFilled)), + asTaskPollingCycleEvent(asOk(FillPoolResult.PoolFilled)), + asTaskPollingCycleEvent(asOk(FillPoolResult.RanOutOfCapacity)), + asTaskPollingCycleEvent(asOk(FillPoolResult.RanOutOfCapacity)), + asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed)), + asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed)) + ) + ), + }); + + const runningAverageWindowSize = 5; + const taskRunAggregator = createTaskRunAggregator( + taskManager, + runningAverageWindowSize, + mockLogger() + ); + + return new Promise((resolve) => { + taskRunAggregator + .pipe( + map(({ key, value }: AggregatedStat) => ({ + key, + value: summarizeTaskRunStat(value), + })), + tap(() => { + expectedTimestamp.push(new Date().toISOString()); + // each event is a second after the previous one + fakeTimer.tick(1000); + }), + take(10), + bufferCount(10) + ) + .subscribe((taskStats: Array>) => { + expect(taskStats.map((taskStat) => taskStat.value.polling.lastSuccessfulPoll)).toEqual( + expectedTimestamp + ); + + /** + * At any given time we only keep track of the last X Polling Results + * In the tests this is ocnfiugured to a window size of 5 + */ + expect(taskStats.map((taskStat) => taskStat.value.polling.resultFrequency)).toEqual([ + // NoTasksClaimed + { NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 }, + // NoTasksClaimed, NoTasksClaimed, + { NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 }, + // NoTasksClaimed, NoTasksClaimed, NoTasksClaimed + { NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 }, + // NoTasksClaimed, NoTasksClaimed, NoTasksClaimed, PoolFilled + { NoTasksClaimed: 75, RanOutOfCapacity: 0, PoolFilled: 25 }, + // NoTasksClaimed, NoTasksClaimed, NoTasksClaimed, PoolFilled, PoolFilled + { NoTasksClaimed: 60, RanOutOfCapacity: 0, PoolFilled: 40 }, + // NoTasksClaimed, NoTasksClaimed, PoolFilled, PoolFilled, PoolFilled + { NoTasksClaimed: 40, RanOutOfCapacity: 0, PoolFilled: 60 }, + // NoTasksClaimed, PoolFilled, PoolFilled, PoolFilled, RanOutOfCapacity + { NoTasksClaimed: 20, RanOutOfCapacity: 20, PoolFilled: 60 }, + // PoolFilled, PoolFilled, PoolFilled, RanOutOfCapacity, RanOutOfCapacity + { NoTasksClaimed: 0, RanOutOfCapacity: 40, PoolFilled: 60 }, + // PoolFilled, PoolFilled, RanOutOfCapacity, RanOutOfCapacity, NoTasksClaimed + { NoTasksClaimed: 20, RanOutOfCapacity: 40, PoolFilled: 40 }, + // PoolFilled, RanOutOfCapacity, RanOutOfCapacity, NoTasksClaimed, NoTasksClaimed + { NoTasksClaimed: 40, RanOutOfCapacity: 40, PoolFilled: 20 }, + ]); + resolve(); + }); + }); + }); +}); + +function runAtMillisecondsAgo(ms: number): Date { + return new Date(Date.now() - ms); +} + +const mockTaskRunEvent = (overrides: Partial = {}) => { + const task = mockTaskInstance(overrides); + return asTaskRunEvent(task.id, asOk(task)); +}; + +const mockTaskInstance = (overrides: Partial = {}): ConcreteTaskInstance => ({ + id: uuid.v4(), + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(), + scheduledAt: new Date(), + startedAt: new Date(), + retryAt: new Date(Date.now() + 5 * 60 * 1000), + state: {}, + taskType: 'alerting:test', + params: { + alertId: '1', + }, + ownerId: null, + ...overrides, +}); diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts new file mode 100644 index 00000000000000..ca224fc28199bc --- /dev/null +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { Logger } from 'src/core/server'; +import { of, empty } from 'rxjs'; +import { filter, flatMap } from 'rxjs/operators'; +import { isUndefined, countBy, mapValues } from 'lodash'; +import stats from 'stats-lite'; +import { JsonObject } from 'src/plugins/kibana_utils/common'; +import { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator'; +import { TaskManager, TaskLifecycleEvent } from '../task_manager'; +import { isTaskRunEvent, isTaskPollingCycleEvent } from '../task_events'; +import { isOk } from '../lib/result_type'; +import { ConcreteTaskInstance } from '../task'; +import { FillPoolResult } from '../lib/fill_pool'; + +interface AveragedStat extends JsonObject { + mean: number; + median: number; + mode: number; +} + +interface FillPoolStat extends JsonObject { + lastSuccessfulPoll: string; + resultFrequency: FillPoolResult[]; +} + +export interface TaskRunStat extends JsonObject { + drift: number[]; + polling: FillPoolStat | Omit; +} + +interface FillPoolRawStat extends JsonObject { + lastSuccessfulPoll: string; + resultFrequency: { + [FillPoolResult.NoTasksClaimed]: number; + [FillPoolResult.RanOutOfCapacity]: number; + [FillPoolResult.PoolFilled]: number; + }; +} + +export interface SummarizedTaskRunStat extends JsonObject { + drift: AveragedStat; + polling: FillPoolRawStat | Omit; +} + +export function createTaskRunAggregator( + taskManager: TaskManager, + runningAverageWindowSize: number, + logger: Logger +): AggregatedStatProvider { + const runningStats: { + runtime: { + polling: { + lastSuccessfulPoll: (value?: string) => string | undefined; + resultFrequency: (value?: FillPoolResult) => FillPoolResult[]; + }; + drift: (value?: number) => number[]; + }; + } = { + runtime: { + polling: { + lastSuccessfulPoll: createLastValueStat(), + resultFrequency: createRunningAveragedStat(runningAverageWindowSize), + }, + drift: createRunningAveragedStat(runningAverageWindowSize), + }, + }; + return taskManager.events.pipe( + filter( + (taskEvent: TaskLifecycleEvent) => + (isTaskRunEvent(taskEvent) || isTaskPollingCycleEvent(taskEvent)) && + isOk(taskEvent.event) + ), + flatMap((taskEvent: TaskLifecycleEvent) => { + if (isTaskRunEvent(taskEvent) && isOk(taskEvent.event)) { + const task = taskEvent.event.value; + const now = Date.now(); + return of({ + key: 'runtime', + value: { + polling: { + lastSuccessfulPoll: runningStats.runtime.polling.lastSuccessfulPoll(), + resultFrequency: runningStats.runtime.polling.resultFrequency(), + }, + drift: runningStats.runtime.drift(now - task.runAt.getTime()), + }, + } as AggregatedStat); + } else if (isTaskPollingCycleEvent(taskEvent) && isOk(taskEvent.event)) { + return of({ + key: 'runtime', + value: { + polling: { + lastSuccessfulPoll: runningStats.runtime.polling.lastSuccessfulPoll( + new Date().toISOString() + ), + resultFrequency: runningStats.runtime.polling.resultFrequency(taskEvent.event.value), + }, + drift: runningStats.runtime.drift(), + }, + } as AggregatedStat); + } + return empty(); + }) + ); +} + +export function summarizeTaskRunStat({ + polling: { lastSuccessfulPoll, resultFrequency }, + drift, +}: TaskRunStat): SummarizedTaskRunStat { + return { + polling: { + ...(lastSuccessfulPoll ? { lastSuccessfulPoll } : {}), + resultFrequency: { + [FillPoolResult.NoTasksClaimed]: 0, + [FillPoolResult.RanOutOfCapacity]: 0, + [FillPoolResult.PoolFilled]: 0, + ...calculateFrequency(resultFrequency as FillPoolResult[]), + }, + }, + drift: calculateRunningAverage(drift), + }; +} + +function calculateRunningAverage(values: number[]): AveragedStat { + return { + mean: stats.mean(values), + median: stats.median(values), + mode: stats.mode(values), + }; +} + +function calculateFrequency(values: T[]): JsonObject { + return mapValues(countBy(values), (count) => Math.round((count * 100) / values.length)); +} + +function createLastValueStat() { + let lastValue: T; + return (value?: T) => { + if (isUndefined(value)) { + return lastValue; + } else { + lastValue = value; + return lastValue; + } + }; +} + +function createRunningAveragedStat(runningAverageWindowSize: number) { + const queue = new Array(); + return (value?: T) => { + if (isUndefined(value)) { + return queue; + } else { + if (queue.length === runningAverageWindowSize) { + queue.shift(); + } + queue.push(value); + return [...queue]; + } + }; +} diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts index f85a6571899ec6..0bcf3abfc7607b 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts @@ -4,9 +4,9 @@ * you may not use this file except in compliance with the Elastic License. */ +import { first, take, bufferCount } from 'rxjs/operators'; import { createWorkloadAggregator } from './workload_statistics'; import { taskManagerMock } from '../task_manager.mock'; -import { first, take, bufferCount } from 'rxjs/operators'; import { AggregationResult } from '../queries/aggregation_clauses'; import { mockLogger } from '../test_utils'; diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts index 6cb6be9797807e..669e6af16ea0e1 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { interval } from 'rxjs'; +import { timer } from 'rxjs'; import { concatMap, map, catchError } from 'rxjs/operators'; import { Logger } from 'src/core/server'; import { JsonObject } from 'src/plugins/kibana_utils/common'; @@ -18,12 +18,28 @@ import { } from '../queries/aggregation_clauses'; import { parseIntervalAsSecond } from '../lib/intervals'; +interface StatusStat extends JsonObject { + [status: string]: number; +} +interface TaskTypeStat extends JsonObject { + [taskType: string]: { + sum: number; + status: StatusStat; + }; +} + +export interface WorkloadStat extends JsonObject { + sum: number; + taskTypes: TaskTypeStat; + schedule: Array<[string, number]>; +} + export function createWorkloadAggregator( taskManager: TaskManager, refreshInterval: number, logger: Logger -): AggregatedStatProvider { - return interval(refreshInterval).pipe( +): AggregatedStatProvider { + return timer(0, refreshInterval).pipe( concatMap(() => taskManager.aggregate({ aggs: { @@ -47,7 +63,7 @@ export function createWorkloadAggregator( taskType: { buckets: taskTypes = [] } = {}, schedule: { buckets: schedules = [] } = {}, } = task; - const summary: JsonObject = { + const summary: WorkloadStat = { sum, taskTypes: mapValues( keyBy>( diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 3a4577db01b494..f53418aec05add 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -12,7 +12,7 @@ import { TaskManagerConfig } from './config'; import { Middleware } from './lib/middleware'; import { setupSavedObjects } from './saved_objects'; import { healthRoute } from './routes'; -import { createAggregatedStatsStream } from './monitoring'; +import { createMonitoringStats } from './monitoring'; export type TaskManagerSetupContract = Pick< TaskManager, @@ -53,8 +53,8 @@ export class TaskManagerPlugin const router = core.http.createRouter(); healthRoute( router, - config, - this.taskManager.then((tm) => createAggregatedStatsStream(tm, config, logger)), + this.taskManager.then((tm) => createMonitoringStats(tm, config, logger)), + logger, // if health is any more stale than the pollInterval (+1s buffer) consider the system unhealthy config.poll_interval + 1000 ); diff --git a/x-pack/plugins/task_manager/server/routes/health.test.ts b/x-pack/plugins/task_manager/server/routes/health.test.ts index 4fc7b9d6b352cd..1ea33794a27946 100644 --- a/x-pack/plugins/task_manager/server/routes/health.test.ts +++ b/x-pack/plugins/task_manager/server/routes/health.test.ts @@ -7,64 +7,39 @@ import { healthRoute } from './health'; import { httpServiceMock } from 'src/core/server/mocks'; import { mockHandlerArguments } from './_mock_handler_arguments'; -import { TaskManagerConfig } from '../config'; -import { of, Subject } from 'rxjs'; -import { get } from 'lodash'; -import { sleep } from '../test_utils'; -import { AggregatedStat } from '../monitoring'; - -beforeEach(() => { - jest.resetAllMocks(); -}); - -const configuration: TaskManagerConfig = { - enabled: true, - max_workers: 10, - index: 'foo', - max_attempts: 9, - poll_interval: 6000000, - max_poll_inactivity_cycles: 10, - request_capacity: 1000, - monitored_aggregated_stats_refresh_rate: 5000, -}; +import { of } from 'rxjs'; +import { sleep, mockLogger } from '../test_utils'; describe('healthRoute', () => { + beforeEach(() => { + jest.resetAllMocks(); + }); + it('registers the route', async () => { const router = httpServiceMock.createRouter(); - healthRoute(router, configuration, Promise.resolve(of()), 1000); + healthRoute(router, Promise.resolve(of()), mockLogger(), 1000); const [config] = router.get.mock.calls[0]; expect(config.path).toMatchInlineSnapshot(`"/api/task_manager/_health"`); }); - it('returns the initial config used to configure Task Manager', async () => { + it('logs the Task Manager stats at a fixed interval', async () => { const router = httpServiceMock.createRouter(); + const logger = mockLogger(); - healthRoute(router, configuration, Promise.resolve(of()), 1000); + healthRoute(router, Promise.resolve(of()), logger, 1000); - const [, handler] = router.get.mock.calls[0]; + await sleep(1000); - const [context, req, res] = mockHandlerArguments({}, {}, ['ok', 'internalError']); - - expect(get(await handler(context, req, res), 'body.stats')).toMatchObject({ - configuration: { - value: { - max_workers: 10, - poll_interval: 6000000, - max_poll_inactivity_cycles: 10, - request_capacity: 1000, - monitored_aggregated_stats_refresh_rate: 5000, - }, - }, - }); + expect(logger.debug).toHaveBeenCalledWith(''); }); it('returns an error response if the stats are no longer fresh', async () => { const router = httpServiceMock.createRouter(); - healthRoute(router, configuration, Promise.resolve(of()), 1000); + healthRoute(router, Promise.resolve(of()), mockLogger(), 1000); const [, handler] = router.get.mock.calls[0]; @@ -85,6 +60,7 @@ describe('healthRoute', () => { poll_interval: 6000000, request_capacity: 1000, monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_running_average_window: 50, }, }, }, @@ -93,96 +69,4 @@ describe('healthRoute', () => { }, }); }); - - it('incrementally updates the stats returned by the endpoint', async () => { - const router = httpServiceMock.createRouter(); - - const aggregatedStats = Promise.resolve(new Subject()); - - healthRoute(router, configuration, Promise.resolve(aggregatedStats), 1000); - - const [, handler] = router.get.mock.calls[0]; - - const [context, req, res] = mockHandlerArguments({}, {}, ['ok', 'internalError']); - - return aggregatedStats.then(async (aggregatedStats$) => { - aggregatedStats$.next({ - key: 'newAggregatedStat', - value: { - some: { - complex: { - value: 123, - }, - }, - }, - }); - - expect(await handler(context, req, res)).toMatchObject({ - body: { - lastUpdate: expect.any(String), - stats: { - newAggregatedStat: { - timestamp: expect.any(String), - value: { - some: { - complex: { - value: 123, - }, - }, - }, - }, - configuration: { - timestamp: expect.any(String), - value: { - max_workers: 10, - poll_interval: 6000000, - max_poll_inactivity_cycles: 10, - request_capacity: 1000, - monitored_aggregated_stats_refresh_rate: 5000, - }, - }, - }, - }, - }); - - aggregatedStats$.next({ - key: 'newAggregatedStat', - value: { - some: { - updated: { - value: 456, - }, - }, - }, - }); - - expect(await handler(context, req, res)).toMatchObject({ - body: { - lastUpdate: expect.any(String), - stats: { - newAggregatedStat: { - timestamp: expect.any(String), - value: { - some: { - updated: { - value: 456, - }, - }, - }, - }, - configuration: { - timestamp: expect.any(String), - value: { - max_workers: 10, - poll_interval: 6000000, - max_poll_inactivity_cycles: 10, - request_capacity: 1000, - monitored_aggregated_stats_refresh_rate: 5000, - }, - }, - }, - }, - }); - }); - }); }); diff --git a/x-pack/plugins/task_manager/server/routes/health.ts b/x-pack/plugins/task_manager/server/routes/health.ts index cf73c931439184..e99c1298363a89 100644 --- a/x-pack/plugins/task_manager/server/routes/health.ts +++ b/x-pack/plugins/task_manager/server/routes/health.ts @@ -11,68 +11,23 @@ import { IKibanaResponse, KibanaResponseFactory, } from 'kibana/server'; -import { pick } from 'lodash'; -import { set } from '@elastic/safer-lodash-set'; -import { JsonObject } from 'src/plugins/kibana_utils/common'; -import { map } from 'rxjs/operators'; -import { TaskManagerConfig } from '../config'; -import { AggregatedStatProvider } from '../monitoring'; - -const CONFIG_FIELDS_TO_EXPOSE = [ - 'max_workers', - 'poll_interval', - 'request_capacity', - 'max_poll_inactivity_cycles', - 'monitored_aggregated_stats_refresh_rate', -]; - -interface MonitoredStat { - timestamp: string; - value: JsonObject; -} - -interface MonitoringStats { - lastUpdate: string; - stats: Record; -} +import { Logger } from 'src/core/server'; +import { Observable } from 'rxjs'; +import { take } from 'rxjs/operators'; +import { debounceTime } from 'rxjs/operators'; +import { MonitoringStats, RawMonitoringStats, summarizeMonitoringStats } from '../monitoring'; export function healthRoute( router: IRouter, - initialConfig: TaskManagerConfig, - aggregatedStats: Promise, + monitoringStats: Promise>, + logger: Logger, requiredFreshness: number ) { - const initialisationTimestamp = new Date().toISOString(); - const monitoringStats: MonitoringStats = { - lastUpdate: initialisationTimestamp, - stats: { - configuration: { - timestamp: initialisationTimestamp, - value: pick<{ - max_workers: number; - poll_interval: number; - request_capacity: number; - max_poll_inactivity_cycles: number; - monitored_aggregated_stats_refresh_rate: number; - }>(initialConfig, ...CONFIG_FIELDS_TO_EXPOSE) as JsonObject, - }, - }, - }; - - aggregatedStats.then((aggregatedStats$) => { - aggregatedStats$ - .pipe( - map(({ key, value }) => { - return { - value: { timestamp: new Date().toISOString(), value }, - key, - }; - }) - ) - .subscribe(({ key, value }) => { - set(monitoringStats.stats, key, value); - monitoringStats.lastUpdate = new Date().toISOString(); - }); + /* Log Task Manager stats as a Debug log line at a fixed interval */ + monitoringStats.then((monitoringStats$) => { + monitoringStats$ + .pipe(debounceTime(requiredFreshness)) + .subscribe((stats) => logger.debug(JSON.stringify(summarizeMonitoringStats(stats)))); }); router.get( @@ -85,24 +40,32 @@ export function healthRoute( req: KibanaRequest, res: KibanaResponseFactory ): Promise { - const lastUpdate = Date.parse(monitoringStats.lastUpdate); + const { lastUpdate, stats } = await getLatestStats(await monitoringStats); + const now = Date.now(); + const timestamp = new Date(now).toISOString(); /** * If the monitored stats aren't fresh, return an `500 internalError` with * the stats in the body of the api call. This makes it easier for monitoring * services to mark the service as broken */ - if (Date.now() - lastUpdate > requiredFreshness) { - return res.internalError({ - body: { - message: new Error('Task Manager monitored stats are out of date'), - attributes: monitoringStats, - }, - }); - } + // if (now - Date.parse(lastUpdate) > requiredFreshness) { + // return res.internalError({ + // body: { + // message: new Error('Task Manager monitored stats are out of date'), + // attributes: { lastUpdate, timestamp, stats }, + // }, + // }); + // } return res.ok({ - body: monitoringStats, + body: { lastUpdate, timestamp, stats }, }); } ); } + +async function getLatestStats(monitoringStats$: Observable) { + return new Promise((resolve) => + monitoringStats$.pipe(take(1)).subscribe((stats) => resolve(summarizeMonitoringStats(stats))) + ); +} diff --git a/x-pack/plugins/task_manager/server/task_events.ts b/x-pack/plugins/task_manager/server/task_events.ts index e1dd85f868cdd2..6dd0c1546733fc 100644 --- a/x-pack/plugins/task_manager/server/task_events.ts +++ b/x-pack/plugins/task_manager/server/task_events.ts @@ -9,16 +9,19 @@ import { Option } from 'fp-ts/lib/Option'; import { ConcreteTaskInstance } from './task'; import { Result, Err } from './lib/result_type'; +import { FillPoolResult } from './lib/fill_pool'; +import { PollingError } from './polling'; export enum TaskEventType { TASK_CLAIM = 'TASK_CLAIM', TASK_MARK_RUNNING = 'TASK_MARK_RUNNING', TASK_RUN = 'TASK_RUN', TASK_RUN_REQUEST = 'TASK_RUN_REQUEST', + TASK_POLLING_CYCLE = 'TASK_POLLING_CYCLE', } export interface TaskEvent { - id: string; + id?: string; type: TaskEventType; event: Result; } @@ -26,6 +29,7 @@ export type TaskMarkRunning = TaskEvent; export type TaskRun = TaskEvent; export type TaskClaim = TaskEvent>; export type TaskRunRequest = TaskEvent; +export type TaskPollingCycle = TaskEvent>; export function asTaskMarkRunningEvent( id: string, @@ -69,6 +73,15 @@ export function asTaskRunRequestEvent( }; } +export function asTaskPollingCycleEvent( + event: Result> +): TaskPollingCycle { + return { + type: TaskEventType.TASK_POLLING_CYCLE, + event, + }; +} + export function isTaskMarkRunningEvent( taskEvent: TaskEvent ): taskEvent is TaskMarkRunning { @@ -85,3 +98,8 @@ export function isTaskRunRequestEvent( ): taskEvent is TaskRunRequest { return taskEvent.type === TaskEventType.TASK_RUN_REQUEST; } +export function isTaskPollingCycleEvent( + taskEvent: TaskEvent +): taskEvent is TaskPollingCycle { + return taskEvent.type === TaskEventType.TASK_POLLING_CYCLE; +} diff --git a/x-pack/plugins/task_manager/server/task_manager.mock.ts b/x-pack/plugins/task_manager/server/task_manager.mock.ts index e5325274024d8f..edd56b63e4800f 100644 --- a/x-pack/plugins/task_manager/server/task_manager.mock.ts +++ b/x-pack/plugins/task_manager/server/task_manager.mock.ts @@ -4,9 +4,16 @@ * you may not use this file except in compliance with the Elastic License. */ -import { TaskManager } from './task_manager'; +import { TaskManager, TaskLifecycleEvent } from './task_manager'; +import { of, Observable } from 'rxjs'; -const createTaskManagerMock = (isStarted: boolean = true) => { +const createTaskManagerMock = ({ + isStarted = true, + events = of(), +}: { + isStarted?: boolean; + events?: Observable; +} = {}) => { return ({ registerTaskDefinitions: jest.fn(), addMiddleware: jest.fn(), @@ -21,6 +28,9 @@ const createTaskManagerMock = (isStarted: boolean = true) => { get isStarted() { return isStarted; }, + get events() { + return events; + }, stop: jest.fn(), } as unknown) as jest.Mocked; }; diff --git a/x-pack/plugins/task_manager/server/task_manager.test.ts b/x-pack/plugins/task_manager/server/task_manager.test.ts index 017540a2dcc55c..decd7291bc0c8d 100644 --- a/x-pack/plugins/task_manager/server/task_manager.test.ts +++ b/x-pack/plugins/task_manager/server/task_manager.test.ts @@ -42,6 +42,7 @@ describe('TaskManager', () => { poll_interval: 6000000, max_poll_inactivity_cycles: 10, monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_running_average_window: 50, request_capacity: 1000, }; const taskManagerOpts = { diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 44e409a2aec370..7fcf496e0d119e 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -5,7 +5,7 @@ */ import { Logger } from 'src/core/server'; import { Subject, Observable, Subscription } from 'rxjs'; -import { filter } from 'rxjs/operators'; +import { filter, tap } from 'rxjs/operators'; import { performance } from 'perf_hooks'; @@ -25,10 +25,12 @@ import { TaskRun, TaskClaim, TaskRunRequest, + TaskPollingCycle, isTaskRunEvent, isTaskClaimEvent, isTaskRunRequestEvent, asTaskRunRequestEvent, + asTaskPollingCycleEvent, } from './task_events'; import { fillPool, FillPoolResult } from './lib/fill_pool'; import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware'; @@ -52,7 +54,7 @@ import { PollingErrorType, createObservableMonitor, } from './polling'; -import { TaskPool } from './task_pool'; +import { TaskPool, TaskPoolRunResult } from './task_pool'; import { TaskManagerRunner, TaskRunner } from './task_runner'; import { FetchResult, @@ -82,7 +84,12 @@ interface RunNowResult { id: string; } -export type TaskLifecycleEvent = TaskMarkRunning | TaskRun | TaskClaim | TaskRunRequest; +export type TaskLifecycleEvent = + | TaskMarkRunning + | TaskRun + | TaskClaim + | TaskRunRequest + | TaskPollingCycle; /* * The TaskManager is the public interface into the task manager system. This glues together @@ -195,6 +202,10 @@ export class TaskManager { ); } + public get events(): Observable { + return this.events$; + } + private emitEvent = (event: TaskLifecycleEvent) => { this.events$.next(event); }; @@ -245,17 +256,23 @@ export class TaskManager { this.startQueue.forEach((fn) => fn()); this.startQueue = []; - this.pollingSubscription = this.poller$.subscribe( - mapErr((error: PollingError) => { - if (error.type === PollingErrorType.RequestCapacityReached) { - pipe( - error.data, - mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error)))) - ); - } - this.logger.error(error.message); - }) - ); + this.pollingSubscription = this.poller$ + .pipe( + tap( + mapErr((error: PollingError) => { + if (error.type === PollingErrorType.RequestCapacityReached) { + pipe( + error.data, + mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error)))) + ); + } + this.logger.error(error.message); + }) + ) + ) + .subscribe((event: Result>) => { + this.emitEvent(asTaskPollingCycleEvent(event)); + }); } } @@ -522,13 +539,13 @@ export async function awaitTaskRunResult( ); }, taskEvent.event); } else { - either>( + either>( taskEvent.event, - (taskInstance: ConcreteTaskInstance) => { + (taskInstance: ConcreteTaskInstance | FillPoolResult) => { // resolve if the task has run sucessfully if (isTaskRunEvent(taskEvent)) { subscription.unsubscribe(); - resolve({ id: taskInstance.id }); + resolve({ id: (taskInstance as ConcreteTaskInstance).id }); } }, async (error: Error | Option) => { diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts index c3c15c7ba4810b..3c792966e86818 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts @@ -22,6 +22,16 @@ interface MonitoringStats { timestamp: string; value: Record; }; + runtime: { + timestamp: string; + value: { + drift: Record; + polling: { + lastSuccessfulPoll: string; + resultFrequency: Record; + }; + }; + }; }; } @@ -59,18 +69,19 @@ export default function ({ getService }: FtrProviderContext) { poll_interval: 3000, max_poll_inactivity_cycles: 10, monitored_aggregated_stats_refresh_rate: monitoredAggregatedStatsRefreshRate, + monitored_stats_running_average_window: 50, request_capacity: 1000, max_workers: 10, }); }); it('should return the task manager workload', async () => { - const workload = (await getHealth()).stats.workload; + const { workload } = (await getHealth()).stats; const sumSampleTaskInWorkload = (workload.value.taskTypes as { sampleTask?: { sum: number }; }).sampleTask?.sum ?? 0; - const schedulesWorkload = (mapValues( + const scheduledWorkload = (mapValues( keyBy(workload.value.schedule as Array<[string, number]>, ([interval, count]) => interval), ([, count]) => count ) as unknown) as { '37m': number | undefined; '37s': number | undefined }; @@ -105,9 +116,25 @@ export default function ({ getService }: FtrProviderContext) { '37m': number; '37s': number; }; - expect(schedulesWorkloadAfterScheduling['37s']).to.eql(schedulesWorkload['37s'] ?? 0 + 1); - expect(schedulesWorkloadAfterScheduling['37m']).to.eql(schedulesWorkload['37m'] ?? 0 + 1); + expect(schedulesWorkloadAfterScheduling['37s']).to.eql(1 + (scheduledWorkload['37s'] ?? 0)); + expect(schedulesWorkloadAfterScheduling['37m']).to.eql(1 + (scheduledWorkload['37m'] ?? 0)); }); }); + + it('should return the task manager runtime stats', async () => { + const { + runtime: { + value: { drift, polling }, + }, + } = (await getHealth()).stats; + + expect(isNaN(Date.parse(polling.lastSuccessfulPoll as string))).to.eql(false); + expect(typeof polling.resultFrequency.NoTasksClaimed).to.eql('number'); + expect(typeof polling.resultFrequency.RanOutOfCapacity).to.eql('number'); + expect(typeof polling.resultFrequency.PoolFilled).to.eql('number'); + + expect(typeof drift.mean).to.eql('number'); + expect(typeof drift.median).to.eql('number'); + }); }); }