Skip to content

Commit

Permalink
fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
ppisljar committed Apr 15, 2024
1 parent 91ba741 commit 5841060
Showing 1 changed file with 16 additions and 33 deletions.
49 changes: 16 additions & 33 deletions src/plugins/expressions/common/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
Subscription,
} from 'rxjs';
import { catchError, finalize, map, pluck, shareReplay, switchMap, tap } from 'rxjs';
import {now, AbortError, calculateObjectHash} from '@kbn/kibana-utils-plugin/common';
import { now, AbortError, calculateObjectHash } from '@kbn/kibana-utils-plugin/common';
import { Adapters } from '@kbn/inspector-plugin/common';
import { Executor } from '../executor';
import { createExecutionContainer, ExecutionContainer } from './container';
Expand Down Expand Up @@ -185,6 +185,8 @@ export interface ExecutionParams {
params: ExpressionExecutionParams;
}

const functionCache: Map<string, any> = new Map();

export class Execution<
Input = unknown,
Output = unknown,
Expand Down Expand Up @@ -237,8 +239,6 @@ export class Execution<
* @private
*/
private readonly childExecutions: Execution[] = [];

private functionCache: Map<string, any> = new Map();
private cacheTimeout: number = 30000;

/**
Expand Down Expand Up @@ -455,51 +455,34 @@ export class Execution<
}).pipe(catchError((error) => of(error)));
}

async getCachedResults(
fn: ExpressionFunction,
normalizedInput: unknown,
args: Record<string, unknown>
) {
let fnOutput;
const hash = calculateObjectHash([fn.name, normalizedInput, args, this.context.getSearchContext()]);
if (this.context.allowCache && fn.allowCache && this.functionCache.has(hash)) {
fnOutput = this.functionCache.get(hash);
} else {
fnOutput = await of(fn.fn(normalizedInput, args, this.context));
if (fn.allowCache) {
while (this.functionCache.size >= maxCacheSize) {
this.functionCache.delete(this.functionCache.keys().next().value);
}
this.functionCache.set(hash, fnOutput);
}
}
return fnOutput;
}

invokeFunction<Fn extends ExpressionFunction>(
fn: Fn,
input: unknown,
args: Record<string, unknown>
): Observable<UnwrapReturnType<Fn['fn']>> {

let hash: string | undefined;

return of(input).pipe(
map((currentInput) => this.cast(currentInput, fn.inputTypes)),
map((normalizedInput) => {
if (fn.allowCache && this.context.allowCache) {
hash = calculateObjectHash([fn.name, normalizedInput, args, this.context.getSearchContext()]);
hash = calculateObjectHash([
fn.name,
normalizedInput,
args,
this.context.getSearchContext(),
]);
}
return normalizedInput;
}),
switchMap((normalizedInput) => {
if (hash && this.functionCache.has(hash)) {
const cached = this.functionCache.get(hash)
if (hash && functionCache.has(hash)) {
const cached = functionCache.get(hash);
if (Date.now() - cached.time < this.cacheTimeout) {
return cached.value;
return of(cached.value);
}
}
return of(fn.fn(normalizedInput, args, this.context))
return of(fn.fn(normalizedInput, args, this.context));
}),
switchMap(
(fnResult) =>
Expand Down Expand Up @@ -532,10 +515,10 @@ export class Execution<
}

if (hash) {
while (this.functionCache.size >= maxCacheSize) {
this.functionCache.delete(this.functionCache.keys().next().value);
while (functionCache.size >= maxCacheSize) {
functionCache.delete(functionCache.keys().next().value);
}
this.functionCache.set(hash, { value: output, time: Date.now() });
functionCache.set(hash, { value: output, time: Date.now() });
}
return output;
})
Expand Down

0 comments on commit 5841060

Please sign in to comment.