Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager Perf Test] Sep 23 #193700

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ export const DEFAULT_TIMEOUT = '300s';
/** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */
export const INDEX_AUTO_EXPAND_REPLICAS = '0-1';
/** ES rule of thumb: shards should be several GB to 10's of GB, so Kibana is unlikely to cross that limit */
export const INDEX_NUMBER_OF_SHARDS = 1;
export const INDEX_NUMBER_OF_SHARDS = 3;
/** Wait for all shards to be active before starting an operation */
export const WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE = 'all';
2 changes: 2 additions & 0 deletions packages/kbn-check-mappings-update-cli/current_fields.json
Original file line number Diff line number Diff line change
Expand Up @@ -1078,9 +1078,11 @@
],
"task": [
"attempts",
"claimAt",
"enabled",
"ownerId",
"partition",
"priority",
"retryAt",
"runAt",
"schedule",
Expand Down
6 changes: 6 additions & 0 deletions packages/kbn-check-mappings-update-cli/current_mappings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3556,6 +3556,9 @@
"attempts": {
"type": "integer"
},
"claimAt": {
"type": "date"
},
"enabled": {
"type": "boolean"
},
Expand All @@ -3565,6 +3568,9 @@
"partition": {
"type": "integer"
},
"priority": {
"type": "integer"
},
"retryAt": {
"type": "date"
},
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/alerting/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const rulesSchema = schema.object({
}),
enforce: schema.boolean({ defaultValue: false }), // if enforce is false, only warnings will be shown
}),
maxScheduledPerMinute: schema.number({ defaultValue: 32000, max: 32000, min: 0 }),
maxScheduledPerMinute: schema.number({ defaultValue: 1000000, max: 1000000, min: 0 }),
overwriteProducer: schema.maybe(
schema.oneOf([
schema.literal('observability'),
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/event_log/server/es/documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export function getIndexTemplate(esNames: EsNames) {
template: {
settings: {
hidden: true,
number_of_shards: 1,
number_of_shards: 3,
auto_expand_replicas: '0-1',
},
lifecycle: {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ export const configSchema = schema.object(
max: 100,
min: 1,
}),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_UPDATE_BY_QUERY }),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_MGET }),
request_timeouts: requestTimeoutsConfig,
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,7 @@
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { TaskStatus, TaskPriority, ConcreteTaskInstance } from '../task';
import {
ScriptBasedSortClause,
ScriptClause,
mustBeAllOf,
MustCondition,
MustNotCondition,
} from './query_clauses';
import { ScriptClause, mustBeAllOf, MustCondition, MustNotCondition } from './query_clauses';

export function tasksOfType(taskTypes: string[]): estypes.QueryDslQueryContainer {
return {
Expand Down Expand Up @@ -96,56 +90,43 @@ export const RunningOrClaimingTaskWithExpiredRetryAt: MustCondition = {
},
};

const SortByRunAtAndRetryAtScript: ScriptBasedSortClause = {
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'painless',
source: `
if (doc['task.retryAt'].size()!=0) {
return doc['task.retryAt'].value.toInstant().toEpochMilli();
}
if (doc['task.runAt'].size()!=0) {
return doc['task.runAt'].value.toInstant().toEpochMilli();
}
`,
},
},
const SortByRunAtAndRetryAtScript: estypes.Sort = {
'task.claimAt': { order: 'asc', missing: '_first' },
};
export const SortByRunAtAndRetryAt = SortByRunAtAndRetryAtScript as estypes.SortCombinations;

function getSortByPriority(definitions: TaskTypeDictionary): estypes.SortCombinations | undefined {
if (definitions.size() === 0) return;

return {
_script: {
type: 'number',
order: 'desc',
script: {
lang: 'painless',
// Use priority if explicitly specified in task definition, otherwise default to 50 (Normal)
// TODO: we could do this locally as well, but they may starve
source: `
String taskType = doc['task.taskType'].value;
if (params.priority_map.containsKey(taskType)) {
return params.priority_map[taskType];
} else {
return ${TaskPriority.Normal};
}
`,
params: {
priority_map: definitions
.getAllDefinitions()
.reduce<Record<string, TaskPriority>>((acc, taskDefinition) => {
if (taskDefinition.priority) {
acc[taskDefinition.type] = taskDefinition.priority;
}
return acc;
}, {}),
},
},
},
'task.priority': { order: 'desc', missing: '_first' },
// _script: {
// type: 'number',
// order: 'desc',
// script: {
// lang: 'painless',
// // Use priority if explicitly specified in task definition, otherwise default to 50 (Normal)
// // TODO: we could do this locally as well, but they may starve
// source: `
// String taskType = doc['task.taskType'].value;
// if (params.priority_map.containsKey(taskType)) {
// return params.priority_map[taskType];
// } else {
// return ${TaskPriority.Normal};
// }
// `,
// params: {
// priority_map: definitions
// .getAllDefinitions()
// .reduce<Record<string, TaskPriority>>((acc, taskDefinition) => {
// if (taskDefinition.priority) {
// acc[taskDefinition.type] = taskDefinition.priority;
// }
// return acc;
// }, {}),
// },
// },
// },
};
}

Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugins/task_manager/server/saved_objects/mappings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ export const taskMappings: SavedObjectsTypeMappingDefinition = {
partition: {
type: 'integer',
},
priority: {
type: 'integer',
},
claimAt: {
type: 'date',
},
},
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { SavedObjectsModelVersionMap } from '@kbn/core-saved-objects-server';
import { taskSchemaV1, taskSchemaV2 } from '../schemas/task';
import { taskSchemaV1, taskSchemaV2, taskSchemaV3 } from '../schemas/task';

export const taskModelVersions: SavedObjectsModelVersionMap = {
'1': {
Expand Down Expand Up @@ -35,4 +35,19 @@ export const taskModelVersions: SavedObjectsModelVersionMap = {
create: taskSchemaV2,
},
},
'3': {
changes: [
{
type: 'mappings_addition',
addedMappings: {
priority: { type: 'integer' },
claimAt: { type: 'date' },
},
},
],
schemas: {
forwardCompatibility: taskSchemaV3.extends({}, { unknowns: 'ignore' }),
create: taskSchemaV3,
},
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ export const taskSchemaV1 = schema.object({
export const taskSchemaV2 = taskSchemaV1.extends({
partition: schema.maybe(schema.number()),
});

export const taskSchemaV3 = taskSchemaV2.extends({
priority: schema.maybe(schema.number()),
claimAt: schema.maybe(schema.string()),
});
2 changes: 2 additions & 0 deletions x-pack/plugins/task_manager/server/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,8 @@ export type SerializedConcreteTaskInstance = Omit<
retryAt: string | null;
runAt: string;
partition?: number;
priority?: number;
claimAt?: string;
};

export type PartialSerializedConcreteTaskInstance = Partial<SerializedConcreteTaskInstance> & {
Expand Down
26 changes: 14 additions & 12 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export class TaskManagerRunner implements TaskRunner {
private onTaskEvent: (event: TaskRun | TaskMarkRunning | TaskManagerStat) => void;
private defaultMaxAttempts: number;
private uuid: string;
private readonly executionContext: ExecutionContextStart;
// private readonly executionContext: ExecutionContextStart;
private usageCounter?: UsageCounter;
private config: TaskManagerConfig;
private readonly taskValidator: TaskValidator;
Expand Down Expand Up @@ -201,7 +201,7 @@ export class TaskManagerRunner implements TaskRunner {
this.beforeMarkRunning = beforeMarkRunning;
this.onTaskEvent = onTaskEvent;
this.defaultMaxAttempts = defaultMaxAttempts;
this.executionContext = executionContext;
// this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.uuid = uuidv4();
this.config = config;
Expand Down Expand Up @@ -374,16 +374,18 @@ export class TaskManagerRunner implements TaskRunner {
try {
this.task = definition.createTaskRunner(modifiedContext);

const ctx = {
type: 'task manager',
name: `run ${this.instance.task.taskType}`,
id: this.instance.task.id,
description: 'run task',
};

const result = await this.executionContext.withContext(ctx, () =>
withSpan({ name: 'run', type: 'task manager' }, () => this.task!.run())
);
// const ctx = {
// type: 'task manager',
// name: `run ${this.instance.task.taskType}`,
// id: this.instance.task.id,
// description: 'run task',
// };

const result = undefined;
this.logger.info(`Running a noop task`);
// const result = await this.executionContext.withContext(ctx, () =>
// withSpan({ name: 'run', type: 'task manager' }, () => this.task!.run())
// );

const validatedResult = this.validateResult(result);
const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ describe('TaskStore', () => {
id: task.id,
type: 'task',
version: task.version,
attributes: taskInstanceToAttributes(task, task.id),
attributes: taskInstanceToAttributes(task, task.id, taskDefinitions),
},
],
{ refresh: false }
Expand Down Expand Up @@ -886,7 +886,7 @@ describe('TaskStore', () => {
id: task.id,
type: 'task',
version: task.version,
attributes: taskInstanceToAttributes(task, task.id),
attributes: taskInstanceToAttributes(task, task.id, taskDefinitions),
},
],
{ refresh: false }
Expand Down
25 changes: 17 additions & 8 deletions x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import {
SerializedConcreteTaskInstance,
PartialConcreteTaskInstance,
PartialSerializedConcreteTaskInstance,
TaskPriority,
TaskStatus,
} from './task';

import { TaskTypeDictionary } from './task_type_dictionary';
Expand Down Expand Up @@ -186,7 +188,7 @@ export class TaskStore {
this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance);
savedObject = await this.savedObjectsRepository.create<SerializedConcreteTaskInstance>(
'task',
taskInstanceToAttributes(validatedTaskInstance, id),
taskInstanceToAttributes(validatedTaskInstance, id, this.definitions),
{ id, refresh: false }
);
if (get(taskInstance, 'schedule.interval', null) == null) {
Expand Down Expand Up @@ -214,7 +216,7 @@ export class TaskStore {
this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance);
return {
type: 'task',
attributes: taskInstanceToAttributes(validatedTaskInstance, id),
attributes: taskInstanceToAttributes(validatedTaskInstance, id, this.definitions),
id,
};
});
Expand Down Expand Up @@ -268,7 +270,7 @@ export class TaskStore {
const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate: options.validate,
});
const attributes = taskInstanceToAttributes(taskInstance, doc.id);
const attributes = taskInstanceToAttributes(taskInstance, doc.id, this.definitions);

let updatedSavedObject;
try {
Expand Down Expand Up @@ -313,7 +315,7 @@ export class TaskStore {
const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate,
});
attrsById.set(doc.id, taskInstanceToAttributes(taskInstance, doc.id));
attrsById.set(doc.id, taskInstanceToAttributes(taskInstance, doc.id, this.definitions));
return attrsById;
}, new Map());

Expand Down Expand Up @@ -759,19 +761,26 @@ export function correctVersionConflictsForContinuation(

export function taskInstanceToAttributes(
doc: TaskInstance,
id: string
id: string,
definitions: TaskTypeDictionary
): SerializedConcreteTaskInstance {
const taskTypeDef = definitions.get(doc.taskType);
const runAt = (doc.runAt || new Date()).toISOString();
const retryAt = (doc.retryAt && doc.retryAt.toISOString()) || null;
const status = (doc as ConcreteTaskInstance).status || 'idle';
return {
...omit(doc, 'id', 'version'),
retryAt,
runAt,
status,
params: JSON.stringify(doc.params || {}),
state: JSON.stringify(doc.state || {}),
attempts: (doc as ConcreteTaskInstance).attempts || 0,
scheduledAt: (doc.scheduledAt || new Date()).toISOString(),
startedAt: (doc.startedAt && doc.startedAt.toISOString()) || null,
retryAt: (doc.retryAt && doc.retryAt.toISOString()) || null,
runAt: (doc.runAt || new Date()).toISOString(),
status: (doc as ConcreteTaskInstance).status || 'idle',
partition: doc.partition || murmurhash.v3(id) % MAX_PARTITIONS,
priority: taskTypeDef?.priority || TaskPriority.Normal,
claimAt: [TaskStatus.Claiming, TaskStatus.Running].includes(status) ? retryAt : runAt,
} as SerializedConcreteTaskInstance;
}

Expand Down