diff --git a/bvm/ballerina-rt/build.gradle b/bvm/ballerina-rt/build.gradle index 3a68ec3a1989..785d95ae5591 100644 --- a/bvm/ballerina-rt/build.gradle +++ b/bvm/ballerina-rt/build.gradle @@ -55,7 +55,6 @@ dependencies { dist project(path: ':ballerina-lang:query', configuration: 'distributionBirJar') dist project(path: ':ballerina-lang:transaction', configuration: 'distributionBirJar') dist project(path: ':ballerina-lang:regexp', configuration: 'distributionBirJar') -// dist project(path: ':metrics-extensions:ballerina-prometheus-extension', configuration: 'distributionBirJar') // Lang libs dist project(':ballerina-lang:internal') @@ -130,6 +129,10 @@ dependencies { } +configurations.configureEach { + transitive = false +} + jar { duplicatesStrategy = DuplicatesStrategy.EXCLUDE dependsOn configurations.dist diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/api/Environment.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/api/Environment.java index f583c1bd6980..36abefb03c36 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/api/Environment.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/api/Environment.java @@ -45,13 +45,11 @@ public abstract class Environment { public abstract Parameter[] getFunctionPathParameters(); /** - * Mark the current executing strand as async. Execution of Ballerina code after the current - * interop will stop until given Ballerina Future is completed. However the java thread will not be blocked - * and will be reused for running other Ballerina code in the meantime. Therefore callee of this method - * must return as soon as possible to avoid starvation of Ballerina code execution. + * Yield the current execution and run some operation so other non isolated functions can run in asynchronously. * + * @param runnable operation to be executed. */ - public abstract void markAsync(); + public abstract void yieldAndRun(Runnable runnable); /** * Gets an instance of Ballerina runtime. @@ -105,5 +103,10 @@ public abstract class Environment { */ public abstract Object getStrandLocal(String key); + /** + * Gets the current environment repository. + * + * @return repository. + */ public abstract Repository getRepository(); } diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/BalEnvironment.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/BalEnvironment.java index e4de10acc327..d0f54c8d8bd4 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/BalEnvironment.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/BalEnvironment.java @@ -23,7 +23,6 @@ import io.ballerina.runtime.api.Repository; import io.ballerina.runtime.api.async.StrandMetadata; import io.ballerina.runtime.api.types.Parameter; -import io.ballerina.runtime.internal.scheduling.Scheduler; import io.ballerina.runtime.internal.scheduling.Strand; import java.util.Optional; @@ -59,105 +58,57 @@ public BalEnvironment(Strand strand, Module currentModule, String funcName, Para this.repository = new RepositoryImpl(); } - /** - * Returns the Ballerina function name for the corresponding external interop method. - * - * @return function name - */ @Override public String getFunctionName() { return funcName; } - /** - * Returns an array consisting of the path parameters of the resource function defined as external. - * - * @return array of {@link Parameter} - */ @Override public Parameter[] getFunctionPathParameters() { return funcPathParams; } - /** - * Mark the current executing strand as async. Execution of Ballerina code after the current - * interop will stop until given BalFuture is completed. However, the java thread will not be blocked - * and will be reused for running other Ballerina code in the meantime. Therefore, callee of this method - * must return as soon as possible to avoid starvation of ballerina code execution.WD - */ @Override - public void markAsync() { - Scheduler.getStrand().yield(); + public void yieldAndRun(Runnable runnable) { + try { + strand.yield(); + runnable.run(); + } finally { + strand.resume(); + } } - /** - * Gets an instance of Ballerina runtime. - * - * @return Ballerina runtime instance. - */ @Override public BalRuntime getRuntime() { return strand.scheduler.runtime; } - /** - * Gets current module {@link Module}. - * - * @return module of the environment. - */ @Override public Module getCurrentModule() { return currentModule; } - /** - * Gets the strand id. This will be generated on strand initialization. - * - * @return Strand id. - */ @Override public int getStrandId() { return strand.getId(); } - /** - * Gets the strand name. This will be optional. Strand name can be either name given in strand annotation or async - * call or function pointer variable name. - * - * @return Optional strand name. - */ @Override public Optional getStrandName() { return strand.getName(); } - /** - * Gets {@link StrandMetadata}. - * - * @return metadata of the strand. - */ @Override public StrandMetadata getStrandMetadata() { return strand.getMetadata(); } - /** - * Sets given local key value pair in strand. - * - * @param key string key - * @param value value to be store in the strand - */ + @Override public void setStrandLocal(String key, Object value) { strand.setProperty(key, value); } - /** - * Gets the value stored in the strand on given key. - * - * @param key key - * @return value stored in the strand. - */ @Override public Object getStrandLocal(String key) { return strand.getProperty(key); diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/AsyncUtils.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/AsyncUtils.java index 9db0a2c37bda..4fbe3d9e9fad 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/AsyncUtils.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/AsyncUtils.java @@ -58,6 +58,7 @@ public static Object handleNonIsolatedStrand(Strand strand, Supplier co * Used for codegen wait for future. */ public static Object handleWait(Strand strand, FutureValue future) { + future.strand.checkStrandCancelled(); if (future.getAndSetWaited()) { return ErrorUtils.createWaitOnSameFutureError(); } @@ -79,6 +80,7 @@ public static Object handleWaitAny(Strand strand, List futures) { CompletableFuture[] cFutures = new CompletableFuture[futures.size()]; for (int i = 0; i < futures.size(); i++) { FutureValue future = futures.get(i); + future.strand.checkStrandCancelled(); if (future.getAndSetWaited()) { return ErrorUtils.createWaitOnSameFutureError(); } @@ -98,6 +100,7 @@ public static void handleWaitMultiple(Strand strand, Map fu List alreadyWaitedKeys = new ArrayList<>(); for (Map.Entry entry : futureMap.entrySet()) { FutureValue future = entry.getValue(); + future.strand.checkStrandCancelled(); if (!future.getAndSetWaited()) { cFutures.add(future.completableFuture); } else { diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/RuntimeRegistry.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/RuntimeRegistry.java index ee1766f93a0e..71886f58cbaf 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/RuntimeRegistry.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/RuntimeRegistry.java @@ -21,7 +21,6 @@ import io.ballerina.runtime.internal.util.RuntimeUtils; import io.ballerina.runtime.internal.values.ObjectValue; -import java.io.PrintStream; import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.locks.ReentrantLock; @@ -39,8 +38,6 @@ public class RuntimeRegistry { private final ReentrantLock listenerLock = new ReentrantLock(); private final ReentrantLock stopHandlerLock = new ReentrantLock(); - private static final PrintStream outStream = System.err; - public RuntimeRegistry(Scheduler scheduler) { this.scheduler = scheduler; } diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Scheduler.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Scheduler.java index 70283d30bc60..f58196e71fb7 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Scheduler.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Scheduler.java @@ -75,15 +75,13 @@ public static Strand getStrand() { return daemonStrand; } public Object call(Module module, String functionName, Strand parentStrand, Object... args) { - parentStrand.resume(); - ValueCreatorAndFunctionType result = getGetValueCreatorAndFunctionType(module, functionName); - Object[] argsWithDefaultValues = getArgsWithDefaultValues(result.valueCreator(), result.functionType(), - parentStrand, args); - return result.valueCreator().call(parentStrand, functionName, argsWithDefaultValues); + ValueCreatorAndFunctionType functionType = getGetValueCreatorAndFunctionType(module, functionName); + Object[] argsWithDefaultValues = getArgsWithDefaultValues(functionType.valueCreator(), + functionType.functionType(), parentStrand, args); + return functionType.valueCreator().call(parentStrand, functionName, argsWithDefaultValues); } public Object call(BObject object, String methodName, Strand parentStrand, Object... args) { - parentStrand.resume(); ObjectType objectType = (ObjectType) TypeUtils.getImpliedType(object.getOriginalType()); MethodType methodType = getObjectMethodType(methodName, objectType); Object[] argsWithDefaultValues = getArgsWithDefaultValues(objectType, methodType, parentStrand, args); @@ -91,7 +89,6 @@ public Object call(BObject object, String methodName, Strand parentStrand, Objec } public Object call(FPValue fp, Strand parentStrand, Object... args) { - parentStrand.resume(); FunctionType functionType = (FunctionType) TypeUtils.getImpliedType(TypeUtils.getType(fp)); Object[] argsWithDefaultValues = getArgsWithDefaultValues(parentStrand, args, functionType); Object[] argsWithStrand = getArgsWithStrand(parentStrand, argsWithDefaultValues); diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java index a790eefb1220..8076cb72b015 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java @@ -44,21 +44,14 @@ public class Strand { private Map globalProps; public final boolean isIsolated; + public State state = State.YIELD; public Scheduler scheduler; public Strand parent; public TransactionLocalContext currentTrxContext; public Stack trxContexts; public WorkerChannelMap workerChannelMap; - public boolean cancel; public int acquiredLockCount; - public Strand() { - this.id = -1; - this.name = null; - this.metadata = null; - this.isIsolated = false; - } - public Strand(String name, StrandMetadata metadata, Scheduler scheduler, Strand parent, boolean isIsolated, Map properties, WorkerChannelMap workerChannelMap) { this.id = nextStrandId.incrementAndGet(); @@ -96,20 +89,22 @@ public Strand(String name, StrandMetadata metadata, Scheduler scheduler, Strand public void resume() { checkStrandCancelled(); - if (!isIsolated && !scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) { - scheduler.globalNonIsolatedLock.lock(); + if (!this.isIsolated && this.state == State.YIELD) { + this.scheduler.globalNonIsolatedLock.lock(); + this.state = State.RUNNABLE; } } public void yield() { checkStrandCancelled(); - if (!isIsolated && scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) { + if (!this.isIsolated && this.state == State.RUNNABLE) { scheduler.globalNonIsolatedLock.unlock(); + this.state = State.YIELD; } } public void done() { - if (!isIsolated && scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) { + if (!isIsolated && this.state == State.RUNNABLE) { scheduler.globalNonIsolatedLock.unlock(); } } @@ -186,8 +181,20 @@ public StrandMetadata getMetadata() { } public void checkStrandCancelled() { - if (cancel) { + if (this.state == State.CANCELLED) { throw ErrorUtils.createCancelledFutureError(); } } + + /** + * Maintains the Strand state. + * + * @since 2201.11.0 + */ + public enum State { + RUNNABLE, + YIELD, + CANCELLED + } + } diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/WorkerChannel.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/WorkerChannel.java index 468f20df8ca2..1ccd66091ea9 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/WorkerChannel.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/WorkerChannel.java @@ -35,29 +35,31 @@ public class WorkerChannel { private final AtomicInteger doneCount; private final CompletableFuture resultFuture; private final CompletableFuture receiveFuture; - private Object result; + private boolean cancel; public WorkerChannel(String name) { this.name = name; this.resultFuture = new CompletableFuture<>(); this.receiveFuture = new CompletableFuture<>(); this.doneCount = new AtomicInteger(2); + this.cancel = false; } public Object read() { + if (cancel) { + throw ErrorUtils.createCancelledFutureError(); + } try { - result = AsyncUtils.getFutureResult(resultFuture); - return result; + return AsyncUtils.getFutureResult(resultFuture); } finally { receiveFuture.complete(null); } } - public Object getResult() { - return result; - } - public void write(Object result) { + if (cancel) { + throw ErrorUtils.createCancelledFutureError(); + } resultFuture.complete(result); } @@ -113,6 +115,10 @@ public boolean done() { return doneCount.incrementAndGet() == 0; } + public void cancel() { + this.cancel = true; + } + public String getName() { return name; } diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/WorkerChannelMap.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/WorkerChannelMap.java index 41f9c9bf580c..a59f850c60ad 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/WorkerChannelMap.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/WorkerChannelMap.java @@ -109,4 +109,13 @@ public void completeReceiveWorkerChannels(String channelKey, Object returnValue) channelMapLock.writeLock().unlock(); } } + + public void cancel() { + try { + channelMapLock.writeLock().lock(); + channelMap.values().forEach(WorkerChannel::cancel); + } finally { + channelMapLock.writeLock().unlock(); + } + } } diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/values/FutureValue.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/values/FutureValue.java index 5ac1443d87b0..3cfbb54136c0 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/values/FutureValue.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/values/FutureValue.java @@ -102,7 +102,10 @@ public BTypedesc getTypedesc() { @Override public void cancel() { - this.strand.cancel = true; + this.strand.state = Strand.State.CANCELLED; + if (this.strand.workerChannelMap != null) { + this.strand.workerChannelMap.cancel(); + } } /** diff --git a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/JvmTerminatorGen.java b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/JvmTerminatorGen.java index 7e995418d77e..0a09e26116b2 100644 --- a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/JvmTerminatorGen.java +++ b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/JvmTerminatorGen.java @@ -511,8 +511,6 @@ private void genJICallTerm(JIMethodCall callIns, int localVarOffset, BIRNode.BIR if (callIns.lhsOp != null && callIns.lhsOp.variableDcl != null) { this.storeToVar(callIns.lhsOp.variableDcl); } - mv.visitVarInsn(ALOAD, localVarOffset); - mv.visitMethodInsn(INVOKEVIRTUAL, STRAND_CLASS, "resume", VOID_METHOD_DESC, false); } private void genJIConstructorTerm(JIConstructorCall callIns) { diff --git a/langlib/lang.runtime/src/main/java/org/ballerinalang/langlib/runtime/Sleep.java b/langlib/lang.runtime/src/main/java/org/ballerinalang/langlib/runtime/Sleep.java index 18415b7bb27b..a5f74d6f322e 100644 --- a/langlib/lang.runtime/src/main/java/org/ballerinalang/langlib/runtime/Sleep.java +++ b/langlib/lang.runtime/src/main/java/org/ballerinalang/langlib/runtime/Sleep.java @@ -46,12 +46,13 @@ public static void sleep(Environment env, BDecimal delaySeconds) { } else { delay = delayDecimal.longValue(); } - env.markAsync(); - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - throw ErrorCreator.createError(StringUtils.fromString("error occurred during sleep"), e); - } + env.yieldAndRun(() -> { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + throw ErrorCreator.createError(StringUtils.fromString("error occurred during sleep"), e); + } + }); } private Sleep() { diff --git a/misc/debug-adapter/modules/debug-adapter-runtime/src/main/java/org/ballerinalang/debugadapter/runtime/DebuggerRuntime.java b/misc/debug-adapter/modules/debug-adapter-runtime/src/main/java/org/ballerinalang/debugadapter/runtime/DebuggerRuntime.java index 671cb18c9e6f..0f035685d617 100644 --- a/misc/debug-adapter/modules/debug-adapter-runtime/src/main/java/org/ballerinalang/debugadapter/runtime/DebuggerRuntime.java +++ b/misc/debug-adapter/modules/debug-adapter-runtime/src/main/java/org/ballerinalang/debugadapter/runtime/DebuggerRuntime.java @@ -40,6 +40,7 @@ import io.ballerina.runtime.api.values.BXmlSequence; import io.ballerina.runtime.internal.configurable.providers.ConfigDetails; import io.ballerina.runtime.internal.launch.LaunchUtils; +import io.ballerina.runtime.internal.scheduling.Scheduler; import io.ballerina.runtime.internal.scheduling.Strand; import io.ballerina.runtime.internal.types.BAnnotatableType; import io.ballerina.runtime.internal.values.ErrorValue; @@ -105,7 +106,7 @@ public class DebuggerRuntime { public static Object invokeObjectMethod(BObject bObject, String methodName, Object... args) { try { final Object[] paramValues = args[0] instanceof Strand ? Arrays.copyOfRange(args, 1, args.length) : args; - final Strand strand = args[0] instanceof Strand s ? s : new Strand(); + final Strand strand = args[0] instanceof Strand s ? s : Scheduler.getStrand(); return ((ObjectValue) bObject).call(strand, methodName, paramValues); } catch (Exception e) { throw ErrorCreator.createError(StringUtils.fromString("invocation failed: " + e.getMessage())); diff --git a/tests/ballerina-test-utils/src/main/java/org/ballerinalang/test/BRunUtil.java b/tests/ballerina-test-utils/src/main/java/org/ballerinalang/test/BRunUtil.java index b99701c5175f..1a1688d7cec0 100644 --- a/tests/ballerina-test-utils/src/main/java/org/ballerinalang/test/BRunUtil.java +++ b/tests/ballerina-test-utils/src/main/java/org/ballerinalang/test/BRunUtil.java @@ -436,7 +436,7 @@ private static void call(Class initClazz, BLangIdentifier name) { try { final Method method = initClazz.getDeclaredMethod(funcName, Strand.class); try { - method.invoke(null, new Strand()); + method.invoke(null, Scheduler.getStrand()); } catch (InvocationTargetException e) { Throwable targetException = e.getTargetException(); if (targetException instanceof RuntimeException) { diff --git a/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/utils/interop/Utils.java b/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/utils/interop/Utils.java index 50246b9cad22..4096d13146a2 100644 --- a/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/utils/interop/Utils.java +++ b/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/utils/interop/Utils.java @@ -32,12 +32,13 @@ public class Utils { public static void sleep(Environment env, long delayMillis) { - try { - env.markAsync(); - Thread.sleep(delayMillis); - } catch (InterruptedException e) { - throw ErrorCreator.createError(e); - } + env.yieldAndRun(() -> { + try { + Thread.sleep(delayMillis); + } catch (InterruptedException e) { + throw ErrorCreator.createError(e); + } + }); } public static boolean isIsolated() {