Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve sass --embedded performance #2013

Merged
merged 15 commits into from
Sep 1, 2023
196 changes: 107 additions & 89 deletions lib/src/embedded/dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:native_synchronization/mailbox.dart';
import 'package:path/path.dart' as p;
import 'package:protobuf/protobuf.dart';
import 'package:sass/sass.dart' as sass;
import 'package:stream_channel/stream_channel.dart';

import 'embedded_sass.pb.dart';
import 'function_registry.dart';
Expand All @@ -30,83 +30,65 @@ final _outboundRequestId = 0;
/// A class that dispatches messages to and from the host for a single
/// compilation.
final class Dispatcher {
/// The channel of encoded protocol buffers, connected to the host.
final StreamChannel<Uint8List> _channel;
/// The mailbox for receiving messages from the host.
final Mailbox _mailbox;

/// The sink for sending messages to the host.
final StreamSink<Uint8List> _sink;

/// The compilation ID for which this dispatcher is running.
///
/// This is added to outgoing messages but is _not_ parsed from incoming
ntkme marked this conversation as resolved.
Show resolved Hide resolved
/// messages, since that's already handled by the [IsolateDispatcher].
final int _compilationId;
int _compilationId = 0;
ntkme marked this conversation as resolved.
Show resolved Hide resolved

/// [_compilationId], serialized as a varint.
final Uint8List _compilationIdVarint;

/// Whether this dispatcher has received its compile request.
var _compiling = false;
Uint8List _compilationIdVarint = Uint8List.fromList([0]);

/// A completer awaiting a response to an outbound request.
///
/// Since each [Dispatcher] is only running a single-threaded compilation, it
/// can only ever have one request outstanding.
Completer<GeneratedMessage>? _outstandingRequest;
/// Whether a fatal error has occured during host request.
var _asyncError = false;

/// Creates a [Dispatcher] that sends and receives encoded protocol buffers
/// over [channel].
Dispatcher(this._channel, this._compilationId)
: _compilationIdVarint = serializeVarint(_compilationId);
Dispatcher(this._mailbox, this._sink);

/// Listens for incoming `CompileRequests` and runs their compilations.
///
/// This may only be called once. Returns whether or not the compilation
/// succeeded.
Future<bool> listen() async {
var success = false;
await _channel.stream.listen((binaryMessage) async {
// Wait a single microtask tick so that we're running in a separate
// microtask from the initial request dispatch. Otherwise, [waitFor] will
// deadlock the event loop fiber that would otherwise be checking stdin
// for new input.
await Future<void>.value();
void listen() {
do {
var packet = _mailbox.take();
if (packet.isEmpty) break;

try {
InboundMessage? message;
var (compilationId, messageBuffer) = parsePacket(packet);

_compilationId = compilationId;
_compilationIdVarint = serializeVarint(compilationId);

InboundMessage message;
try {
message = InboundMessage.fromBuffer(binaryMessage);
message = InboundMessage.fromBuffer(messageBuffer);
} on InvalidProtocolBufferException catch (error) {
throw parseError(error.message);
}

switch (message.whichMessage()) {
case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.compileRequest:
if (_compiling) {
throw paramsError(
"A CompileRequest with compilation ID $_compilationId is "
"already active.");
var request = message.compileRequest;
var response = _compile(request);
if (!_asyncError) {
ntkme marked this conversation as resolved.
Show resolved Hide resolved
_send(OutboundMessage()..compileResponse = response);
}
_compiling = true;
break;
ntkme marked this conversation as resolved.
Show resolved Hide resolved

var request = message.compileRequest;
var response = await _compile(request);
_send(OutboundMessage()..compileResponse = response);
success = true;
// Each Dispatcher runs a single compilation and then closes.
_channel.sink.close();
case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.canonicalizeResponse:
_dispatchResponse(message.id, message.canonicalizeResponse);

case InboundMessage_Message.importResponse:
_dispatchResponse(message.id, message.importResponse);

case InboundMessage_Message.fileImportResponse:
_dispatchResponse(message.id, message.fileImportResponse);

case InboundMessage_Message.functionCallResponse:
_dispatchResponse(message.id, message.functionCallResponse);
throw paramsError(
"Response ID ${message.id} doesn't match any outstanding requests in compilation $_compilationId.");
ntkme marked this conversation as resolved.
Show resolved Hide resolved

case InboundMessage_Message.notSet:
throw parseError("InboundMessage.message is not set.");
Expand All @@ -115,16 +97,14 @@ final class Dispatcher {
throw parseError(
"Unknown message type: ${message.toDebugString()}");
}
} on ProtocolError catch (error, stackTrace) {
sendError(handleError(error, stackTrace));
_channel.sink.close();
} catch (error, stackTrace) {
_handleError(error, stackTrace);
}
}).asFuture<void>();
return success;
} while (!_asyncError);
}

Future<OutboundMessage_CompileResponse> _compile(
InboundMessage_CompileRequest request) async {
OutboundMessage_CompileResponse _compile(
InboundMessage_CompileRequest request) {
var functions = FunctionRegistry();

var style = request.style == OutputStyle.COMPRESSED
Expand Down Expand Up @@ -245,59 +225,97 @@ final class Dispatcher {
void sendError(ProtocolError error) =>
_send(OutboundMessage()..error = error);

Future<InboundMessage_CanonicalizeResponse> sendCanonicalizeRequest(
InboundMessage_CanonicalizeResponse sendCanonicalizeRequest(
OutboundMessage_CanonicalizeRequest request) =>
_sendRequest<InboundMessage_CanonicalizeResponse>(
OutboundMessage()..canonicalizeRequest = request);

Future<InboundMessage_ImportResponse> sendImportRequest(
InboundMessage_ImportResponse sendImportRequest(
OutboundMessage_ImportRequest request) =>
_sendRequest<InboundMessage_ImportResponse>(
OutboundMessage()..importRequest = request);

Future<InboundMessage_FileImportResponse> sendFileImportRequest(
InboundMessage_FileImportResponse sendFileImportRequest(
OutboundMessage_FileImportRequest request) =>
_sendRequest<InboundMessage_FileImportResponse>(
OutboundMessage()..fileImportRequest = request);

Future<InboundMessage_FunctionCallResponse> sendFunctionCallRequest(
InboundMessage_FunctionCallResponse sendFunctionCallRequest(
OutboundMessage_FunctionCallRequest request) =>
_sendRequest<InboundMessage_FunctionCallResponse>(
OutboundMessage()..functionCallRequest = request);

/// Sends [request] to the host and returns the message sent in response.
Future<T> _sendRequest<T extends GeneratedMessage>(
OutboundMessage request) async {
request.id = _outboundRequestId;
_send(request);

if (_outstandingRequest != null) {
throw StateError(
"Dispatcher.sendRequest() can't be called when another request is "
"active.");
}
T _sendRequest<T extends GeneratedMessage>(OutboundMessage message) {
message.id = _outboundRequestId;
_send(message);

return (_outstandingRequest = Completer<T>()).future;
}
var packet = _mailbox.take();

/// Dispatches [response] to the appropriate outstanding request.
///
/// Throws an error if there's no outstanding request with the given [id] or
/// if that request is expecting a different type of response.
void _dispatchResponse<T extends GeneratedMessage>(int? id, T response) {
var completer = _outstandingRequest;
_outstandingRequest = null;
if (completer == null || id != _outboundRequestId) {
throw paramsError(
"Response ID $id doesn't match any outstanding requests in "
"compilation $_compilationId.");
} else if (completer is! Completer<T>) {
throw paramsError(
"Request ID $id doesn't match response type ${response.runtimeType} "
"in compilation $_compilationId.");
try {
var messageBuffer =
Uint8List.sublistView(packet, _compilationIdVarint.length);

InboundMessage message;
try {
message = InboundMessage.fromBuffer(messageBuffer);
} on InvalidProtocolBufferException catch (error) {
throw parseError(error.message);
}

GeneratedMessage response;
switch (message.whichMessage()) {
case InboundMessage_Message.canonicalizeResponse:
response = message.canonicalizeResponse;
break;

case InboundMessage_Message.importResponse:
response = message.importResponse;
break;

case InboundMessage_Message.fileImportResponse:
response = message.fileImportResponse;
break;

case InboundMessage_Message.functionCallResponse:
response = message.functionCallResponse;
break;

case InboundMessage_Message.compileRequest:
throw paramsError(
"A CompileRequest with compilation ID $_compilationId is already active.");

case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.notSet:
throw parseError("InboundMessage.message is not set.");

default:
throw parseError("Unknown message type: ${message.toDebugString()}");
}
if (message.id != _outboundRequestId) {
throw paramsError(
"Response ID ${message.id} doesn't match any outstanding requests in compilation $_compilationId.");
ntkme marked this conversation as resolved.
Show resolved Hide resolved
}
if (response is! T) {
throw paramsError(
"Request ID $_outboundRequestId doesn't match response type ${response.runtimeType} in compilation $_compilationId.");
}
return response;
} catch (error, stackTrace) {
_handleError(error, stackTrace);
_asyncError = true;
rethrow;
}
}

completer.complete(response);
/// Handles an error thrown by the dispatcher or code it dispatches to.
///
/// The [messageId] indicate the IDs of the message being responded to, if available.
void _handleError(Object error, StackTrace stackTrace, {int? messageId}) {
sendError(handleError(error, stackTrace, messageId: messageId));
_sink.close();
}

/// Sends [message] to the host with the given [wireId].
Expand All @@ -316,6 +334,6 @@ final class Dispatcher {
: 0;
packet.setAll(1, _compilationIdVarint);
protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length);
_channel.sink.add(packet);
_sink.add(packet);
}
}
6 changes: 1 addition & 5 deletions lib/src/embedded/host_callable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
// MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

// ignore: deprecated_member_use
import 'dart:cli';

import '../callable.dart';
import '../exception.dart';
import 'dispatcher.dart';
Expand Down Expand Up @@ -37,8 +34,7 @@ Callable hostCallable(
request.name = callable.name;
}

// ignore: deprecated_member_use
var response = waitFor(dispatcher.sendFunctionCallRequest(request));
var response = dispatcher.sendFunctionCallRequest(request);
try {
switch (response.whichResult()) {
case InboundMessage_FunctionCallResponse_Result.success:
Expand Down
48 changes: 21 additions & 27 deletions lib/src/embedded/importer/file.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
// MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

// ignore: deprecated_member_use
import 'dart:cli';

import '../../importer.dart';
import '../dispatcher.dart';
import '../embedded_sass.pb.dart' hide SourceSpan;
Expand All @@ -27,30 +24,27 @@ final class FileImporter extends ImporterBase {
Uri? canonicalize(Uri url) {
if (url.scheme == 'file') return _filesystemImporter.canonicalize(url);

// ignore: deprecated_member_use
return waitFor(() async {
var response = await dispatcher
.sendFileImportRequest(OutboundMessage_FileImportRequest()
..importerId = _importerId
..url = url.toString()
..fromImport = fromImport);

switch (response.whichResult()) {
case InboundMessage_FileImportResponse_Result.fileUrl:
var url = parseAbsoluteUrl("The file importer", response.fileUrl);
if (url.scheme != 'file') {
throw 'The file importer must return a file: URL, was "$url"';
}

return _filesystemImporter.canonicalize(url);

case InboundMessage_FileImportResponse_Result.error:
throw response.error;

case InboundMessage_FileImportResponse_Result.notSet:
return null;
}
}());
var response =
dispatcher.sendFileImportRequest(OutboundMessage_FileImportRequest()
..importerId = _importerId
..url = url.toString()
..fromImport = fromImport);

switch (response.whichResult()) {
case InboundMessage_FileImportResponse_Result.fileUrl:
var url = parseAbsoluteUrl("The file importer", response.fileUrl);
if (url.scheme != 'file') {
throw 'The file importer must return a file: URL, was "$url"';
}

return _filesystemImporter.canonicalize(url);

case InboundMessage_FileImportResponse_Result.error:
throw response.error;

case InboundMessage_FileImportResponse_Result.notSet:
return null;
}
}

ImporterResult? load(Uri url) => _filesystemImporter.load(url);
Expand Down
Loading
Loading