From 80ad29ace15ac92b596f5f07cff6a31c227517d6 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 24 Feb 2020 16:46:58 -0600 Subject: [PATCH] [Uptime] Use scripted metric for snapshot calculation (#58247) (#58389) Fixes #58079 This is an improved version of #58078 Note, this is a bugfix targeting 7.6.1 . I've decided to open this PR directly against 7.6 in the interest of time. We can forward-port this to 7.x / master later. This patch improves the handling of timespans with snapshot counts. This feature originally worked, but suffered a regression when we increased the default timespan in the query context to 5m. This means that without this patch the counts you get are the maximum total number of monitors that were down over the past 5m, which is not really that useful. We now use a scripted metric to always count precisely the number of up/down monitors. On my box this could process 400k summary docs in ~600ms. This should scale as shards are added. I attempted to keep memory usage relatively slow by using simple maps of strings. --- .../lib/requests/get_snapshot_counts.ts | 194 +++++++++++++----- .../apis/uptime/rest/snapshot.ts | 105 +++++----- 2 files changed, 192 insertions(+), 107 deletions(-) diff --git a/x-pack/legacy/plugins/uptime/server/lib/requests/get_snapshot_counts.ts b/x-pack/legacy/plugins/uptime/server/lib/requests/get_snapshot_counts.ts index 62369711460159..8d84c0f4d6769f 100644 --- a/x-pack/legacy/plugins/uptime/server/lib/requests/get_snapshot_counts.ts +++ b/x-pack/legacy/plugins/uptime/server/lib/requests/get_snapshot_counts.ts @@ -6,7 +6,7 @@ import { UMElasticsearchQueryFn } from '../adapters'; import { Snapshot } from '../../../common/runtime_types'; -import { QueryContext, MonitorGroupIterator } from './search'; +import { QueryContext } from './search'; import { CONTEXT_DEFAULTS, INDEX_NAMES } from '../../../common/constants'; export interface GetSnapshotCountParams { @@ -16,49 +16,6 @@ export interface GetSnapshotCountParams { statusFilter?: string; } -const fastStatusCount = async (context: QueryContext): Promise => { - const params = { - index: INDEX_NAMES.HEARTBEAT, - body: { - size: 0, - query: { bool: { filter: await context.dateAndCustomFilters() } }, - aggs: { - unique: { - // We set the precision threshold to 40k which is the max precision supported by cardinality - cardinality: { field: 'monitor.id', precision_threshold: 40000 }, - }, - down: { - filter: { range: { 'summary.down': { gt: 0 } } }, - aggs: { - unique: { cardinality: { field: 'monitor.id', precision_threshold: 40000 } }, - }, - }, - }, - }, - }; - - const statistics = await context.search(params); - const total = statistics.aggregations.unique.value; - const down = statistics.aggregations.down.unique.value; - - return { - total, - down, - up: total - down, - }; -}; - -const slowStatusCount = async (context: QueryContext, status: string): Promise => { - const downContext = context.clone(); - downContext.statusFilter = status; - const iterator = new MonitorGroupIterator(downContext); - let count = 0; - while (await iterator.next()) { - count++; - } - return count; -}; - export const getSnapshotCount: UMElasticsearchQueryFn = async ({ callES, dateRangeStart, @@ -81,18 +38,7 @@ export const getSnapshotCount: UMElasticsearchQueryFn = - counts.up > counts.down ? ['down', 'up'] : ['up', 'down']; - counts[leastCommonStatus] = await slowStatusCount(context, leastCommonStatus); - counts[mostCommonStatus] = counts.total - counts[leastCommonStatus]; - } + const counts = await statusCount(context); return { total: statusFilter ? counts[statusFilter] : counts.total, @@ -100,3 +46,139 @@ export const getSnapshotCount: UMElasticsearchQueryFn => { + const res = await context.search({ + index: INDEX_NAMES.HEARTBEAT, + body: statusCountBody(await context.dateAndCustomFilters()), + }); + + return res.aggregations.counts.value; +}; + +const statusCountBody = (filters: any): any => { + return { + size: 0, + query: { + bool: { + filter: [ + { + exists: { + field: 'summary', + }, + }, + filters, + ], + }, + }, + aggs: { + counts: { + scripted_metric: { + init_script: 'state.locStatus = new HashMap(); state.totalDocs = 0;', + map_script: ` + def loc = doc["observer.geo.name"].size() == 0 ? "" : doc["observer.geo.name"][0]; + + // One concern here is memory since we could build pretty gigantic maps. I've opted to + // stick to a simple map to reduce memory overhead. This means we do + // a little string parsing to treat these strings as records that stay lexicographically + // sortable (which is important later). + // We encode the ID and location as $id.len:$id$loc + String id = doc["monitor.id"][0]; + String idLenDelim = Integer.toHexString(id.length()) + ":" + id; + String idLoc = loc == null ? idLenDelim : idLenDelim + loc; + + String status = doc["summary.down"][0] > 0 ? "d" : "u"; + String timeAndStatus = doc["@timestamp"][0].toInstant().toEpochMilli().toString() + status; + state.locStatus[idLoc] = timeAndStatus; + state.totalDocs++; + `, + combine_script: ` + return state; + `, + reduce_script: ` + // Use a treemap since it's traversable in sorted order. + // This is important later. + TreeMap locStatus = new TreeMap(); + long totalDocs = 0; + int uniqueIds = 0; + for (state in states) { + totalDocs += state.totalDocs; + for (entry in state.locStatus.entrySet()) { + // Update the value for the given key if we have a more recent check from this location. + locStatus.merge(entry.getKey(), entry.getValue(), (a,b) -> a.compareTo(b) > 0 ? a : b) + } + } + + HashMap locTotals = new HashMap(); + int total = 0; + int down = 0; + String curId = ""; + boolean curIdDown = false; + // We now iterate through our tree map in order, which means records for a given ID + // always are encountered one after the other. This saves us having to make an intermediate + // map. + for (entry in locStatus.entrySet()) { + String idLoc = entry.getKey(); + String timeStatus = entry.getValue(); + + // Parse the length delimited id/location strings described in the map section + int colonIndex = idLoc.indexOf(":"); + int idEnd = Integer.parseInt(idLoc.substring(0, colonIndex), 16) + colonIndex + 1; + String id = idLoc.substring(colonIndex + 1, idEnd); + String loc = idLoc.substring(idEnd, idLoc.length()); + String status = timeStatus.substring(timeStatus.length() - 1); + + // Here we increment counters for the up/down key per location + // We also create a new hashmap in locTotals if we've never seen this location + // before. + locTotals.compute(loc, (k,v) -> { + HashMap res = v; + if (v == null) { + res = new HashMap(); + res.put('up', 0); + res.put('down', 0); + } + + if (status == 'u') { + res.up++; + } else { + res.down++; + } + + return res; + }); + + + // We've encountered a new ID + if (curId != id) { + total++; + curId = id; + if (status == "d") { + curIdDown = true; + down++; + } else { + curIdDown = false; + } + } else if (!curIdDown) { + if (status == "d") { + curIdDown = true; + down++; + } else { + curIdDown = false; + } + } + } + + Map result = new HashMap(); + result.total = total; + result.location_totals = locTotals; + result.up = total - down; + result.down = down; + result.totalDocs = totalDocs; + return result; + `, + }, + }, + }, + }; +}; diff --git a/x-pack/test/api_integration/apis/uptime/rest/snapshot.ts b/x-pack/test/api_integration/apis/uptime/rest/snapshot.ts index b0d97837c770f9..20fe59d149ae8f 100644 --- a/x-pack/test/api_integration/apis/uptime/rest/snapshot.ts +++ b/x-pack/test/api_integration/apis/uptime/rest/snapshot.ts @@ -34,66 +34,69 @@ export default function({ getService }: FtrProviderContext) { let dateRange: { start: string; end: string }; [true, false].forEach(async (includeTimespan: boolean) => { - describe(`with timespans ${includeTimespan ? 'included' : 'missing'}`, async () => { - before(async () => { - const promises: Array> = []; - - // When includeTimespan is false we have to remove the values there. - let mogrify = (d: any) => d; - if ((includeTimespan = false)) { - mogrify = (d: any): any => { - d.monitor.delete('timespan'); + [true, false].forEach(async (includeObserver: boolean) => { + describe(`with timespans=${includeTimespan} and observer=${includeObserver}`, async () => { + before(async () => { + const promises: Array> = []; + + const mogrify = (d: any) => { + if (!includeTimespan) { + delete d.monitor.timespan; + } + if (!includeObserver) { + delete d.observer; + } return d; }; - } - - const makeMonitorChecks = async (monitorId: string, status: 'up' | 'down') => { - return makeChecksWithStatus( - getService('legacyEs'), - monitorId, - checksPerMonitor, - numIps, - scheduleEvery, - {}, - status, - mogrify - ); - }; - for (let i = 0; i < numUpMonitors; i++) { - promises.push(makeMonitorChecks(`up-${i}`, 'up')); - } - for (let i = 0; i < numDownMonitors; i++) { - promises.push(makeMonitorChecks(`down-${i}`, 'down')); - } + const makeMonitorChecks = async (monitorId: string, status: 'up' | 'down') => { + return makeChecksWithStatus( + getService('legacyEs'), + monitorId, + checksPerMonitor, + numIps, + scheduleEvery, + {}, + status, + mogrify + ); + }; - const allResults = await Promise.all(promises); - dateRange = getChecksDateRange(allResults); - }); + for (let i = 0; i < numUpMonitors; i++) { + promises.push(makeMonitorChecks(`up-${i}`, 'up')); + } + for (let i = 0; i < numDownMonitors; i++) { + promises.push(makeMonitorChecks(`down-${i}`, 'down')); + } - it('will count all statuses correctly', async () => { - const apiResponse = await supertest.get( - `/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}` - ); + const allResults = await Promise.all(promises); + dateRange = getChecksDateRange(allResults); + }); - expectFixtureEql(apiResponse.body, 'snapshot'); - }); + it('will count all statuses correctly', async () => { + const apiResponse = await supertest.get( + `/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}` + ); - it('will fetch a monitor snapshot filtered by down status', async () => { - const statusFilter = 'down'; - const apiResponse = await supertest.get( - `/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}` - ); + expectFixtureEql(apiResponse.body, 'snapshot'); + }); - expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_down'); - }); + it('will fetch a monitor snapshot filtered by down status', async () => { + const statusFilter = 'down'; + const apiResponse = await supertest.get( + `/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}` + ); - it('will fetch a monitor snapshot filtered by up status', async () => { - const statusFilter = 'up'; - const apiResponse = await supertest.get( - `/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}` - ); - expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_up'); + expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_down'); + }); + + it('will fetch a monitor snapshot filtered by up status', async () => { + const statusFilter = 'up'; + const apiResponse = await supertest.get( + `/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}` + ); + expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_up'); + }); }); }); });