Skip to content

Commit

Permalink
[expressions] caching (#180440)
Browse files Browse the repository at this point in the history
  • Loading branch information
ppisljar authored May 20, 2024
1 parent 953706a commit 79b8bab
Show file tree
Hide file tree
Showing 38 changed files with 339 additions and 79 deletions.
1 change: 1 addition & 0 deletions src/plugins/data/common/search/expressions/eql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export const getEqlFn = ({
name,
type: 'eql_raw_response',
inputTypes: ['kibana_context', 'null'],
allowCache: true,
help: i18n.translate('data.search.eql.help', {
defaultMessage: 'Run Elasticsearch request',
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export const getEsaggsMeta: () => Omit<EsaggsExpressionFunctionDefinition, 'fn'>
name,
type: 'datatable',
inputTypes: ['kibana_context', 'null'],
allowCache: true,
help: i18n.translate('data.functions.esaggs.help', {
defaultMessage: 'Run AggConfig aggregation',
}),
Expand Down
1 change: 1 addition & 0 deletions src/plugins/data/common/search/expressions/esdsl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export const getEsdslFn = ({
name,
type: 'es_raw_response',
inputTypes: ['kibana_context', 'null'],
allowCache: true,
help: i18n.translate('data.search.esdsl.help', {
defaultMessage: 'Run Elasticsearch request',
}),
Expand Down
1 change: 1 addition & 0 deletions src/plugins/data/common/search/expressions/esql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export const getEsqlFn = ({ getStartDependencies }: EsqlFnArguments) => {
name: 'esql',
type: 'datatable',
inputTypes: ['kibana_context', 'null'],
allowCache: true,
help: i18n.translate('data.search.esql.help', {
defaultMessage: 'Queries Elasticsearch using ES|QL.',
}),
Expand Down
1 change: 1 addition & 0 deletions src/plugins/data/common/search/expressions/essql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export const getEssqlFn = ({ getStartDependencies }: EssqlFnArguments) => {
name: 'essql',
type: 'datatable',
inputTypes: ['kibana_context', 'null'],
allowCache: true,
help: i18n.translate('data.search.essql.help', {
defaultMessage: 'Queries Elasticsearch using Elasticsearch SQL.',
}),
Expand Down
115 changes: 80 additions & 35 deletions src/plugins/expressions/common/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
Subscription,
} from 'rxjs';
import { catchError, finalize, map, pluck, shareReplay, switchMap, tap } from 'rxjs';
import { now, AbortError } from '@kbn/kibana-utils-plugin/common';
import { now, AbortError, calculateObjectHash } from '@kbn/kibana-utils-plugin/common';
import { Adapters } from '@kbn/inspector-plugin/common';
import { Executor } from '../executor';
import { createExecutionContainer, ExecutionContainer } from './container';
Expand Down Expand Up @@ -55,6 +55,10 @@ type UnwrapReturnType<Function extends (...args: any[]) => unknown> =
? UnwrapObservable<ReturnType<Function>>
: Awaited<ReturnType<Function>>;

export interface FunctionCacheItem {
value: unknown;
time: number;
}
/**
* The result returned after an expression function execution.
*/
Expand All @@ -70,6 +74,8 @@ export interface ExecutionResult<Output> {
result: Output;
}

const maxCacheSize = 1000;

const createAbortErrorValue = () =>
createError({
message: 'The expression was aborted.',
Expand Down Expand Up @@ -235,6 +241,7 @@ export class Execution<
* @private
*/
private readonly childExecutions: Execution[] = [];
private cacheTimeout: number = 30000;

/**
* Contract is a public representation of `Execution` instances. Contract we
Expand All @@ -248,7 +255,11 @@ export class Execution<
return this.context.inspectorAdapters;
}

constructor(public readonly execution: ExecutionParams, private readonly logger?: Logger) {
constructor(
public readonly execution: ExecutionParams,
private readonly logger?: Logger,
private readonly functionCache: Map<string, FunctionCacheItem> = new Map()
) {
const { executor } = execution;

this.contract = new ExecutionContract<Input, Output, InspectorAdapters>(this);
Expand Down Expand Up @@ -278,6 +289,7 @@ export class Execution<
? () => execution.params.kibanaRequest!
: undefined,
variables: execution.params.variables || {},
allowCache: this.execution.params.allowCache,
types: executor.getTypes(),
abortSignal: this.abortController.signal,
inspectorAdapters,
Expand Down Expand Up @@ -454,42 +466,75 @@ export class Execution<
input: unknown,
args: Record<string, unknown>
): Observable<UnwrapReturnType<Fn['fn']>> {
return of(input).pipe(
map((currentInput) => this.cast(currentInput, fn.inputTypes)),
switchMap((normalizedInput) => of(fn.fn(normalizedInput, args, this.context))),
switchMap(
(fnResult) =>
(isObservable(fnResult)
? fnResult
: from(isPromise(fnResult) ? fnResult : [fnResult])) as Observable<
UnwrapReturnType<Fn['fn']>
>
),
map((output) => {
// Validate that the function returned the type it said it would.
// This isn't required, but it keeps function developers honest.
const returnType = getType(output);
const expectedType = fn.type;
if (expectedType && returnType !== expectedType) {
throw new Error(
`Function '${fn.name}' should return '${expectedType}',` +
` actually returned '${returnType}'`
);
}
let hash: string | undefined;
let lastValue: unknown;
let completionFlag = false;

return of(input)
.pipe(
map((currentInput) => this.cast(currentInput, fn.inputTypes)),
switchMap((normalizedInput) => {
if (fn.allowCache && this.context.allowCache) {
hash = calculateObjectHash([
fn.name,
normalizedInput,
args,
this.context.getSearchContext(),
]);
}
if (hash && this.functionCache.has(hash)) {
const cached = this.functionCache.get(hash);
if (cached && Date.now() - cached.time < this.cacheTimeout) {
return of(cached.value);
}
}
return of(fn.fn(normalizedInput, args, this.context));
}),
switchMap((fnResult) => {
return (
isObservable(fnResult) ? fnResult : from(isPromise(fnResult) ? fnResult : [fnResult])
) as Observable<UnwrapReturnType<Fn['fn']>>;
}),
map((output) => {
// Validate that the function returned the type it said it would.
// This isn't required, but it keeps function developers honest.
const returnType = getType(output);
const expectedType = fn.type;
if (expectedType && returnType !== expectedType) {
throw new Error(
`Function '${fn.name}' should return '${expectedType}',` +
` actually returned '${returnType}'`
);
}

// Validate the function output against the type definition's validate function.
const type = this.context.types[fn.type];
if (type && type.validate) {
try {
type.validate(output);
} catch (e) {
throw new Error(`Output of '${fn.name}' is not a valid type '${fn.type}': ${e}`);
// Validate the function output against the type definition's validate function.
const type = this.context.types[fn.type];
if (type && type.validate) {
try {
type.validate(output);
} catch (e) {
throw new Error(`Output of '${fn.name}' is not a valid type '${fn.type}': ${e}`);
}
}
}

return output;
})
);
lastValue = output;

return output;
}),
finalize(() => {
if (completionFlag && hash) {
while (this.functionCache.size >= maxCacheSize) {
this.functionCache.delete(this.functionCache.keys().next().value);
}
this.functionCache.set(hash, { value: lastValue, time: Date.now() });
}
})
)
.pipe(
tap({
complete: () => (completionFlag = true), // Set flag true only on successful completion
})
);
}

public cast<Type = unknown>(value: unknown, toTypeNames?: string[]): Type {
Expand Down
5 changes: 5 additions & 0 deletions src/plugins/expressions/common/execution/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ export interface ExecutionContext<InspectorAdapters extends Adapters = Adapters>
*/
types: Record<string, ExpressionType>;

/**
* Allow caching in the current execution.
*/
allowCache?: boolean;

/**
* Adds ability to abort current execution.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ describe('Executor mocked execution tests', () => {

expect(Execution).toHaveBeenCalledWith(
expect.objectContaining({ expression: 'foo bar="baz"' }),
undefined
undefined,
expect.anything()
);
});
});
Expand All @@ -40,7 +41,8 @@ describe('Executor mocked execution tests', () => {

expect(Execution).toHaveBeenCalledWith(
expect.not.objectContaining({ expression: expect.anything() }),
undefined
undefined,
expect.anything()
);
});
});
Expand Down
90 changes: 89 additions & 1 deletion src/plugins/expressions/common/executor/executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import { Executor } from './executor';
import * as expressionTypes from '../expression_types';
import * as expressionFunctions from '../expression_functions';
import { Execution } from '../execution';
import { Execution, FunctionCacheItem } from '../execution';
import { ExpressionAstFunction, parseExpression, formatExpression } from '../ast';
import { MigrateFunction } from '@kbn/kibana-utils-plugin/common/persistable_state';
import { SavedObjectReference } from '@kbn/core/types';
Expand Down Expand Up @@ -312,4 +312,92 @@ describe('Executor', () => {
});
});
});

describe('caching', () => {
const functionCache: Map<string, FunctionCacheItem> = new Map();
const fakeCacheEntry = { time: Date.now(), value: 'test' };
let executor: Executor;

beforeAll(() => {
executor = new Executor(undefined, undefined, functionCache);
executor.registerFunction(expressionFunctions.variable);
expressionFunctions.theme.allowCache = true;
executor.registerFunction(expressionFunctions.theme);
});

afterEach(() => {
functionCache.clear();
});

it('caches the result of function', async () => {
await executor.run('theme size default=12', null, { allowCache: true }).toPromise();
expect(functionCache.size).toEqual(1);
const entry = functionCache.keys().next().value;
functionCache.set(entry, fakeCacheEntry);
const result = await executor
.run('theme size default=12', null, { allowCache: true })
.toPromise();
expect(functionCache.size).toEqual(1);
expect(result?.result).toEqual(fakeCacheEntry.value);
});

it('doesnt cache if allowCache flag is false', async () => {
await executor.run('theme size default=12', null, { allowCache: true }).toPromise();
expect(functionCache.size).toEqual(1);
const entry = functionCache.keys().next().value;
functionCache.set(entry, fakeCacheEntry);
const result = await executor
.run('theme size default=12', null, { allowCache: false })
.toPromise();
expect(functionCache.size).toEqual(1);
expect(result?.result).not.toEqual(fakeCacheEntry.value);
});

it('doesnt cache results of functions that have allowCache property set to false', async () => {
await executor.run('var name="test"', null, { allowCache: true }).toPromise();
expect(functionCache.size).toEqual(0);
});

describe('doesnt use cached version', () => {
const cachedVersion = { time: Date.now(), value: 'value' };

beforeAll(async () => {
await executor.run('theme size default=12', null, { allowCache: true }).toPromise();
expect(functionCache.size).toEqual(1);
const entry: string = Object.keys(functionCache)[0];
functionCache.set(entry, cachedVersion);
});

it('input changed', async () => {
const result = await executor
.run(
'theme size default=12',
{
type: 'kibana_context',
value: 'test',
},
{ allowCache: true }
)
.toPromise();
expect(result).not.toEqual(cachedVersion);
});

it('arguments changed', async () => {
const result = await executor
.run('theme size default=14', null, { allowCache: true })
.toPromise();
expect(result).not.toEqual(cachedVersion);
});

it('search context changed', async () => {
const result = await executor
.run('theme size default=12', null, {
searchContext: { filters: [] },
allowCache: true,
})
.toPromise();
expect(result).not.toEqual(cachedVersion);
});
});
});
});
23 changes: 20 additions & 3 deletions src/plugins/expressions/common/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ import {
import { ExecutorState, ExecutorContainer } from './container';
import { createExecutorContainer } from './container';
import { AnyExpressionFunctionDefinition, ExpressionFunction } from '../expression_functions';
import { Execution, ExecutionParams, ExecutionResult } from '../execution/execution';
import {
Execution,
ExecutionParams,
ExecutionResult,
FunctionCacheItem,
} from '../execution/execution';
import { IRegistry } from '../types';
import { ExpressionType } from '../expression_types/expression_type';
import { AnyExpressionTypeDefinition } from '../expression_types/types';
Expand Down Expand Up @@ -109,10 +114,17 @@ export class Executor<Context extends Record<string, unknown> = Record<string, u
*/
public readonly types: TypesRegistry;

constructor(private readonly logger?: Logger, state?: ExecutorState<Context>) {
private functionCache: Map<string, FunctionCacheItem>;

constructor(
private readonly logger?: Logger,
state?: ExecutorState<Context>,
functionCache: Map<string, FunctionCacheItem> = new Map()
) {
this.functions = new FunctionsRegistry(this as Executor);
this.types = new TypesRegistry(this as Executor);
this.container = createExecutorContainer<Context>(state);
this.functionCache = functionCache;
}

public get state(): ExecutorState<Context> {
Expand Down Expand Up @@ -189,12 +201,17 @@ export class Executor<Context extends Record<string, unknown> = Record<string, u
const executionParams = {
params,
executor: this,
functionCache: this.functionCache,
} as ExecutionParams;

if (typeof ast === 'string') executionParams.expression = ast;
else executionParams.ast = ast;

const execution = new Execution<Input, Output>(executionParams, this.logger);
const execution = new Execution<Input, Output>(
executionParams,
this.logger,
this.functionCache
);

return execution;
}
Expand Down
Loading

0 comments on commit 79b8bab

Please sign in to comment.