Skip to content

Commit

Permalink
Merge pull request #43031 from warunalakshitha/runtime_api_improve
Browse files Browse the repository at this point in the history
[master] Fix runtime invokeAsync API to call function block the calling Thread
  • Loading branch information
warunalakshitha authored Jul 19, 2024
2 parents 41161fa + 0989fe3 commit 47293c1
Show file tree
Hide file tree
Showing 16 changed files with 437 additions and 389 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@
import io.ballerina.runtime.internal.errors.ErrorHelper;
import io.ballerina.runtime.internal.launch.LaunchUtils;
import io.ballerina.runtime.internal.scheduling.AsyncUtils;
import io.ballerina.runtime.internal.scheduling.RuntimeRegistry;
import io.ballerina.runtime.internal.scheduling.Scheduler;
import io.ballerina.runtime.internal.scheduling.Strand;
import io.ballerina.runtime.internal.util.RuntimeUtils;
import io.ballerina.runtime.internal.scheduling.SyncCallback;
import io.ballerina.runtime.internal.values.FutureValue;
import io.ballerina.runtime.internal.values.ObjectValue;
import io.ballerina.runtime.internal.values.ValueCreator;
Expand All @@ -50,12 +51,14 @@
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

import static io.ballerina.identifier.Utils.encodeNonFunctionIdentifier;
import static io.ballerina.runtime.api.constants.RuntimeConstants.ANON_ORG;
import static io.ballerina.runtime.api.constants.RuntimeConstants.CONFIGURATION_CLASS_NAME;
import static io.ballerina.runtime.api.constants.RuntimeConstants.DOT;
import static io.ballerina.runtime.api.constants.RuntimeConstants.MODULE_INIT_CLASS_NAME;

/**
* Internal implementation of the API used by the interop users to control Ballerina runtime behavior.
Expand All @@ -67,59 +70,78 @@ public class BalRuntime extends Runtime {
private final Scheduler scheduler;
private final Module module;
private boolean moduleInitialized = false;
private boolean moduleStarted = false;
private boolean moduleStopped = false;
private Thread schedulerThread = null;

public BalRuntime(Scheduler scheduler, Module module) {
this.scheduler = scheduler;
this.module = module;
}

public BalRuntime(Module module) {
this.scheduler = new Scheduler(false);
this.scheduler = new Scheduler(true);
this.module = module;
}

@Override
public void init() {
invokeConfigInit();
invokeMethodAsync("$moduleInit", null, PredefinedTypes.TYPE_NULL, "init", new Object[1]);
moduleInitialized = true;
if (moduleInitialized) {
throw ErrorHelper.getRuntimeException(ErrorCodes.FUNCTION_ALREADY_CALLED, "init");
}
try {
invokeConfigInit();
schedulerThread = new Thread(scheduler::start);
schedulerThread.start();
invokeMethodSync("$moduleInit");
moduleInitialized = true;
} catch (ClassNotFoundException e) {
throw ErrorCreator.createError(StringUtils.fromString(String.format("module '%s' does not exist", module)));
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
throw ErrorCreator.createError(StringUtils.fromString("error occurred while initializing the ballerina " +
"module due to " + e.getMessage()), e);
}
}

@Override
public void start() {
if (!moduleInitialized) {
throw ErrorHelper.getRuntimeException(ErrorCodes.INVALID_METHOD_CALL, "start");
throw ErrorHelper.getRuntimeException(ErrorCodes.INVALID_FUNCTION_INVOCATION_BEFORE_MODULE_INIT, "start");
}
if (moduleStarted) {
throw ErrorHelper.getRuntimeException(ErrorCodes.FUNCTION_ALREADY_CALLED, "start");
}
invokeMethodAsync("$moduleStart", null, PredefinedTypes.TYPE_NULL, "start", new Object[1]);
invokeMethodSync("$moduleStart");
moduleStarted = true;
}

@Override
public void invokeMethodAsync(String functionName, Callback callback, Object... args) {
if (!moduleInitialized) {
throw ErrorHelper.getRuntimeException(ErrorCodes.INVALID_FUNCTION_INVOCATION, functionName);
throw ErrorHelper.getRuntimeException(ErrorCodes.INVALID_FUNCTION_INVOCATION_BEFORE_MODULE_INIT,
functionName);
}
invokeMethodAsync(functionName, callback, PredefinedTypes.TYPE_ANY, functionName, args);
invokeMethod(functionName, callback, PredefinedTypes.TYPE_ANY, functionName, args);
}

@Override
public void stop() {
if (!moduleInitialized) {
throw ErrorHelper.getRuntimeException(ErrorCodes.INVALID_METHOD_CALL, "stop");
throw ErrorHelper.getRuntimeException(ErrorCodes.INVALID_FUNCTION_INVOCATION_BEFORE_MODULE_INIT, "stop");
}
if (moduleStopped) {
throw ErrorHelper.getRuntimeException(ErrorCodes.FUNCTION_ALREADY_CALLED, "stop");
}
try {
scheduler.poison();
schedulerThread.join();
invokeModuleStop();
moduleStopped = true;
} catch (InterruptedException | ClassNotFoundException | NoSuchMethodException | InvocationTargetException |
IllegalAccessException e) {
throw ErrorCreator.createError(StringUtils.fromString("error occurred during module stop due to " +
e.getMessage()), e);
}
invokeMethodAsync("$moduleStop", null, PredefinedTypes.TYPE_NULL, "stop", new Scheduler(false), null);
}

private void invokeMethodAsync(String functionName, Callback callback, Type returnType, String strandName,
Object... args) {
ValueCreator valueCreator = ValueCreator.getValueCreator(ValueCreator.getLookupKey(module.getOrg(),
module.getName(), module.getMajorVersion(), module.isTestPkg()));
Function<?, ?> func = o -> valueCreator.call((Strand) (((Object[]) o)[0]), functionName, args);
FutureValue future = scheduler.createFuture(null, callback, null, returnType, strandName, null);
Object[] argsWithStrand = new Object[args.length + 1];
argsWithStrand[0] = future.strand;
System.arraycopy(args, 0, argsWithStrand, 1, args.length);
scheduler.schedule(argsWithStrand, func, future);
scheduler.start();
}

/**
Expand Down Expand Up @@ -156,6 +178,7 @@ public void notifySuccess(Object result) {
Function<?, ?> func = getFunction((Object[]) result, objectVal, methodName);
scheduler.scheduleToObjectGroup(new Object[1], func, future);
}

@Override
public void notifyFailure(BError error) {
callback.notifyFailure(error);
Expand Down Expand Up @@ -204,6 +227,7 @@ public void notifySuccess(Object result) {
Function<?, ?> func = getFunction((Object[]) result, objectVal, methodName);
scheduler.schedule(new Object[1], func, future);
}

@Override
public void notifyFailure(BError error) {
callback.notifyFailure(error);
Expand Down Expand Up @@ -264,6 +288,7 @@ public void notifySuccess(Object result) {
scheduler.scheduleToObjectGroup(new Object[1], func, future);
}
}

@Override
public void notifyFailure(BError error) {
callback.notifyFailure(error);
Expand Down Expand Up @@ -338,38 +363,62 @@ public void registerStopHandler(BFunctionPointer<?, ?> stopHandler) {
return func;
}

private void invokeConfigInit() {
String configClassName = getConfigClassName(this.module);
Class<?> configClazz;
try {
configClazz = Class.forName(configClassName);
} catch (Throwable e) {
throw ErrorCreator.createError(StringUtils.fromString("failed to load configuration class :" +
configClassName));
}
private void invokeConfigInit() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
Class<?> configClass = loadClass(CONFIGURATION_CLASS_NAME);
ConfigDetails configDetails = LaunchUtils.getConfigurationDetails();
String funcName = Utils.encodeFunctionIdentifier("$configureInit");
try {
final Method method =
configClazz.getDeclaredMethod(funcName, Map.class, String[].class, Path[].class, String.class);
method.invoke(null, new HashMap<>(), new String[]{}, configDetails.paths, configDetails.configContent);
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
throw ErrorCreator.createError(StringUtils.fromString("configurable initialization failed due to " +
RuntimeUtils.formatErrorMessage(e)), e);
}
Method method = configClass.getDeclaredMethod(funcName, Map.class, String[].class, Path[].class, String.class);
method.invoke(null, new HashMap<>(), new String[]{}, configDetails.paths, configDetails.configContent);
}

private void invokeModuleStop() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
Class<?> configClass = loadClass(MODULE_INIT_CLASS_NAME);
Method method = configClass.getDeclaredMethod("$currentModuleStop", RuntimeRegistry.class);
method.invoke(null, scheduler.getRuntimeRegistry());
}

private static String getConfigClassName(Module module) {
String configClassName = CONFIGURATION_CLASS_NAME;
private Class<?> loadClass(String className) throws ClassNotFoundException {
String name = getFullQualifiedClassName(this.module, className);
return Class.forName(name);
}

private static String getFullQualifiedClassName(Module module, String className) {
String orgName = module.getOrg();
String packageName = module.getName();
if (!DOT.equals(packageName)) {
configClassName = encodeNonFunctionIdentifier(packageName) + "." + module.getMajorVersion() + "." +
configClassName;
className = encodeNonFunctionIdentifier(packageName) + "." + module.getMajorVersion() + "." + className;
}
if (!ANON_ORG.equals(orgName)) {
configClassName = encodeNonFunctionIdentifier(orgName) + "." + configClassName;
className = encodeNonFunctionIdentifier(orgName) + "." + className;
}
return className;
}

private void invokeMethodSync(String functionName) {
final CountDownLatch latch = new CountDownLatch(1);
SyncCallback callback = new SyncCallback(latch);
invokeMethod(functionName, callback, PredefinedTypes.TYPE_NULL, functionName, new Object[1]);
try {
latch.await();
} catch (InterruptedException e) {
throw ErrorCreator.createError(e);
}
return configClassName;
if (callback.initError != null) {
throw callback.initError;
}
}

private void invokeMethod(String functionName, Callback callback, Type returnType, String strandName,
Object... args) {
ValueCreator valueCreator = ValueCreator.getValueCreator(ValueCreator.getLookupKey(module.getOrg(),
module.getName(), module.getMajorVersion(), module.isTestPkg()));
Function<?, ?> func = o -> valueCreator.call((Strand) (((Object[]) o)[0]), functionName, args);
FutureValue future = scheduler.createFuture(null, callback, null, returnType, strandName, null);
Object[] argsWithStrand = new Object[args.length + 1];
argsWithStrand[0] = future.strand;
System.arraycopy(args, 0, argsWithStrand, 1, args.length);
scheduler.schedule(argsWithStrand, func, future);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ public enum ErrorCodes implements DiagnosticCode {
CONFIG_UNUSED_ENV_VARS("config.env.unused.vars", "RUNTIME_0126"),
CONFIG_ENV_VAR_NAME_AMBIGUITY("config.env.variable.name.ambiguity", "RUNTIME_0127"),
NO_MESSAGE_ERROR("no.worker.message.received", "RUNTIME_0128"),
INVALID_METHOD_CALL("invalid.method.call", "RUNTIME_0129"),
INVALID_FUNCTION_INVOCATION("invalid.function.invocation.call", "RUNTIME_0130"),
FUNCTION_ALREADY_CALLED("function.already.called", "RUNTIME_0129"),
INVALID_FUNCTION_INVOCATION_BEFORE_MODULE_INIT("invalid.function.call.before.module.init", "RUNTIME_0130"),
INVALID_TUPLE_MEMBER_SIZE("invalid.tuple.member.size", "RUNTIME_0131");

private final String errorMsgKey;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://wso2.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.ballerina.runtime.internal.scheduling;

import io.ballerina.runtime.api.async.Callback;
import io.ballerina.runtime.api.values.BError;

import java.util.concurrent.CountDownLatch;

/**
* This class used to handle ballerina function invocation synchronously.
*
* @since 2201.9.1
*/
public class SyncCallback implements Callback {

public CountDownLatch latch;
public BError initError;

public SyncCallback(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void notifySuccess(Object result) {
latch.countDown();
}

@Override
public void notifyFailure(BError error) {
latch.countDown();
initError = error;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ regexp.invalid.unicode.general.category.value = invalid Unicode general category
regexp.invalid.unicode.property.value = invalid Unicode property value ''{0}''
regexp.empty.character.class.disallowed = empty character class disallowed
regexp.invalid.hex.digit = invalid hexadecimal digit
invalid.method.call = ''{0}'' method is called before module initialization
invalid.function.invocation.call = function ''{0}'' is called before module initialization
function.already.called = function ''{0}'' has already been called
invalid.function.call.before.module.init = function ''{0}'' is called before module initialization
config.env.vars.ambiguity = configurable value for variable ''{0}'' clashes with multiple environment variables {1}
config.env.variable.ambiguity = configurable value for variable ''{0}'' clashes with variable ''{1}''. Please \
provide the env variable as ''[{2}]''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ public class JvmConstants {
public static final String CONSTANT_INIT_METHOD_PREFIX = "$constant_init";
public static final String ANNOTATIONS_METHOD_PREFIX = "$process_annotations";
public static final String CURRENT_MODULE_INIT = "$currentModuleInit";
public static final String CURRENT_MODULE_STOP = "$currentModuleStop";
public static final String MODULE_INIT_METHOD = "$moduleInit";
public static final String MODULE_START_METHOD = "$moduleStart";
public static final String MODULE_STOP_METHOD = "$moduleStop";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ private void generateModuleClasses(BIRPackage module, Map<String, byte[]> jarEnt

generateLockForVariable(cw);
initMethodGen.generateModuleInitializer(cw, module, moduleInitClass, typesClass);
initMethodGen.generateModuleStop(cw, moduleInitClass, asyncDataCollector, jvmConstantsGen);
ModuleStopMethodGen stopMethodGen = new ModuleStopMethodGen(jvmTypeGen, jvmConstantsGen);
stopMethodGen.generateExecutionStopMethod(cw, moduleInitClass, module, asyncDataCollector,
immediateImports);
Expand Down Expand Up @@ -743,8 +744,8 @@ CompiledJarFile generate(BIRPackage module) {
typeHashVisitor, jarEntries, symbolTable);

// generate the shutdown listener class.
new ShutDownListenerGen(jvmConstantsGen).generateShutdownSignalListener(moduleInitClass, jarEntries,
asyncDataCollector);
new ShutDownListenerGen().generateShutdownSignalListener(moduleInitClass, jarEntries
);

removeSourceAnnotationTypeDefs(module.typeDefs);
// desugar the record init function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public class JvmSignatures {
public static final String CREATE_XML_PI = "(L" + B_STRING_VALUE + ";L" + B_STRING_VALUE + ";Z)L" + XML_VALUE + ";";
public static final String CREATE_XML_TEXT = "(L" + B_STRING_VALUE + ";)L" + XML_VALUE + ";";
public static final String CRETAE_XML_SEQUENCE = "()L" + XML_SEQUENCE + ";";
public static final String CURRENT_MODULE_STOP = "(L" + RUNTIME_REGISTRY_CLASS + ";)V";
public static final String DECIMAL_NEGATE = "()L" + DECIMAL_VALUE + ";";
public static final String DECIMAL_TO_HANDLE = "(L" + OBJECT + ";)L" + HANDLE_VALUE + ";";
public static final String DECIMAL_VALUE_OF_BOOLEAN = "(B)L" + DECIMAL_VALUE + ";";
Expand Down
Loading

0 comments on commit 47293c1

Please sign in to comment.