Skip to content

Commit

Permalink
[Uptime] Use scripted metric for snapshot calculation (#58247) (#58389)
Browse files Browse the repository at this point in the history
Fixes #58079

This is an improved version of #58078

Note, this is a bugfix targeting 7.6.1 . I've decided to open this PR directly against 7.6 in the interest of time. We can forward-port this to 7.x / master later.

This patch improves the handling of timespans with snapshot counts. This feature originally worked, but suffered a regression when we increased the default timespan in the query context to 5m. This means that without this patch the counts you get are the maximum total number of monitors that were down over the past 5m, which is not really that useful.

We now use a scripted metric to always count precisely the number of up/down monitors. On my box this could process 400k summary docs in ~600ms. This should scale as shards are added.

I attempted to keep memory usage relatively slow by using simple maps of strings.
  • Loading branch information
andrewvc committed Feb 24, 2020
1 parent 2065ac5 commit 80ad29a
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 107 deletions.
194 changes: 138 additions & 56 deletions x-pack/legacy/plugins/uptime/server/lib/requests/get_snapshot_counts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import { UMElasticsearchQueryFn } from '../adapters';
import { Snapshot } from '../../../common/runtime_types';
import { QueryContext, MonitorGroupIterator } from './search';
import { QueryContext } from './search';
import { CONTEXT_DEFAULTS, INDEX_NAMES } from '../../../common/constants';

export interface GetSnapshotCountParams {
Expand All @@ -16,49 +16,6 @@ export interface GetSnapshotCountParams {
statusFilter?: string;
}

const fastStatusCount = async (context: QueryContext): Promise<Snapshot> => {
const params = {
index: INDEX_NAMES.HEARTBEAT,
body: {
size: 0,
query: { bool: { filter: await context.dateAndCustomFilters() } },
aggs: {
unique: {
// We set the precision threshold to 40k which is the max precision supported by cardinality
cardinality: { field: 'monitor.id', precision_threshold: 40000 },
},
down: {
filter: { range: { 'summary.down': { gt: 0 } } },
aggs: {
unique: { cardinality: { field: 'monitor.id', precision_threshold: 40000 } },
},
},
},
},
};

const statistics = await context.search(params);
const total = statistics.aggregations.unique.value;
const down = statistics.aggregations.down.unique.value;

return {
total,
down,
up: total - down,
};
};

const slowStatusCount = async (context: QueryContext, status: string): Promise<number> => {
const downContext = context.clone();
downContext.statusFilter = status;
const iterator = new MonitorGroupIterator(downContext);
let count = 0;
while (await iterator.next()) {
count++;
}
return count;
};

export const getSnapshotCount: UMElasticsearchQueryFn<GetSnapshotCountParams, Snapshot> = async ({
callES,
dateRangeStart,
Expand All @@ -81,22 +38,147 @@ export const getSnapshotCount: UMElasticsearchQueryFn<GetSnapshotCountParams, Sn
);

// Calculate the total, up, and down counts.
const counts = await fastStatusCount(context);

// Check if the last count was accurate, if not, we need to perform a slower count with the
// MonitorGroupsIterator.
if (!(await context.hasTimespan())) {
// Figure out whether 'up' or 'down' is more common. It's faster to count the lower cardinality
// one then use subtraction to figure out its opposite.
const [leastCommonStatus, mostCommonStatus]: Array<'up' | 'down'> =
counts.up > counts.down ? ['down', 'up'] : ['up', 'down'];
counts[leastCommonStatus] = await slowStatusCount(context, leastCommonStatus);
counts[mostCommonStatus] = counts.total - counts[leastCommonStatus];
}
const counts = await statusCount(context);

return {
total: statusFilter ? counts[statusFilter] : counts.total,
up: statusFilter === 'down' ? 0 : counts.up,
down: statusFilter === 'up' ? 0 : counts.down,
};
};

const statusCount = async (context: QueryContext): Promise<Snapshot> => {
const res = await context.search({
index: INDEX_NAMES.HEARTBEAT,
body: statusCountBody(await context.dateAndCustomFilters()),
});

return res.aggregations.counts.value;
};

const statusCountBody = (filters: any): any => {
return {
size: 0,
query: {
bool: {
filter: [
{
exists: {
field: 'summary',
},
},
filters,
],
},
},
aggs: {
counts: {
scripted_metric: {
init_script: 'state.locStatus = new HashMap(); state.totalDocs = 0;',
map_script: `
def loc = doc["observer.geo.name"].size() == 0 ? "" : doc["observer.geo.name"][0];
// One concern here is memory since we could build pretty gigantic maps. I've opted to
// stick to a simple <String,String> map to reduce memory overhead. This means we do
// a little string parsing to treat these strings as records that stay lexicographically
// sortable (which is important later).
// We encode the ID and location as $id.len:$id$loc
String id = doc["monitor.id"][0];
String idLenDelim = Integer.toHexString(id.length()) + ":" + id;
String idLoc = loc == null ? idLenDelim : idLenDelim + loc;
String status = doc["summary.down"][0] > 0 ? "d" : "u";
String timeAndStatus = doc["@timestamp"][0].toInstant().toEpochMilli().toString() + status;
state.locStatus[idLoc] = timeAndStatus;
state.totalDocs++;
`,
combine_script: `
return state;
`,
reduce_script: `
// Use a treemap since it's traversable in sorted order.
// This is important later.
TreeMap locStatus = new TreeMap();
long totalDocs = 0;
int uniqueIds = 0;
for (state in states) {
totalDocs += state.totalDocs;
for (entry in state.locStatus.entrySet()) {
// Update the value for the given key if we have a more recent check from this location.
locStatus.merge(entry.getKey(), entry.getValue(), (a,b) -> a.compareTo(b) > 0 ? a : b)
}
}
HashMap locTotals = new HashMap();
int total = 0;
int down = 0;
String curId = "";
boolean curIdDown = false;
// We now iterate through our tree map in order, which means records for a given ID
// always are encountered one after the other. This saves us having to make an intermediate
// map.
for (entry in locStatus.entrySet()) {
String idLoc = entry.getKey();
String timeStatus = entry.getValue();
// Parse the length delimited id/location strings described in the map section
int colonIndex = idLoc.indexOf(":");
int idEnd = Integer.parseInt(idLoc.substring(0, colonIndex), 16) + colonIndex + 1;
String id = idLoc.substring(colonIndex + 1, idEnd);
String loc = idLoc.substring(idEnd, idLoc.length());
String status = timeStatus.substring(timeStatus.length() - 1);
// Here we increment counters for the up/down key per location
// We also create a new hashmap in locTotals if we've never seen this location
// before.
locTotals.compute(loc, (k,v) -> {
HashMap res = v;
if (v == null) {
res = new HashMap();
res.put('up', 0);
res.put('down', 0);
}
if (status == 'u') {
res.up++;
} else {
res.down++;
}
return res;
});
// We've encountered a new ID
if (curId != id) {
total++;
curId = id;
if (status == "d") {
curIdDown = true;
down++;
} else {
curIdDown = false;
}
} else if (!curIdDown) {
if (status == "d") {
curIdDown = true;
down++;
} else {
curIdDown = false;
}
}
}
Map result = new HashMap();
result.total = total;
result.location_totals = locTotals;
result.up = total - down;
result.down = down;
result.totalDocs = totalDocs;
return result;
`,
},
},
},
};
};
105 changes: 54 additions & 51 deletions x-pack/test/api_integration/apis/uptime/rest/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,66 +34,69 @@ export default function({ getService }: FtrProviderContext) {
let dateRange: { start: string; end: string };

[true, false].forEach(async (includeTimespan: boolean) => {
describe(`with timespans ${includeTimespan ? 'included' : 'missing'}`, async () => {
before(async () => {
const promises: Array<Promise<any>> = [];

// When includeTimespan is false we have to remove the values there.
let mogrify = (d: any) => d;
if ((includeTimespan = false)) {
mogrify = (d: any): any => {
d.monitor.delete('timespan');
[true, false].forEach(async (includeObserver: boolean) => {
describe(`with timespans=${includeTimespan} and observer=${includeObserver}`, async () => {
before(async () => {
const promises: Array<Promise<any>> = [];

const mogrify = (d: any) => {
if (!includeTimespan) {
delete d.monitor.timespan;
}
if (!includeObserver) {
delete d.observer;
}
return d;
};
}

const makeMonitorChecks = async (monitorId: string, status: 'up' | 'down') => {
return makeChecksWithStatus(
getService('legacyEs'),
monitorId,
checksPerMonitor,
numIps,
scheduleEvery,
{},
status,
mogrify
);
};

for (let i = 0; i < numUpMonitors; i++) {
promises.push(makeMonitorChecks(`up-${i}`, 'up'));
}
for (let i = 0; i < numDownMonitors; i++) {
promises.push(makeMonitorChecks(`down-${i}`, 'down'));
}
const makeMonitorChecks = async (monitorId: string, status: 'up' | 'down') => {
return makeChecksWithStatus(
getService('legacyEs'),
monitorId,
checksPerMonitor,
numIps,
scheduleEvery,
{},
status,
mogrify
);
};

const allResults = await Promise.all(promises);
dateRange = getChecksDateRange(allResults);
});
for (let i = 0; i < numUpMonitors; i++) {
promises.push(makeMonitorChecks(`up-${i}`, 'up'));
}
for (let i = 0; i < numDownMonitors; i++) {
promises.push(makeMonitorChecks(`down-${i}`, 'down'));
}

it('will count all statuses correctly', async () => {
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}`
);
const allResults = await Promise.all(promises);
dateRange = getChecksDateRange(allResults);
});

expectFixtureEql(apiResponse.body, 'snapshot');
});
it('will count all statuses correctly', async () => {
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}`
);

it('will fetch a monitor snapshot filtered by down status', async () => {
const statusFilter = 'down';
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}`
);
expectFixtureEql(apiResponse.body, 'snapshot');
});

expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_down');
});
it('will fetch a monitor snapshot filtered by down status', async () => {
const statusFilter = 'down';
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}`
);

it('will fetch a monitor snapshot filtered by up status', async () => {
const statusFilter = 'up';
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}`
);
expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_up');
expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_down');
});

it('will fetch a monitor snapshot filtered by up status', async () => {
const statusFilter = 'up';
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}`
);
expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_up');
});
});
});
});
Expand Down

0 comments on commit 80ad29a

Please sign in to comment.