Skip to content

Commit

Permalink
added task runtime stats to health endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Sep 28, 2020
1 parent e1ee967 commit abec231
Show file tree
Hide file tree
Showing 18 changed files with 837 additions and 243 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ describe('config validation', () => {
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_running_average_window": 50,
"poll_interval": 3000,
"request_capacity": 1000,
}
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { schema, TypeOf } from '@kbn/config-schema';
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;

/** Default size of the running average window used for monitored stats. */
export const DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW = 50;
/**
 * @deprecated Misspelled name, kept only so existing imports keep working.
 * Use DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW instead.
 */
export const DEFAULT_MONITORING_STATS_RUNNING_AVERGAE_WINDOW = DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW;

// Refresh "pull based" monitored stats at a default rate of once a minute
export const DEFAULT_MONITORING_REFRESH_RATE = 60 * 1000;
Expand Down Expand Up @@ -57,6 +58,12 @@ export const configSchema = schema.object({
/* don't run monitored stat aggregations any faster than once every 5 seconds */
min: 5000,
}),
/* The size of the running average window for monitored stats. */
monitored_stats_running_average_window: schema.number({
defaultValue: DEFAULT_MONITORING_STATS_RUNNING_AVERGAE_WINDOW,
max: 100,
min: 10,
}),
});

/** Static type of the Task Manager configuration, derived from `configSchema`. */
export type TaskManagerConfig = TypeOf<typeof configSchema>;
27 changes: 17 additions & 10 deletions x-pack/plugins/task_manager/server/monitoring/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,29 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { Logger } from 'src/core/server';
import { Observable } from 'rxjs';
import { TaskManager } from '../task_manager';
import { AggregatedStatProvider } from './runtime_statistics_aggregator';
import { createWorkloadAggregator } from './workload_statistics';
import { TaskManagerConfig } from '../config';
import {
MonitoringStats,
createAggregators,
createMonitoringStatsStream,
} from './monitoring_stats_stream';

export { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator';
export {
MonitoringStats,
RawMonitoringStats,
summarizeMonitoringStats,
createAggregators,
createMonitoringStatsStream,
} from './monitoring_stats_stream';

export function createAggregatedStatsStream(
export function createMonitoringStats(
taskManager: TaskManager,
config: TaskManagerConfig,
logger: Logger
): AggregatedStatProvider {
return createWorkloadAggregator(
taskManager,
config.monitored_aggregated_stats_refresh_rate,
logger
);
): Observable<MonitoringStats> {
return createMonitoringStatsStream(createAggregators(taskManager, config, logger), config);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TaskManagerConfig } from '../config';
import { of, Subject } from 'rxjs';
import { take, bufferCount } from 'rxjs/operators';
import { createMonitoringStatsStream, AggregatedStat } from './monitoring_stats_stream';
import { JsonValue } from 'src/plugins/kibana_utils/common';

// Reset all jest mocks before every test case so no mock state carries over between tests.
beforeEach(() => {
jest.resetAllMocks();
});

// Tests for createMonitoringStatsStream: the initial emission built from the config,
// and the incremental merge of stats pushed by an aggregated-stat provider.
describe('createMonitoringStatsStream', () => {
// Config fixture shared by both tests; the assertions below expect a subset of
// these fields to be echoed back under `stats.configuration.value`.
const configuration: TaskManagerConfig = {
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
};

it('returns the initial config used to configure Task Manager', async () => {
return new Promise((resolve) => {
// `of()` is an empty provider, so only the stream's initial value is observed here.
createMonitoringStatsStream(of(), configuration)
.pipe(take(1))
.subscribe((firstValue) => {
expect(firstValue.stats).toMatchObject({
configuration: {
value: {
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
},
},
});
resolve();
});
});
});

it('incrementally updates the stats returned by the endpoint', async () => {
// Subject used to push aggregated stats into the stream by hand.
const aggregatedStats$ = new Subject<AggregatedStat>();

return new Promise((resolve) => {
createMonitoringStatsStream(aggregatedStats$, configuration)
// collect the first three emissions (initial value + two updates) into one array
.pipe(take(3), bufferCount(3))
.subscribe(([initialValue, secondValue, thirdValue]) => {
// NOTE(review): this asserts against `initialValue.stats`, but the expected object
// has the shape of the whole emission (`lastUpdate` + `stats`), unlike the first
// test and the two assertions below — looks like it should target `initialValue`.
expect(initialValue.stats).toMatchObject({
lastUpdate: expect.any(String),
stats: {
configuration: {
value: {
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
},
},
},
});

// NOTE(review): if the stream emits the same object instance for every update
// (it accumulates with a mutating `set`), `secondValue` and `thirdValue` are the
// same reference here and cannot both match — TODO confirm.
expect(secondValue).toMatchObject({
lastUpdate: expect.any(String),
stats: {
newAggregatedStat: {
timestamp: expect.any(String),
value: {
some: {
complex: {
value: 123,
},
},
},
},
configuration: {
timestamp: expect.any(String),
value: {
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
},
},
},
});

expect(thirdValue).toMatchObject({
lastUpdate: expect.any(String),
stats: {
newAggregatedStat: {
timestamp: expect.any(String),
value: {
some: {
updated: {
value: 456,
},
},
},
},
configuration: {
timestamp: expect.any(String),
value: {
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
},
},
},
});
});

// first update for the `newAggregatedStat` key
aggregatedStats$.next({
key: 'newAggregatedStat',
value: {
some: {
complex: {
value: 123,
},
},
} as JsonValue,
});

// second update replaces the value stored under the same key
aggregatedStats$.next({
key: 'newAggregatedStat',
value: {
some: {
updated: {
value: 456,
},
},
} as JsonValue,
});

// NOTE(review): resolve() is called unconditionally here rather than from inside the
// subscriber, so the promise settles whether or not the assertions above ran or
// passed — presumably a failing expect would not fail this test; TODO confirm.
resolve();
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { merge, of, Observable } from 'rxjs';
import { map, scan } from 'rxjs/operators';
import { set } from '@elastic/safer-lodash-set';
import { pick } from 'lodash';
import { Logger } from 'src/core/server';
import { JsonObject } from 'src/plugins/kibana_utils/common';
import { TaskManager } from '../task_manager';
import { createWorkloadAggregator, WorkloadStat } from './workload_statistics';
import { createTaskRunAggregator, summarizeTaskRunStat, TaskRunStat } from './task_run_statistics';
import { TaskManagerConfig } from '../config';
import { AggregatedStatProvider } from './runtime_statistics_aggregator';

export { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator';

// Names of the Task Manager config fields that are picked out of the config
// and exposed as the `configuration` stat.
const CONFIG_FIELDS_TO_EXPOSE = [
'max_workers',
'poll_interval',
'request_capacity',
'max_poll_inactivity_cycles',
'monitored_aggregated_stats_refresh_rate',
'monitored_stats_running_average_window',
] as const;

// The subset of TaskManagerConfig restricted to the fields listed in CONFIG_FIELDS_TO_EXPOSE.
type ConfigStat = Pick<TaskManagerConfig, typeof CONFIG_FIELDS_TO_EXPOSE[number]>;

/**
 * The value emitted by the monitoring stats stream: the time of the most recent
 * update plus the latest value of each stat, every one carrying its own timestamp.
 */
export interface MonitoringStats {
// ISO-8601 string, refreshed each time a stat is merged in
lastUpdate: string;
stats: {
// always present: seeded from the config when the stream is created
configuration: {
timestamp: string;
value: ConfigStat;
};
// optional: absent until a stat is merged in under this key
// (presumably by the workload aggregator — the emitted key is not visible here)
workload?: {
timestamp: string;
value: WorkloadStat;
};
// optional: absent until a stat is merged in under this key
// (presumably by the task run aggregator — the emitted key is not visible here)
runtime?: {
timestamp: string;
value: TaskRunStat;
};
};
}

// A single stat entry in its JSON form: the value and the timestamp recorded with it.
interface MonitoredStat {
timestamp: string;
value: JsonObject;
}

/**
 * The shape returned by `summarizeMonitoringStats`: the same `lastUpdate` as the
 * source `MonitoringStats`, with stats held as JSON values keyed by stat name.
 */
export interface RawMonitoringStats {
lastUpdate: string;
stats: Record<string, MonitoredStat>;
}

/**
 * Creates the task-run and workload aggregators and merges their output into a
 * single provider of aggregated stats.
 *
 * @param taskManager the Task Manager instance handed to both aggregators
 * @param config supplies the running average window (task-run aggregator) and
 *   the refresh rate (workload aggregator)
 * @param logger logger handed to both aggregators
 * @returns one stream carrying the stats emitted by either aggregator
 */
export function createAggregators(
  taskManager: TaskManager,
  config: TaskManagerConfig,
  logger: Logger
): AggregatedStatProvider {
  const {
    monitored_stats_running_average_window: runningAverageWindow,
    monitored_aggregated_stats_refresh_rate: refreshRate,
  } = config;

  const taskRunStats$ = createTaskRunAggregator(taskManager, runningAverageWindow, logger);
  const workloadStats$ = createWorkloadAggregator(taskManager, refreshRate, logger);

  return merge(taskRunStats$, workloadStats$);
}

/**
 * Turns a provider of individual aggregated stats into a stream of the full,
 * accumulated monitoring stats.
 *
 * The stream first emits the initial stats (just the exposed configuration), and
 * then emits an updated `MonitoringStats` every time the provider emits a stat.
 * Each update stores the stat under its `key`, stamped with the time it was
 * received, and refreshes `lastUpdate`.
 *
 * Every emission is a new object: earlier emissions (including the initial one)
 * are never modified afterwards, so consumers may safely retain, buffer or
 * compare them, and separate subscriptions do not share accumulated state.
 *
 * @param provider$ stream of `{ key, value }` stats to merge into the result
 * @param config Task Manager config; its exposed fields seed `stats.configuration`
 * @returns stream of the accumulated monitoring stats
 */
export function createMonitoringStatsStream(
  provider$: AggregatedStatProvider,
  config: TaskManagerConfig
): Observable<MonitoringStats> {
  const initialStats = initializeStats(new Date().toISOString(), config);
  return merge(
    // emit the initial stats
    of(initialStats),
    // emit updated stats whenever a provider updates a specific key on the stats
    provider$.pipe(
      // stamp each incoming stat with the time it was received
      map(({ key, value }) => ({
        key,
        value: { timestamp: new Date().toISOString(), value },
      })),
      scan(
        (monitoringStats: MonitoringStats, { key, value }): MonitoringStats => ({
          lastUpdate: new Date().toISOString(),
          // incrementally merge stats as they come in; `set` is applied to a copy of
          // the previous stats so the seed and earlier emissions are left untouched
          stats: set({ ...monitoringStats.stats }, key, value),
        }),
        initialStats
      )
    )
  );
}

/**
 * Converts accumulated monitoring stats into their raw, JSON-shaped form.
 *
 * All stats other than `runtime` are passed through unchanged. When a `runtime`
 * stat is present, its value is replaced by the result of `summarizeTaskRunStat`
 * while its other fields (the timestamp) are kept; when absent, no `runtime`
 * entry is added.
 *
 * @returns the same `lastUpdate` alongside the summarized stats
 */
export function summarizeMonitoringStats({
  lastUpdate,
  stats: { runtime, ...otherStats },
}: MonitoringStats): RawMonitoringStats {
  const summarizedStats: RawMonitoringStats['stats'] = {
    ...((otherStats as unknown) as RawMonitoringStats['stats']),
  };

  if (runtime) {
    summarizedStats.runtime = {
      ...runtime,
      value: summarizeTaskRunStat(runtime.value),
    };
  }

  return { lastUpdate, stats: summarizedStats };
}

/**
 * Builds the seed `MonitoringStats`: only the `configuration` stat is populated,
 * holding the config fields listed in CONFIG_FIELDS_TO_EXPOSE. The given
 * timestamp is used both as `lastUpdate` and as the configuration stat's timestamp.
 */
const initializeStats = (
  initialisationTimestamp: string,
  config: TaskManagerConfig
): MonitoringStats => {
  const exposedConfig = pick(config, ...CONFIG_FIELDS_TO_EXPOSE) as ConfigStat;

  return {
    lastUpdate: initialisationTimestamp,
    stats: {
      configuration: {
        timestamp: initialisationTimestamp,
        value: exposedConfig,
      },
    },
  };
};
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Observable } from 'rxjs';
import { JsonObject, JsonValue } from 'src/plugins/kibana_utils/common';
import { JsonValue } from 'src/plugins/kibana_utils/common';

export interface AggregatedStat {
export interface AggregatedStat<Stat = JsonValue> {
key: string;
value: JsonObject | JsonValue;
value: Stat;
}

export type AggregatedStatProvider = Observable<AggregatedStat>;
export type AggregatedStatProvider<Stat extends JsonValue = JsonValue> = Observable<
AggregatedStat<Stat>
>;
Loading

0 comments on commit abec231

Please sign in to comment.