Skip to content

Commit

Permalink
[local ssh] improve obsevability and try to prevent hanging
Browse files Browse the repository at this point in the history
- add started/existed status reporting to identify hanging process
- don't swallow errors, but report all, we can sort them out later to investigate root causes of hanging
- add detection of stale web sockets to prevent hanging
  • Loading branch information
akosyakov committed Jul 24, 2023
1 parent 0a75be3 commit 14bc4ba
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 79 deletions.
23 changes: 12 additions & 11 deletions src/common/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export function getErrorMetricsEndpoint(gitpodHost: string): string {
return `https://ide.${serviceUrl.hostname}/metrics-api/reportError`;
}

export function commonSendEventData(logService: ILogService, segmentClient: Analytics, machineId: string, eventName: string, data?: any) {
export async function commonSendEventData(logService: ILogService, segmentClient: Analytics, machineId: string, eventName: string, data?: any): Promise<void> {
const properties = data ?? {};

delete properties['gitpodHost'];
Expand All @@ -77,16 +77,17 @@ export function commonSendEventData(logService: ILogService, segmentClient: Anal
logService.trace('Local event report', eventName, properties);
return;
}

segmentClient.track({
anonymousId: machineId,
event: eventName,
properties
}, (err) => {
if (err) {
logService.error('Failed to log event to app analytics:', err);
}
});
return new Promise((resolve) =>
segmentClient.track({
anonymousId: machineId,
event: eventName,
properties
}, (err) => {
if (err) {
logService.error('Failed to log event to app analytics:', err);
}
resolve();
}))
}

interface SendErrorDataOptions {
Expand Down
181 changes: 119 additions & 62 deletions src/local-ssh/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,71 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

interface ClientOptions {
host: string;
extIpcPort: number;
machineID: string;
}

function getClientOptions(): ClientOptions {
const args = process.argv.slice(2);
// %h is in the form of <ws_id>.vss.<gitpod_host>'
// add `https://` prefix since our gitpodHost is actually a url not host
const host = 'https://' + args[0].split('.').splice(2).join('.');
return {
host,
extIpcPort: Number.parseInt(args[1], 10),
machineID: args[2] ?? '',
};
}

const options = getClientOptions();
if (!options) {
process.exit(1);
}

import { NopeLogger } from './logger';
const logService = new NopeLogger();

// DO NOT PUSH CHANGES BELOW TO PRODUCTION
// import { DebugLogger } from './logger';
// const logService = new DebugLogger();

import { TelemetryService } from './telemetryService';
const telemetryService = new TelemetryService(
process.env.SEGMENT_KEY!,
options.machineID,
process.env.EXT_NAME!,
process.env.EXT_VERSION!,
options.host,
logService
);

const flow: SSHUserFlowTelemetry = {
flow: 'local_ssh',
gitpodHost: options.host,
workspaceId: '',
processId: process.pid,
};

telemetryService.sendUserFlowStatus('started', flow)
const sendExited = (exitCode: number, forceExit: boolean, exitSignal?: NodeJS.Signals) => telemetryService.sendUserFlowStatus('exited', {
...flow,
exitCode,
forceExit: String(forceExit),
signal: exitSignal
})
// best effort to intercept process exit
const beforeExitListener = (exitCode: number) => {
process.removeListener('beforeExit', beforeExitListener);
return sendExited(exitCode, false)
}
process.addListener('beforeExit', beforeExitListener);
const exitProcess = async (forceExit: boolean, signal?: NodeJS.Signals) => {
await sendExited(0, forceExit, signal);
process.exit(0);
}

import { SshClient } from '@microsoft/dev-tunnels-ssh-tcp';
import { NodeStream, SshClientCredentials, SshClientSession, SshDisconnectReason, SshServerSession, SshSessionConfiguration, Stream, WebSocketStream } from '@microsoft/dev-tunnels-ssh';
import { importKey, importKeyBytes } from '@microsoft/dev-tunnels-ssh-keys';
Expand All @@ -13,7 +78,6 @@ import { WrapError } from '../common/utils';
import { WebSocket } from 'ws';
import * as stream from 'stream';
import { ILogService } from '../services/logService';
import { TelemetryService } from './telemetryService';
import { ITelemetryService, UserFlowTelemetryProperties } from '../common/telemetry';
import { LocalSSHMetricsReporter } from '../services/localSSHMetrics';

Expand All @@ -25,24 +89,6 @@ function getHostKey(): Buffer {
return Buffer.from(HOST_KEY, 'base64');
}

interface ClientOptions {
host: string;
extIpcPort: number;
machineID: string;
}

function getClientOptions(): ClientOptions {
const args = process.argv.slice(2);
// %h is in the form of <ws_id>.vss.<gitpod_host>'
// add `https://` prefix since our gitpodHost is actually a url not host
const host = 'https://' + args[0].split('.').splice(2).join('.');
return {
host,
extIpcPort: Number.parseInt(args[1], 10),
machineID: args[2] ?? '',
};
}

type FailedToProxyCode = 'SSH.AuthenticationFailed' | 'TUNNEL.AuthenticateSSHKeyFailed' | 'NoRunningInstance' | 'FailedToGetAuthInfo' | 'GitpodHostMismatch' | 'NoAccessTokenFound';

// IgnoredFailedCodes contains the failreCode that don't need to send error report
Expand Down Expand Up @@ -74,28 +120,22 @@ interface SSHUserFlowTelemetry extends UserFlowTelemetryProperties {

class WebSocketSSHProxy {
private extensionIpc: Client<ExtensionServiceDefinition>;
private flow: SSHUserFlowTelemetry;

constructor(
private readonly options: ClientOptions,
private readonly telemetryService: ITelemetryService,
private readonly metricsReporter: LocalSSHMetricsReporter,
private readonly logService: ILogService
private readonly logService: ILogService,
private readonly flow: SSHUserFlowTelemetry
) {
this.flow = {
flow: 'local_ssh',
gitpodHost: this.options.host,
workspaceId: '',
};

this.onExit();
this.onException();
this.extensionIpc = createClient(ExtensionServiceDefinition, createChannel('127.0.0.1:' + this.options.extIpcPort));
}

private onExit() {
const exitHandler = (_signal?: NodeJS.Signals) => {
process.exit(0);
const exitHandler = (signal?: NodeJS.Signals) => {
exitProcess(false, signal)
};
process.on('SIGINT', exitHandler);
process.on('SIGTERM', exitHandler);
Expand All @@ -116,19 +156,21 @@ class WebSocketSSHProxy {
// an error handler to the writable stream
const sshStream = stream.Duplex.from({ readable: process.stdin, writable: process.stdout });
sshStream.on('error', e => {
if ((e as any).code === 'EPIPE') {
// HACK:
// Seems there's a bug in the ssh library that could hang forever when the stream gets closed
// so the below `await pipePromise` will never return and the node process will never exit.
// So let's just force kill here
setTimeout(() => process.exit(0), 50);
if ((e as any).code !== 'EPIPE') {
// TODO filter out known error codes
this.logService.error(e, 'unexpected sshStream error');
}
// HACK:
// Seems there's a bug in the ssh library that could hang forever when the stream gets closed
// so the below `await pipePromise` will never return and the node process will never exit.
// So let's just force kill here
setTimeout(() => exitProcess(true), 50);
});
// sshStream.on('end', () => {
// setTimeout(() => process.exit(0), 50);
// setTimeout(() => doProcessExit(0), 50);
// });
// sshStream.on('close', () => {
// setTimeout(() => process.exit(0), 50);
// setTimeout(() => doProcessExit(0), 50);
// });

// This is expected to never throw as key is hardcoded
Expand Down Expand Up @@ -227,10 +269,46 @@ class WebSocketSSHProxy {
'x-gitpod-owner-token': workspaceInfo.ownerToken
}
});

socket.binaryType = 'arraybuffer';

const stream = await new Promise<Stream>((resolve, reject) => {
socket.onopen = () => resolve(new WebSocketStream(socket as any));
socket.onopen = () => {
// see https://github.com/gitpod-io/gitpod/blob/a5b4a66e0f384733145855f82f77332062e9d163/components/gitpod-protocol/go/websocket.go#L31-L40
const pongPeriod = 15 * 1000;
const pingPeriod = pongPeriod * 9 / 10;

let pingTimeout: NodeJS.Timeout | undefined;
const heartbeat = () => {
stopHearbeat();

// Use `WebSocket#terminate()`, which immediately destroys the connection,
// instead of `WebSocket#close()`, which waits for the close timer.
// Delay should be equal to the interval at which your server
// sends out pings plus a conservative assumption of the latency.
pingTimeout = setTimeout(() => {
// TODO(ak) if we see stale socket.terminate();
this.telemetryService.sendUserFlowStatus('stale', this.flow);
}, pingPeriod + 1000);
}
function stopHearbeat() {
if (pingTimeout != undefined) {
clearTimeout(pingTimeout);
pingTimeout = undefined;
}
}

socket.on('ping', heartbeat);

heartbeat();
const socketWrapper = new WebSocketStream(socket as any);
const wrappedOnClose = socket.onclose!;
socket.onclose = (e) => {
stopHearbeat();
wrappedOnClose(e);
}
resolve(socketWrapper);
}
socket.onerror = (e) => reject(e);
});

Expand Down Expand Up @@ -281,30 +359,9 @@ class WebSocketSSHProxy {
}
}

const options = getClientOptions();
if (!options) {
process.exit(1);
}

import { NopeLogger } from './logger';
const logService = new NopeLogger();

// DO NOT PUSH CHANGES BELOW TO PRODUCTION
// import { DebugLogger } from './logger';
// const logService = new DebugLogger();

const telemetryService = new TelemetryService(
process.env.SEGMENT_KEY!,
options.machineID,
process.env.EXT_NAME!,
process.env.EXT_VERSION!,
options.host,
logService
);

const metricsReporter = new LocalSSHMetricsReporter(logService);

const proxy = new WebSocketSSHProxy(options, telemetryService, metricsReporter, logService);
proxy.start().catch(() => {
// Noop, catch everything in start method pls
const proxy = new WebSocketSSHProxy(options, telemetryService, metricsReporter, logService, flow);
proxy.start().catch(e => {
telemetryService.sendTelemetryException(e, { gitpodHost: options.host });
});
12 changes: 6 additions & 6 deletions src/local-ssh/telemetryService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ export class TelemetryService implements ITelemetryService {
this.commonProperties = commonProperties;
}

sendEventData(eventName: string, data?: Record<string, any>) {
async sendEventData(eventName: string, data?: Record<string, any>): Promise<void> {
const properties = mixin(cleanData(data ?? {}, this.cleanupPatterns, isTrustedValue), this.commonProperties);

if (!this.segmentClient) {
return;
}

commonSendEventData(this.logService, this.segmentClient, this.machineId, eventName, properties);
return commonSendEventData(this.logService, this.segmentClient, this.machineId, eventName, properties);
}

sendErrorData(error: Error, data?: Record<string, any>) {
Expand All @@ -53,20 +53,20 @@ export class TelemetryService implements ITelemetryService {
// Noop, we disabled buffering
}

sendTelemetryEvent(eventName: string, properties?: TelemetryEventProperties): void {
sendTelemetryEvent(eventName: string, properties?: TelemetryEventProperties): Promise<void> {
const props = properties ? Object.fromEntries(Object.entries(properties).map(([k, v]) => [k, TRUSTED_VALUES.has(k) ? new TelemetryTrustedValue(v) : v])) : undefined;
this.sendEventData(eventName, props);
return this.sendEventData(eventName, props);
}

sendTelemetryException(error: Error, properties?: TelemetryEventProperties): void {
const props = properties ? Object.fromEntries(Object.entries(properties).map(([k, v]) => [k, TRUSTED_VALUES.has(k) ? new TelemetryTrustedValue(v) : v])) : undefined;
this.sendErrorData(error, props);
}

sendUserFlowStatus(status: string, flowProperties: UserFlowTelemetryProperties): void {
sendUserFlowStatus(status: string, flowProperties: UserFlowTelemetryProperties): Promise<void> {
const properties: TelemetryEventProperties = { ...flowProperties, status };
delete properties['flow'];
this.sendTelemetryEvent('vscode_desktop_' + flowProperties.flow, properties);
return this.sendTelemetryEvent('vscode_desktop_' + flowProperties.flow, properties);
}
}

Expand Down

0 comments on commit 14bc4ba

Please sign in to comment.