Skip to content

Commit

Permalink
wip: lots of metrics changes and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
zackpollard committed Aug 7, 2024
1 parent 6562a74 commit a28287b
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 617 deletions.
501 changes: 19 additions & 482 deletions tiles/package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion tiles/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@
"wrangler": "^3.60.3"
},
"dependencies": {
"@influxdata/influxdb3-client": "^0.9.0"
"@influxdata/influxdb-client": "^1.34.0"
}
}
152 changes: 79 additions & 73 deletions tiles/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { monitorAsyncFunction } from './monitor';
import { Metrics } from './monitor';
import { PMTiles } from './pmtiles/pmtiles';
import { Header } from './pmtiles/types';
import { tileJSON } from './pmtiles/utils';
Expand Down Expand Up @@ -49,85 +49,91 @@ export function parseUrl(request: Request): PMTilesParams {
return { requestType: undefined, url };
}

// export let context: ExecutionContext | undefined;
// export let defaultTags: { [key: string]: string };
async function handleRequest(request: Request<unknown, IncomingRequestCfProperties>, env: Env, ctx: ExecutionContext) {
const metrics = Metrics.getMetrics();
const cacheResponse = async (response: Response): Promise<Response> => {
if (!response.body) throw new Error('Response body is undefined');
const responseBody = await response.arrayBuffer();
ctx.waitUntil(cache.put(request.url, new Response(responseBody, response)));
return new Response(responseBody, response);
};

const handleTileRequest = async (z: string, x: string, y: string, pmTiles: PMTiles, respHeaders: Headers) => {
const tile = await pmTiles.getTile(+z, +x, +y);
if (!tile) return new Response('Tile not found', { status: 404 });
respHeaders.set('Content-Type', 'application/x-protobuf');
respHeaders.set('content-encoding', 'gzip');
return cacheResponse(new Response(tile.data, { headers: respHeaders, status: 200, encodeBody: 'manual' }));
};

async function handleJsonRequest() {
const { version, url } = pmTilesParams as PMTilesJsonParams;
const header = pmTiles.getHeader();
const metadata = await pmTiles.getMetadata();
respHeaders.set('Content-Type', 'application/json');
const tileJson = tileJSON({ header, metadata, hostname: url.hostname, version });
return cacheResponse(new Response(JSON.stringify(tileJson), { headers: respHeaders, status: 200 }));
}

export default {
async fetch(request, env, ctx): Promise<Response> {
// defaultTags = {
// colo: request.cf?.colo ?? '',
// rayId: request.headers.get('cf-ray') ?? '',
// asOrg: request.cf?.asOrganization ?? '',
// };
// context = ctx;
if (!globalThis.source) {
globalThis.source = new R2Source(env);
}
if (request.method.toUpperCase() !== 'GET') {
return new Response(undefined, { status: 405 });
}

const cacheResponse = async (response: Response): Promise<Response> => {
if (!response.body) throw new Error('Response body is undefined');
const responseBody = await response.arrayBuffer();
ctx.waitUntil(cache.put(request.url, new Response(responseBody, response)));
return new Response(responseBody, response);
};
const cache = caches.default;
const cached = await metrics.monitorAsyncFunction({ name: 'match_request_from_cdn' }, (url) => cache.match(url))(
request.url,
);
if (cached) {
const cacheHeaders = new Headers(cached.headers);
const encodeBody = cacheHeaders.has('content-encoding') ? 'manual' : 'automatic';
return new Response(cached.body, {
headers: cacheHeaders,
status: cached.status,
encodeBody,
});
}

if (request.method.toUpperCase() !== 'GET') {
return new Response(undefined, { status: 405 });
const pmTiles = await metrics.monitorAsyncFunction({ name: 'pmtiles_init' }, PMTiles.init)(
source,
globalThis.headerCache,
env.KV,
ctx,
);

const respHeaders = new Headers();
respHeaders.set('Cache-Control', `public, max-age=${60 * 60 * 24 * 31}`);
respHeaders.set('Access-Control-Allow-Origin', '*');
respHeaders.set('Vary', 'Origin');
respHeaders.set('PMTiles-File-Identifier', env.PMTILES_FILE_HASH);

const pmTilesParams = parseUrl(request);

try {
if (pmTilesParams.requestType === 'tile') {
const { z, x, y, version } = pmTilesParams as PMTilesTileParams;
return await metrics.monitorAsyncFunction(
{ name: 'tile_request', extraTags: { z, x, y, version } },
handleTileRequest,
)(z, x, y, pmTiles, respHeaders);
} else if (pmTilesParams.requestType === 'json') {
const { version } = pmTilesParams as PMTilesJsonParams;
return await metrics.monitorAsyncFunction({ name: 'json_request', extraTags: { version } }, handleJsonRequest)();
}
} catch (e) {
console.error(e);
return new Response('Internal Server Error', { status: 500 });
}

const cache = caches.default;
const cached = await monitorAsyncFunction('match-request-from-cdn', (url) => cache.match(url))(request.url);
if (cached) {
const cacheHeaders = new Headers(cached.headers);
for (const [key, value] of cacheHeaders.entries()) {
console.log(`${key}: ${value}`);
}
const encodeBody = cacheHeaders.has('content-encoding') ? 'manual' : 'automatic';
console.log('encodeBody', encodeBody);
return new Response(cached.body, {
headers: cacheHeaders,
status: cached.status,
encodeBody,
});
}
return cacheResponse(new Response('Invalid URL', { status: 404 }));
}

const pmTiles = await monitorAsyncFunction('pmtiles-init', PMTiles.init)(
source,
globalThis.headerCache,
env.KV,
ctx,
);

const respHeaders = new Headers();
respHeaders.set('Cache-Control', `public, max-age=${60 * 60 * 24 * 31}`);
respHeaders.set('Access-Control-Allow-Origin', '*');
respHeaders.set('Vary', 'Origin');
respHeaders.set('PMTiles-File-Identifier', env.PMTILES_FILE_HASH);

const pmTilesParams = parseUrl(request);

try {
if (pmTilesParams.requestType === 'tile') {
const { z, x, y } = pmTilesParams as PMTilesTileParams;
console.log(z, x, y);
const tile = await pmTiles.getTile(+z, +x, +y);
if (!tile) return new Response('Tile not found', { status: 404 });
respHeaders.set('Content-Type', 'application/x-protobuf');
respHeaders.set('content-encoding', 'gzip');
return cacheResponse(new Response(tile.data, { headers: respHeaders, status: 200, encodeBody: 'manual' }));
} else if (pmTilesParams.requestType === 'json') {
const { version, url } = pmTilesParams as PMTilesJsonParams;
const header = pmTiles.getHeader();
const metadata = await pmTiles.getMetadata();
respHeaders.set('Content-Type', 'application/json');
const tileJson = tileJSON({ header, metadata, hostname: url.hostname, version });
return cacheResponse(new Response(JSON.stringify(tileJson), { headers: respHeaders, status: 200 }));
}
} catch (e) {
console.error(e);
return new Response('Internal Server Error', { status: 500 });
export default {
async fetch(request, env, ctx): Promise<Response> {
const metrics = Metrics.initialiseMetrics('tiles', request, ctx, env);
if (!globalThis.source) {
globalThis.source = new R2Source(env);
}

return cacheResponse(new Response('Invalid URL', { status: 404 }));
return metrics.monitorAsyncFunction({ name: 'handle_request' }, handleRequest)(request, env, ctx);
},
} satisfies ExportedHandler<Env>;
128 changes: 79 additions & 49 deletions tiles/src/monitor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// eslint-disable-next-line @typescript-eslint/no-explicit-any

import { Point } from '@influxdata/influxdb3-client';
import { Point } from '@influxdata/influxdb-client';

type AsyncFn = (...args: any[]) => Promise<any>;

Expand All @@ -10,7 +10,6 @@ type Class = { new (...args: any[]): any };
type Options = Partial<{
monitorInvocations?: boolean;
acceptedErrors?: Class[];
thisArg?: any;
}>;

const startTimer = () => {
Expand All @@ -22,56 +21,87 @@ const startTimer = () => {
};
};

export function monitorAsyncFunction<T extends AsyncFn>(
operationName: string,
call: T,
options: Options = { monitorInvocations: true, acceptedErrors: [], thisArg: undefined },
): (...args: Parameters<T>) => Promise<Awaited<ReturnType<T>>> {
const { monitorInvocations, acceptedErrors } = options;
const point = Point.measurement(operationName);
// for (const [key, value] of Object.entries(defaultTags)) {
// point.setTag(key, value);
// }
return async (...args: Parameters<T>) => {
if (monitorInvocations) {
console.log(`${operationName}-invocation`);
point.setIntegerField('invocation', 1);
export class Metrics {
private static _instance: Metrics;
private readonly ctx: ExecutionContext;
private readonly request: Request;
private readonly env: Env;
private readonly defaultTags: { [key: string]: string };
private readonly operationPrefix: string;

private constructor(
operationPrefix: string,
request: Request<unknown, IncomingRequestCfProperties>,
ctx: ExecutionContext,
env: Env,
) {
this.request = request;
this.ctx = ctx;
this.env = env;
this.defaultTags = {
colo: request.cf?.colo ?? '',
rayId: request.headers.get('cf-ray') ?? '',
asOrg: request.cf?.asOrganization ?? '',
scriptTag: env.CF_VERSION_METADATA.tag,
scriptId: env.CF_VERSION_METADATA.id,
};
this.operationPrefix = operationPrefix;
}

public static initialiseMetrics(
operationPrefix: string,
request: Request<unknown, IncomingRequestCfProperties>,
ctx: ExecutionContext,
env: Env,
) {
this._instance = new Metrics(operationPrefix, request, ctx, env);
return this._instance;
}

static getMetrics() {
return this._instance;
}

monitorAsyncFunction<T extends AsyncFn>(
operation: { name: string; extraTags?: { [key: string]: string } },
call: T,
options: Options = {},
): (...args: Parameters<T>) => Promise<Awaited<ReturnType<T>>> {
const { name: operationName, extraTags = {} } = operation;
const { monitorInvocations = true, acceptedErrors = [] } = options;

const point = new Point(`${this.operationPrefix}_${operationName}`);
for (const [key, value] of Object.entries({ ...this.defaultTags, ...extraTags })) {
point.tag(key, value);
}
const timer = startTimer();
try {
return options.thisArg ? await call.bind(options.thisArg)(...args) : await call(...args);
} catch (e) {
if (!acceptedErrors || !acceptedErrors.some((acceptedError) => e instanceof acceptedError)) {
console.log(`${operationName}-errors`);
point.setIntegerField('errors', 1);
return async (...args: Parameters<T>) => {
if (monitorInvocations) {
console.log(`${operationName}_invocation`);
point.intField('invocation', 1);
}
throw e;
} finally {
console.log(`${operationName}-duration`, timer.elapsedMs());
point.setIntegerField('duration', timer.elapsedMs());
console.log(point.toLineProtocol()?.toString());
await fetch('https://cf-workers.monitoring.immich.cloud/write', {
method: 'POST',
body: point.toLineProtocol()?.toString(),
headers: {
'User-Agent': 'InfluxDBClient/1.0',
Authorization: `Token YvGUMEDeN5UFk3iJjvGsGMcZNQmNQzlVszYmJkJ2`,
Accept: '*/*',
'Content-Type': 'application/x-www-form-urlencoded',
},
redirect: 'follow',
})
.then((response) => {
console.log('url', response.url);
console.log('status', response.status);
console.log('location header', response.headers.get('Location'));
for (const [key, value] of response.headers.entries()) {
console.log('fetch headers', `${key}: ${value}`);
const timer = startTimer();
return call(...args)
.catch((e) => {
if (!acceptedErrors || !acceptedErrors.some((acceptedError) => e instanceof acceptedError)) {
console.log(`${operationName}_errors`);
point.intField('errors', 1);
}
throw e;
})
.catch((error) => {
console.error('error', error);
.finally(() => {
console.log(`${operationName}_duration`, timer.elapsedMs());
point.intField('duration', timer.elapsedMs());
console.log(point.toLineProtocol()?.toString());
this.ctx.waitUntil(
fetch('https://cf-workers.monitoring.immich.cloud/write', {
method: 'POST',
body: point.toLineProtocol()?.toString(),
headers: {
Authorization: `Token `,
},
}),
);
});
}
};
};
}
}
13 changes: 6 additions & 7 deletions tiles/src/pmtiles/pmtiles.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { monitorAsyncFunction } from '../monitor';
import { Metrics } from '../monitor';
import { R2Source, RangeResponse } from '../r2';
import { Directory, Header, Metadata } from './types';
import { bytesToHeader, decompress, deserializeIndex, findTile, zxyToTileId } from './utils';
Expand Down Expand Up @@ -116,11 +116,10 @@ export class PMTiles {
offset = entry.offset;
length = entry.length;
if (entry.runLength !== 0) break; // Run length of 0 is a directory, anything else is a tile
const leafDirectory = await monitorAsyncFunction('get-leaf-directory', this.getDirectory, { thisArg: this })(
header.leafDirectoryOffset + offset,
length,
header,
);
const leafDirectory = await Metrics.getMetrics().monitorAsyncFunction(
{ name: 'get_leaf_directory' },
(offset, length, header) => this.getDirectory(offset, length, header),
)(header.leafDirectoryOffset + offset, length, header);
entry = findTile(leafDirectory.entries, tileId);
}
const tile = await this.source.getBytesFromArchive({
Expand All @@ -131,7 +130,7 @@ export class PMTiles {
}

async getMetadata(): Promise<Metadata> {
const header = await this.getHeader();
const header = this.getHeader();

const resp = await this.source.getBytesFromArchive({
offset: header.jsonMetadataOffset,
Expand Down
11 changes: 6 additions & 5 deletions tiles/worker-configuration.d.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Generated by Wrangler on Tue Aug 06 2024 12:16:44 GMT+0100 (British Summer Time)
// Generated by Wrangler on Wed Aug 07 2024 15:42:16 GMT+0100 (British Summer Time)
// by running `wrangler types`

interface Env {
KV: KVNamespace;
PMTILES_FILE_NAME: 'v1.pmtiles';
PMTILES_FILE_HASH: 'example-dev-hash';
BUCKET: R2Bucket;
KV: KVNamespace;
PMTILES_FILE_NAME: "v1.pmtiles";
PMTILES_FILE_HASH: "example-dev-hash";
BUCKET: R2Bucket;
CF_VERSION_METADATA: { id: string; tag: string };
}
3 changes: 3 additions & 0 deletions tiles/wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ binding = "BUCKET"
bucket_name = "tiles"
preview_bucket_name = "tiles"

[version_metadata]
binding = "CF_VERSION_METADATA"

# Define environment variables for your Workers script
[vars]
PMTILES_FILE_NAME = "v1.pmtiles"
Expand Down

0 comments on commit a28287b

Please sign in to comment.