Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve sass --embedded performance #2013

Merged
merged 15 commits into from
Sep 1, 2023
Merged
219 changes: 118 additions & 101 deletions lib/src/embedded/dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';

import 'package:native_synchronization/mailbox.dart';
import 'package:path/path.dart' as p;
import 'package:protobuf/protobuf.dart';
import 'package:sass/sass.dart' as sass;
import 'package:stream_channel/stream_channel.dart';

import 'embedded_sass.pb.dart';
import 'function_registry.dart';
Expand All @@ -30,83 +31,61 @@ final _outboundRequestId = 0;
/// A class that dispatches messages to and from the host for a single
/// compilation.
final class Dispatcher {
/// The channel of encoded protocol buffers, connected to the host.
final StreamChannel<Uint8List> _channel;
/// The mailbox for receiving messages from the host.
final Mailbox _mailbox;

/// The sink for sending messages to the host.
final SendPort _sendPort;

/// The compilation ID for which this dispatcher is running.
///
/// This is added to outgoing messages but is _not_ parsed from incoming
/// messages, since that's already handled by the [IsolateDispatcher].
final int _compilationId;
/// This is used in error messages.
late int _compilationId;

/// [_compilationId], serialized as a varint.
final Uint8List _compilationIdVarint;

/// Whether this dispatcher has received its compile request.
var _compiling = false;

/// A completer awaiting a response to an outbound request.
///
/// Since each [Dispatcher] is only running a single-threaded compilation, it
/// can only ever have one request outstanding.
Completer<GeneratedMessage>? _outstandingRequest;
/// This is used in outgoing messages.
late Uint8List _compilationIdVarint;

/// Creates a [Dispatcher] that sends and receives encoded protocol buffers
/// over [channel].
Dispatcher(this._channel, this._compilationId)
: _compilationIdVarint = serializeVarint(_compilationId);
/// Creates a [Dispatcher] that receives encoded protocol buffers through
/// [_mailbox] and sends them through [_sendPort].
Dispatcher(this._mailbox, this._sendPort);

/// Listens for incoming `CompileRequests` and runs their compilations.
///
/// This may only be called once. Returns whether or not the compilation
/// succeeded.
Future<bool> listen() async {
var success = false;
await _channel.stream.listen((binaryMessage) async {
// Wait a single microtask tick so that we're running in a separate
// microtask from the initial request dispatch. Otherwise, [waitFor] will
// deadlock the event loop fiber that would otherwise be checking stdin
// for new input.
await Future<void>.value();
void listen() {
while (true) {
var packet = _mailbox.take();
if (packet.isEmpty) break;

try {
InboundMessage? message;
var (compilationId, messageBuffer) = parsePacket(packet);

_compilationId = compilationId;
_compilationIdVarint = serializeVarint(compilationId);

InboundMessage message;
try {
message = InboundMessage.fromBuffer(binaryMessage);
message = InboundMessage.fromBuffer(messageBuffer);
} on InvalidProtocolBufferException catch (error) {
throw parseError(error.message);
}

switch (message.whichMessage()) {
case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.compileRequest:
if (_compiling) {
throw paramsError(
"A CompileRequest with compilation ID $_compilationId is "
"already active.");
}
_compiling = true;

var request = message.compileRequest;
var response = await _compile(request);
var response = _compile(request);
_send(OutboundMessage()..compileResponse = response);
success = true;
// Each Dispatcher runs a single compilation and then closes.
_channel.sink.close();

case InboundMessage_Message.canonicalizeResponse:
_dispatchResponse(message.id, message.canonicalizeResponse);
case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.canonicalizeResponse:
case InboundMessage_Message.importResponse:
_dispatchResponse(message.id, message.importResponse);

case InboundMessage_Message.fileImportResponse:
_dispatchResponse(message.id, message.fileImportResponse);

case InboundMessage_Message.functionCallResponse:
_dispatchResponse(message.id, message.functionCallResponse);
throw paramsError(
"Response ID ${message.id} doesn't match any outstanding requests"
" in compilation $_compilationId.");

case InboundMessage_Message.notSet:
throw parseError("InboundMessage.message is not set.");
Expand All @@ -115,16 +94,20 @@ final class Dispatcher {
throw parseError(
"Unknown message type: ${message.toDebugString()}");
}
} on ProtocolError catch (error, stackTrace) {
sendError(handleError(error, stackTrace));
_channel.sink.close();
} on AsyncError catch (error) {
if (error.error is ProtocolError) {
_handleError(error.error, error.stackTrace);
}
break;
} catch (error, stackTrace) {
_handleError(error, stackTrace);
break;
}
}).asFuture<void>();
return success;
}
}

Future<OutboundMessage_CompileResponse> _compile(
InboundMessage_CompileRequest request) async {
OutboundMessage_CompileResponse _compile(
InboundMessage_CompileRequest request) {
var functions = FunctionRegistry();

var style = request.style == OutputStyle.COMPRESSED
Expand Down Expand Up @@ -159,7 +142,6 @@ final class Dispatcher {
verbose: request.verbose,
sourceMap: request.sourceMap,
charset: request.charset);
break;

case InboundMessage_CompileRequest_Input.path:
if (request.path.isEmpty) {
Expand Down Expand Up @@ -188,7 +170,6 @@ final class Dispatcher {
..end = SourceSpan_SourceLocation()
..url = p.toUri(request.path).toString()));
}
break;

case InboundMessage_CompileRequest_Input.notSet:
throw mandatoryError("CompileRequest.input");
Expand Down Expand Up @@ -245,59 +226,93 @@ final class Dispatcher {
void sendError(ProtocolError error) =>
_send(OutboundMessage()..error = error);

Future<InboundMessage_CanonicalizeResponse> sendCanonicalizeRequest(
InboundMessage_CanonicalizeResponse sendCanonicalizeRequest(
OutboundMessage_CanonicalizeRequest request) =>
_sendRequest<InboundMessage_CanonicalizeResponse>(
OutboundMessage()..canonicalizeRequest = request);

Future<InboundMessage_ImportResponse> sendImportRequest(
InboundMessage_ImportResponse sendImportRequest(
OutboundMessage_ImportRequest request) =>
_sendRequest<InboundMessage_ImportResponse>(
OutboundMessage()..importRequest = request);

Future<InboundMessage_FileImportResponse> sendFileImportRequest(
InboundMessage_FileImportResponse sendFileImportRequest(
OutboundMessage_FileImportRequest request) =>
_sendRequest<InboundMessage_FileImportResponse>(
OutboundMessage()..fileImportRequest = request);

Future<InboundMessage_FunctionCallResponse> sendFunctionCallRequest(
InboundMessage_FunctionCallResponse sendFunctionCallRequest(
OutboundMessage_FunctionCallRequest request) =>
_sendRequest<InboundMessage_FunctionCallResponse>(
OutboundMessage()..functionCallRequest = request);

/// Sends [request] to the host and returns the message sent in response.
Future<T> _sendRequest<T extends GeneratedMessage>(
OutboundMessage request) async {
request.id = _outboundRequestId;
_send(request);

if (_outstandingRequest != null) {
throw StateError(
"Dispatcher.sendRequest() can't be called when another request is "
"active.");
}
T _sendRequest<T extends GeneratedMessage>(OutboundMessage message) {
message.id = _outboundRequestId;
_send(message);

return (_outstandingRequest = Completer<T>()).future;
}
var packet = _mailbox.take();

/// Dispatches [response] to the appropriate outstanding request.
///
/// Throws an error if there's no outstanding request with the given [id] or
/// if that request is expecting a different type of response.
void _dispatchResponse<T extends GeneratedMessage>(int? id, T response) {
var completer = _outstandingRequest;
_outstandingRequest = null;
if (completer == null || id != _outboundRequestId) {
throw paramsError(
"Response ID $id doesn't match any outstanding requests in "
"compilation $_compilationId.");
} else if (completer is! Completer<T>) {
throw paramsError(
"Request ID $id doesn't match response type ${response.runtimeType} "
"in compilation $_compilationId.");
try {
var messageBuffer =
Uint8List.sublistView(packet, _compilationIdVarint.length);

InboundMessage message;
try {
message = InboundMessage.fromBuffer(messageBuffer);
} on InvalidProtocolBufferException catch (error) {
throw parseError(error.message);
}

GeneratedMessage response;
switch (message.whichMessage()) {
case InboundMessage_Message.canonicalizeResponse:
response = message.canonicalizeResponse;

case InboundMessage_Message.importResponse:
response = message.importResponse;

case InboundMessage_Message.fileImportResponse:
response = message.fileImportResponse;

case InboundMessage_Message.functionCallResponse:
response = message.functionCallResponse;

case InboundMessage_Message.compileRequest:
throw paramsError(
"A CompileRequest with compilation ID $_compilationId is already active.");

case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.notSet:
throw parseError("InboundMessage.message is not set.");

default:
throw parseError("Unknown message type: ${message.toDebugString()}");
}
if (message.id != _outboundRequestId) {
throw paramsError(
"Response ID ${message.id} doesn't match any outstanding requests in"
" compilation $_compilationId.");
}
if (response is! T) {
throw paramsError(
"Request ID $_outboundRequestId doesn't match response type"
" ${response.runtimeType} in compilation $_compilationId.");
}
return response;
} catch (error, stackTrace) {
throw AsyncError(error, stackTrace);
}
}

completer.complete(response);
/// Handles an error thrown by the dispatcher or code it dispatches to.
///
/// The [messageId] indicate the IDs of the message being responded to, if
/// available.
void _handleError(Object error, StackTrace stackTrace, {int? messageId}) {
sendError(handleError(error, stackTrace, messageId: messageId));
}

/// Sends [message] to the host with the given [wireId].
Expand All @@ -306,16 +321,18 @@ final class Dispatcher {
message.writeToCodedBufferWriter(protobufWriter);

// Add one additional byte to the beginning to indicate whether or not the
// compilation is finished, so the [IsolateDispatcher] knows whether to
// treat this isolate as inactive.
// compilation has finished (1) or encountered a fatal error (2), so the
// [IsolateDispatcher] knows whether to treat this isolate as inactive or
// close out entirely.
var packet = Uint8List(
1 + _compilationIdVarint.length + protobufWriter.lengthInBytes);
packet[0] =
message.whichMessage() == OutboundMessage_Message.compileResponse
? 1
: 0;
packet[0] = switch (message.whichMessage()) {
OutboundMessage_Message.compileResponse => 1,
OutboundMessage_Message.error => 2,
_ => 0
};
packet.setAll(1, _compilationIdVarint);
protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length);
_channel.sink.add(packet);
_sendPort.send(packet);
}
}
9 changes: 3 additions & 6 deletions lib/src/embedded/host_callable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
// MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

// ignore: deprecated_member_use
import 'dart:cli';
import 'dart:async';

import '../callable.dart';
import '../exception.dart';
Expand Down Expand Up @@ -37,8 +36,7 @@ Callable hostCallable(
request.name = callable.name;
}

// ignore: deprecated_member_use
var response = waitFor(dispatcher.sendFunctionCallRequest(request));
var response = dispatcher.sendFunctionCallRequest(request);
try {
switch (response.whichResult()) {
case InboundMessage_FunctionCallResponse_Result.success:
Expand All @@ -51,8 +49,7 @@ Callable hostCallable(
throw mandatoryError('FunctionCallResponse.result');
}
} on ProtocolError catch (error, stackTrace) {
dispatcher.sendError(handleError(error, stackTrace));
throw error.message;
throw AsyncError(error, stackTrace);
}
});
return callable;
Expand Down
Loading