From 14bc4bade6759f9f13002ecbf96bd99e8001cebd Mon Sep 17 00:00:00 2001 From: akosyakov Date: Mon, 24 Jul 2023 13:52:15 +0200 Subject: [PATCH] [local ssh] improve observability and try to prevent hanging - add started/exited status reporting to identify hanging process - don't swallow errors, but report all, we can sort them out later to investigate root causes of hanging - add detection of stale web sockets to prevent hanging --- src/common/telemetry.ts | 23 ++-- src/local-ssh/proxy.ts | 181 ++++++++++++++++++++---------- src/local-ssh/telemetryService.ts | 12 +- 3 files changed, 137 insertions(+), 79 deletions(-) diff --git a/src/common/telemetry.ts b/src/common/telemetry.ts index 6d078e43..7fbe6288 100644 --- a/src/common/telemetry.ts +++ b/src/common/telemetry.ts @@ -68,7 +68,7 @@ export function getErrorMetricsEndpoint(gitpodHost: string): string { return `https://ide.${serviceUrl.hostname}/metrics-api/reportError`; } -export function commonSendEventData(logService: ILogService, segmentClient: Analytics, machineId: string, eventName: string, data?: any) { +export async function commonSendEventData(logService: ILogService, segmentClient: Analytics, machineId: string, eventName: string, data?: any): Promise { const properties = data ?? 
{}; delete properties['gitpodHost']; @@ -77,16 +77,17 @@ export function commonSendEventData(logService: ILogService, segmentClient: Anal logService.trace('Local event report', eventName, properties); return; } - - segmentClient.track({ - anonymousId: machineId, - event: eventName, - properties - }, (err) => { - if (err) { - logService.error('Failed to log event to app analytics:', err); - } - }); + return new Promise((resolve) => + segmentClient.track({ + anonymousId: machineId, + event: eventName, + properties + }, (err) => { + if (err) { + logService.error('Failed to log event to app analytics:', err); + } + resolve(); + })) } interface SendErrorDataOptions { diff --git a/src/local-ssh/proxy.ts b/src/local-ssh/proxy.ts index 9c470d61..00b434b0 100644 --- a/src/local-ssh/proxy.ts +++ b/src/local-ssh/proxy.ts @@ -3,6 +3,71 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ +interface ClientOptions { + host: string; + extIpcPort: number; + machineID: string; +} + +function getClientOptions(): ClientOptions { + const args = process.argv.slice(2); + // %h is in the form of .vss.' + // add `https://` prefix since our gitpodHost is actually a url not host + const host = 'https://' + args[0].split('.').splice(2).join('.'); + return { + host, + extIpcPort: Number.parseInt(args[1], 10), + machineID: args[2] ?? 
'', + }; +} + +const options = getClientOptions(); +if (!options) { + process.exit(1); +} + +import { NopeLogger } from './logger'; +const logService = new NopeLogger(); + +// DO NOT PUSH CHANGES BELOW TO PRODUCTION +// import { DebugLogger } from './logger'; +// const logService = new DebugLogger(); + +import { TelemetryService } from './telemetryService'; +const telemetryService = new TelemetryService( + process.env.SEGMENT_KEY!, + options.machineID, + process.env.EXT_NAME!, + process.env.EXT_VERSION!, + options.host, + logService +); + +const flow: SSHUserFlowTelemetry = { + flow: 'local_ssh', + gitpodHost: options.host, + workspaceId: '', + processId: process.pid, +}; + +telemetryService.sendUserFlowStatus('started', flow) +const sendExited = (exitCode: number, forceExit: boolean, exitSignal?: NodeJS.Signals) => telemetryService.sendUserFlowStatus('exited', { + ...flow, + exitCode, + forceExit: String(forceExit), + signal: exitSignal +}) +// best effort to intercept process exit +const beforeExitListener = (exitCode: number) => { + process.removeListener('beforeExit', beforeExitListener); + return sendExited(exitCode, false) +} +process.addListener('beforeExit', beforeExitListener); +const exitProcess = async (forceExit: boolean, signal?: NodeJS.Signals) => { + await sendExited(0, forceExit, signal); + process.exit(0); +} + import { SshClient } from '@microsoft/dev-tunnels-ssh-tcp'; import { NodeStream, SshClientCredentials, SshClientSession, SshDisconnectReason, SshServerSession, SshSessionConfiguration, Stream, WebSocketStream } from '@microsoft/dev-tunnels-ssh'; import { importKey, importKeyBytes } from '@microsoft/dev-tunnels-ssh-keys'; @@ -13,7 +78,6 @@ import { WrapError } from '../common/utils'; import { WebSocket } from 'ws'; import * as stream from 'stream'; import { ILogService } from '../services/logService'; -import { TelemetryService } from './telemetryService'; import { ITelemetryService, UserFlowTelemetryProperties } from '../common/telemetry'; 
import { LocalSSHMetricsReporter } from '../services/localSSHMetrics'; @@ -25,24 +89,6 @@ function getHostKey(): Buffer { return Buffer.from(HOST_KEY, 'base64'); } -interface ClientOptions { - host: string; - extIpcPort: number; - machineID: string; -} - -function getClientOptions(): ClientOptions { - const args = process.argv.slice(2); - // %h is in the form of .vss.' - // add `https://` prefix since our gitpodHost is actually a url not host - const host = 'https://' + args[0].split('.').splice(2).join('.'); - return { - host, - extIpcPort: Number.parseInt(args[1], 10), - machineID: args[2] ?? '', - }; -} - type FailedToProxyCode = 'SSH.AuthenticationFailed' | 'TUNNEL.AuthenticateSSHKeyFailed' | 'NoRunningInstance' | 'FailedToGetAuthInfo' | 'GitpodHostMismatch' | 'NoAccessTokenFound'; // IgnoredFailedCodes contains the failreCode that don't need to send error report @@ -74,28 +120,22 @@ interface SSHUserFlowTelemetry extends UserFlowTelemetryProperties { class WebSocketSSHProxy { private extensionIpc: Client; - private flow: SSHUserFlowTelemetry; constructor( private readonly options: ClientOptions, private readonly telemetryService: ITelemetryService, private readonly metricsReporter: LocalSSHMetricsReporter, - private readonly logService: ILogService + private readonly logService: ILogService, + private readonly flow: SSHUserFlowTelemetry ) { - this.flow = { - flow: 'local_ssh', - gitpodHost: this.options.host, - workspaceId: '', - }; - this.onExit(); this.onException(); this.extensionIpc = createClient(ExtensionServiceDefinition, createChannel('127.0.0.1:' + this.options.extIpcPort)); } private onExit() { - const exitHandler = (_signal?: NodeJS.Signals) => { - process.exit(0); + const exitHandler = (signal?: NodeJS.Signals) => { + exitProcess(false, signal) }; process.on('SIGINT', exitHandler); process.on('SIGTERM', exitHandler); @@ -116,19 +156,21 @@ class WebSocketSSHProxy { // an error handler to the writable stream const sshStream = stream.Duplex.from({ 
readable: process.stdin, writable: process.stdout }); sshStream.on('error', e => { - if ((e as any).code === 'EPIPE') { - // HACK: - // Seems there's a bug in the ssh library that could hang forever when the stream gets closed - // so the below `await pipePromise` will never return and the node process will never exit. - // So let's just force kill here - setTimeout(() => process.exit(0), 50); + if ((e as any).code !== 'EPIPE') { + // TODO filter out known error codes + this.logService.error(e, 'unexpected sshStream error'); } + // HACK: + // Seems there's a bug in the ssh library that could hang forever when the stream gets closed + // so the below `await pipePromise` will never return and the node process will never exit. + // So let's just force kill here + setTimeout(() => exitProcess(true), 50); }); // sshStream.on('end', () => { - // setTimeout(() => process.exit(0), 50); + // setTimeout(() => doProcessExit(0), 50); // }); // sshStream.on('close', () => { - // setTimeout(() => process.exit(0), 50); + // setTimeout(() => doProcessExit(0), 50); // }); // This is expected to never throw as key is hardcoded @@ -227,10 +269,46 @@ class WebSocketSSHProxy { 'x-gitpod-owner-token': workspaceInfo.ownerToken } }); + socket.binaryType = 'arraybuffer'; const stream = await new Promise((resolve, reject) => { - socket.onopen = () => resolve(new WebSocketStream(socket as any)); + socket.onopen = () => { + // see https://github.com/gitpod-io/gitpod/blob/a5b4a66e0f384733145855f82f77332062e9d163/components/gitpod-protocol/go/websocket.go#L31-L40 + const pongPeriod = 15 * 1000; + const pingPeriod = pongPeriod * 9 / 10; + + let pingTimeout: NodeJS.Timeout | undefined; + const heartbeat = () => { + stopHearbeat(); + + // Use `WebSocket#terminate()`, which immediately destroys the connection, + // instead of `WebSocket#close()`, which waits for the close timer. 
+ // Delay should be equal to the interval at which your server + // sends out pings plus a conservative assumption of the latency. + pingTimeout = setTimeout(() => { + // TODO(ak) if we see stale socket.terminate(); + this.telemetryService.sendUserFlowStatus('stale', this.flow); + }, pingPeriod + 1000); + } + function stopHearbeat() { + if (pingTimeout != undefined) { + clearTimeout(pingTimeout); + pingTimeout = undefined; + } + } + + socket.on('ping', heartbeat); + + heartbeat(); + const socketWrapper = new WebSocketStream(socket as any); + const wrappedOnClose = socket.onclose!; + socket.onclose = (e) => { + stopHearbeat(); + wrappedOnClose(e); + } + resolve(socketWrapper); + } socket.onerror = (e) => reject(e); }); @@ -281,30 +359,9 @@ class WebSocketSSHProxy { } } -const options = getClientOptions(); -if (!options) { - process.exit(1); -} - -import { NopeLogger } from './logger'; -const logService = new NopeLogger(); - -// DO NOT PUSH CHANGES BELOW TO PRODUCTION -// import { DebugLogger } from './logger'; -// const logService = new DebugLogger(); - -const telemetryService = new TelemetryService( - process.env.SEGMENT_KEY!, - options.machineID, - process.env.EXT_NAME!, - process.env.EXT_VERSION!, - options.host, - logService -); - const metricsReporter = new LocalSSHMetricsReporter(logService); -const proxy = new WebSocketSSHProxy(options, telemetryService, metricsReporter, logService); -proxy.start().catch(() => { - // Noop, catch everything in start method pls +const proxy = new WebSocketSSHProxy(options, telemetryService, metricsReporter, logService, flow); +proxy.start().catch(e => { + telemetryService.sendTelemetryException(e, { gitpodHost: options.host }); }); diff --git a/src/local-ssh/telemetryService.ts b/src/local-ssh/telemetryService.ts index 4205490d..d29446b3 100644 --- a/src/local-ssh/telemetryService.ts +++ b/src/local-ssh/telemetryService.ts @@ -31,14 +31,14 @@ export class TelemetryService implements ITelemetryService { this.commonProperties = 
commonProperties; } - sendEventData(eventName: string, data?: Record) { + async sendEventData(eventName: string, data?: Record): Promise { const properties = mixin(cleanData(data ?? {}, this.cleanupPatterns, isTrustedValue), this.commonProperties); if (!this.segmentClient) { return; } - commonSendEventData(this.logService, this.segmentClient, this.machineId, eventName, properties); + return commonSendEventData(this.logService, this.segmentClient, this.machineId, eventName, properties); } sendErrorData(error: Error, data?: Record) { @@ -53,9 +53,9 @@ export class TelemetryService implements ITelemetryService { // Noop, we disabled buffering } - sendTelemetryEvent(eventName: string, properties?: TelemetryEventProperties): void { + sendTelemetryEvent(eventName: string, properties?: TelemetryEventProperties): Promise { const props = properties ? Object.fromEntries(Object.entries(properties).map(([k, v]) => [k, TRUSTED_VALUES.has(k) ? new TelemetryTrustedValue(v) : v])) : undefined; - this.sendEventData(eventName, props); + return this.sendEventData(eventName, props); } sendTelemetryException(error: Error, properties?: TelemetryEventProperties): void { @@ -63,10 +63,10 @@ export class TelemetryService implements ITelemetryService { this.sendErrorData(error, props); } - sendUserFlowStatus(status: string, flowProperties: UserFlowTelemetryProperties): void { + sendUserFlowStatus(status: string, flowProperties: UserFlowTelemetryProperties): Promise { const properties: TelemetryEventProperties = { ...flowProperties, status }; delete properties['flow']; - this.sendTelemetryEvent('vscode_desktop_' + flowProperties.flow, properties); + return this.sendTelemetryEvent('vscode_desktop_' + flowProperties.flow, properties); } }