Skip to content

Commit

Permalink
Send postlines via websocket if we can (#1730)
Browse files Browse the repository at this point in the history
* feed via websocket

* feed via websocket

* feedback

* ensure right schema is used

* fix resiliency

* some fixes

* fix sending message

* chunk data

* let's abort, which will also dispose

* close gracefully
  • Loading branch information
yaananth committed Mar 15, 2022
1 parent a0458ae commit ddc700e
Showing 1 changed file with 128 additions and 8 deletions.
136 changes: 128 additions & 8 deletions src/Runner.Common/JobServerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using Newtonsoft.Json;
using Pipelines = GitHub.DistributedTask.Pipelines;

namespace GitHub.Runner.Common
Expand All @@ -30,6 +34,11 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan _minDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(1);
private static readonly TimeSpan _maxDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(500);

private static readonly int _minWebsocketFailurePercentageAllowed = 50;
private static readonly int _minWebsocketBatchedLinesCountToConsider = 5;

// Job message information
private Guid _scopeIdentifier;
Expand Down Expand Up @@ -58,6 +67,8 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private Task _fileUploadDequeueTask;
private Task _timelineUpdateDequeueTask;

private Task _websocketConnectTask = null;

// common
private IJobServer _jobServer;
private Task[] _allDequeueTasks;
Expand All @@ -71,14 +82,21 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue

// Web console dequeue will start with process queue every 250ms for the first 60*4 times (~60 seconds).
// Then the dequeue will happen every 500ms.
// In this way, customer still can get instance live console output on job start,
// In this way, customer still can get instance live console output on job start,
// at the same time we can cut the load to server after the build run for more than 60s
private int _webConsoleLineAggressiveDequeueCount = 0;
private const int _webConsoleLineAggressiveDequeueLimit = 4 * 60;
private const int _webConsoleLineQueueSizeLimit = 1024;
private bool _webConsoleLineAggressiveDequeue = true;
private bool _firstConsoleOutputs = true;

private int totalBatchedLinesAttemptedByWebsocket = 0;
private int failedAttemptsToPostBatchedLinesByWebsocket = 0;

private ClientWebSocket _websocketClient = null;

private ServiceEndpoint _serviceEndPoint;

public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
Expand All @@ -89,6 +107,10 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest)
{
Trace.Entering();

this._serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));

InitializeWebsocket();

if (_queueInProcess)
{
Trace.Info("No-opt, all queue process tasks are running.");
Expand Down Expand Up @@ -156,6 +178,9 @@ public async Task ShutdownAsync()
await ProcessTimelinesUpdateQueueAsync(runOnce: true);
Trace.Info("Timeline update queue drained.");

Trace.Info($"Disposing websocket client ...");
this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Shutdown", CancellationToken.None);

Trace.Info("All queue process tasks have been stopped, and all queues are drained.");
}

Expand Down Expand Up @@ -292,14 +317,69 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false)
{
try
{
// we will not requeue failed batch, since the web console lines are time sensitive.
if (batch[0].LineNumber.HasValue)
if (this._websocketConnectTask != null)
{
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken));
// lazily await here, we are already in the background task here
await this._websocketConnectTask;
}
else

var pushedLinesViaWebsocket = false;
if (this._websocketClient != null)
{
var linesWrapper = batch[0].LineNumber.HasValue? new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value):
new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList());
var jsonData = StringUtil.ConvertToJson(linesWrapper);
try
{
totalBatchedLinesAttemptedByWebsocket++;
var jsonDataBytes = Encoding.UTF8.GetBytes(jsonData);
// break the message into chunks of 1024 bytes
for (var i = 0; i < jsonDataBytes.Length; i += 1 * 1024)
{
var lastChunk = i + (1 * 1024) >= jsonDataBytes.Length;
var chunk = new ArraySegment<byte>(jsonDataBytes, i, Math.Min(1 * 1024, jsonDataBytes.Length - i));
await this._websocketClient.SendAsync(chunk, WebSocketMessageType.Text, endOfMessage:lastChunk, CancellationToken.None);
}

pushedLinesViaWebsocket = true;
}
catch (Exception ex)
{
Trace.Info($"Caught exception during append web console line to websocket, let's fallback to sending via non-websocket call (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket}, websocket state: {this._websocketClient?.State}).");
Trace.Error(ex);
failedAttemptsToPostBatchedLinesByWebsocket++;
if (totalBatchedLinesAttemptedByWebsocket > _minWebsocketBatchedLinesCountToConsider)
{
// let's consider failure percentage
if (failedAttemptsToPostBatchedLinesByWebsocket * 100 / totalBatchedLinesAttemptedByWebsocket > _minWebsocketFailurePercentageAllowed)
{
Trace.Info($"Exhausted websocket allowed retries, we will not attempt websocket connection for this job to post lines again.");
this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.InternalServerError, "Shutdown due to failures", CancellationToken.None);
this._websocketClient = null;
}
}

if (this._websocketClient != null)
{
var delay = BackoffTimerHelper.GetRandomBackoff(_minDelayForWebsocketReconnect, _maxDelayForWebsocketReconnect);
Trace.Info($"Websocket is not open, let's attempt to connect back again with random backoff {delay} ms (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket}).");
InitializeWebsocket(delay);
}
}
}

// if we can't push via websocket, let's fallback to posting via REST API
if (!pushedLinesViaWebsocket)
{
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken));
// we will not requeue failed batch, since the web console lines are time sensitive.
if (batch[0].LineNumber.HasValue)
{
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken));
}
else
{
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken));
}
}

if (_firstConsoleOutputs)
Expand Down Expand Up @@ -391,6 +471,46 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
}
}

private void InitializeWebsocket(TimeSpan? delay = null)
{
if (_serviceEndPoint.Authorization != null &&
_serviceEndPoint.Authorization.Parameters.TryGetValue(EndpointAuthorizationParameters.AccessToken, out var accessToken) &&
!string.IsNullOrEmpty(accessToken))
{
if (_serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl) && !string.IsNullOrEmpty(feedStreamUrl))
{
// let's ensure we use the right scheme
feedStreamUrl = feedStreamUrl.Replace("https://", "wss://").Replace("http://", "ws://");
Trace.Info($"Creating websocket client ..." + feedStreamUrl);
this._websocketClient = new ClientWebSocket();
this._websocketClient.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}");
this._websocketConnectTask = Task.Run(async () =>
{
try
{
Trace.Info($"Attempting to start websocket client with delay {delay}.");
await Task.Delay(delay ?? TimeSpan.Zero);
await this._websocketClient.ConnectAsync(new Uri(feedStreamUrl), default(CancellationToken));
Trace.Info($"Successfully started websocket client.");
}
catch(Exception ex)
{
Trace.Info("Exception caught during websocket client connect, fallback of HTTP would be used now instead of websocket.");
Trace.Error(ex);
}
});
}
else
{
Trace.Info($"No FeedStreamUrl found, so we will use Rest API calls for sending feed data");
}
}
else
{
Trace.Info($"No access token from the service endpoint");
}
}

private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{
while (!_jobCompletionSource.Task.IsCompleted || runOnce)
Expand Down Expand Up @@ -489,8 +609,8 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)

if (runOnce)
{
// continue process timeline records update,
// we might have more records need update,
// continue process timeline records update,
// we might have more records need update,
// since we just create a new sub-timeline
if (pendingSubtimelineUpdate)
{
Expand Down

0 comments on commit ddc700e

Please sign in to comment.