Skip to content

Commit

Permalink
Merge pull request #43434 from warunalakshitha/java21_jbal_tests
Browse files Browse the repository at this point in the history
[Java 21] Fix runtime get hang due to invalid Strand states
  • Loading branch information
warunalakshitha authored Oct 2, 2024
2 parents 8a5a434 + 6860ffb commit 57eee14
Show file tree
Hide file tree
Showing 15 changed files with 90 additions and 110 deletions.
5 changes: 4 additions & 1 deletion bvm/ballerina-rt/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ dependencies {
dist project(path: ':ballerina-lang:query', configuration: 'distributionBirJar')
dist project(path: ':ballerina-lang:transaction', configuration: 'distributionBirJar')
dist project(path: ':ballerina-lang:regexp', configuration: 'distributionBirJar')
// dist project(path: ':metrics-extensions:ballerina-prometheus-extension', configuration: 'distributionBirJar')

// Lang libs
dist project(':ballerina-lang:internal')
Expand Down Expand Up @@ -130,6 +129,10 @@ dependencies {

}

configurations.configureEach {
transitive = false
}

jar {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
dependsOn configurations.dist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@ public abstract class Environment {
public abstract Parameter[] getFunctionPathParameters();

/**
* Mark the current executing strand as async. Execution of Ballerina code after the current
* interop will stop until given Ballerina Future is completed. However the java thread will not be blocked
* and will be reused for running other Ballerina code in the meantime. Therefore callee of this method
* must return as soon as possible to avoid starvation of Ballerina code execution.
* Yield the current execution and run some operation so other non isolated functions can run in asynchronously.
*
* @param runnable operation to be executed.
*/
public abstract void markAsync();
public abstract void yieldAndRun(Runnable runnable);

/**
* Gets an instance of Ballerina runtime.
Expand Down Expand Up @@ -105,5 +103,10 @@ public abstract class Environment {
*/
public abstract Object getStrandLocal(String key);

/**
* Gets the current environment repository.
*
* @return repository.
*/
public abstract Repository getRepository();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.ballerina.runtime.api.Repository;
import io.ballerina.runtime.api.async.StrandMetadata;
import io.ballerina.runtime.api.types.Parameter;
import io.ballerina.runtime.internal.scheduling.Scheduler;
import io.ballerina.runtime.internal.scheduling.Strand;

import java.util.Optional;
Expand Down Expand Up @@ -59,105 +58,57 @@ public BalEnvironment(Strand strand, Module currentModule, String funcName, Para
this.repository = new RepositoryImpl();
}

/**
* Returns the Ballerina function name for the corresponding external interop method.
*
* @return function name
*/
@Override
public String getFunctionName() {
return funcName;
}

/**
* Returns an array consisting of the path parameters of the resource function defined as external.
*
* @return array of {@link Parameter}
*/
@Override
public Parameter[] getFunctionPathParameters() {
return funcPathParams;
}

/**
* Mark the current executing strand as async. Execution of Ballerina code after the current
* interop will stop until given BalFuture is completed. However, the java thread will not be blocked
* and will be reused for running other Ballerina code in the meantime. Therefore, callee of this method
* must return as soon as possible to avoid starvation of ballerina code execution.WD
*/
@Override
public void markAsync() {
Scheduler.getStrand().yield();
public void yieldAndRun(Runnable runnable) {
try {
strand.yield();
runnable.run();
} finally {
strand.resume();
}
}

/**
* Gets an instance of Ballerina runtime.
*
* @return Ballerina runtime instance.
*/
@Override
public BalRuntime getRuntime() {
return strand.scheduler.runtime;
}

/**
* Gets current module {@link Module}.
*
* @return module of the environment.
*/
@Override
public Module getCurrentModule() {
return currentModule;
}

/**
* Gets the strand id. This will be generated on strand initialization.
*
* @return Strand id.
*/
@Override
public int getStrandId() {
return strand.getId();
}

/**
* Gets the strand name. This will be optional. Strand name can be either name given in strand annotation or async
* call or function pointer variable name.
*
* @return Optional strand name.
*/
@Override
public Optional<String> getStrandName() {
return strand.getName();
}

/**
* Gets {@link StrandMetadata}.
*
* @return metadata of the strand.
*/
@Override
public StrandMetadata getStrandMetadata() {
return strand.getMetadata();
}

/**
* Sets given local key value pair in strand.
*
* @param key string key
* @param value value to be store in the strand
*/

@Override
public void setStrandLocal(String key, Object value) {
strand.setProperty(key, value);
}

/**
* Gets the value stored in the strand on given key.
*
* @param key key
* @return value stored in the strand.
*/
@Override
public Object getStrandLocal(String key) {
return strand.getProperty(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static Object handleNonIsolatedStrand(Strand strand, Supplier<Boolean> co
* Used for codegen wait for future.
*/
public static Object handleWait(Strand strand, FutureValue future) {
future.strand.checkStrandCancelled();
if (future.getAndSetWaited()) {
return ErrorUtils.createWaitOnSameFutureError();
}
Expand All @@ -79,6 +80,7 @@ public static Object handleWaitAny(Strand strand, List<FutureValue> futures) {
CompletableFuture<?>[] cFutures = new CompletableFuture[futures.size()];
for (int i = 0; i < futures.size(); i++) {
FutureValue future = futures.get(i);
future.strand.checkStrandCancelled();
if (future.getAndSetWaited()) {
return ErrorUtils.createWaitOnSameFutureError();
}
Expand All @@ -98,6 +100,7 @@ public static void handleWaitMultiple(Strand strand, Map<String, FutureValue> fu
List<String> alreadyWaitedKeys = new ArrayList<>();
for (Map.Entry<String, FutureValue> entry : futureMap.entrySet()) {
FutureValue future = entry.getValue();
future.strand.checkStrandCancelled();
if (!future.getAndSetWaited()) {
cFutures.add(future.completableFuture);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.ballerina.runtime.internal.util.RuntimeUtils;
import io.ballerina.runtime.internal.values.ObjectValue;

import java.io.PrintStream;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -39,8 +38,6 @@ public class RuntimeRegistry {
private final ReentrantLock listenerLock = new ReentrantLock();
private final ReentrantLock stopHandlerLock = new ReentrantLock();

private static final PrintStream outStream = System.err;

public RuntimeRegistry(Scheduler scheduler) {
this.scheduler = scheduler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,20 @@ public static Strand getStrand() {
return daemonStrand;
}
public Object call(Module module, String functionName, Strand parentStrand, Object... args) {
parentStrand.resume();
ValueCreatorAndFunctionType result = getGetValueCreatorAndFunctionType(module, functionName);
Object[] argsWithDefaultValues = getArgsWithDefaultValues(result.valueCreator(), result.functionType(),
parentStrand, args);
return result.valueCreator().call(parentStrand, functionName, argsWithDefaultValues);
ValueCreatorAndFunctionType functionType = getGetValueCreatorAndFunctionType(module, functionName);
Object[] argsWithDefaultValues = getArgsWithDefaultValues(functionType.valueCreator(),
functionType.functionType(), parentStrand, args);
return functionType.valueCreator().call(parentStrand, functionName, argsWithDefaultValues);
}

public Object call(BObject object, String methodName, Strand parentStrand, Object... args) {
parentStrand.resume();
ObjectType objectType = (ObjectType) TypeUtils.getImpliedType(object.getOriginalType());
MethodType methodType = getObjectMethodType(methodName, objectType);
Object[] argsWithDefaultValues = getArgsWithDefaultValues(objectType, methodType, parentStrand, args);
return ((ObjectValue) object).call(parentStrand, methodName, argsWithDefaultValues);
}

public Object call(FPValue fp, Strand parentStrand, Object... args) {
parentStrand.resume();
FunctionType functionType = (FunctionType) TypeUtils.getImpliedType(TypeUtils.getType(fp));
Object[] argsWithDefaultValues = getArgsWithDefaultValues(parentStrand, args, functionType);
Object[] argsWithStrand = getArgsWithStrand(parentStrand, argsWithDefaultValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,14 @@ public class Strand {
private Map<String, Object> globalProps;

public final boolean isIsolated;
public State state = State.YIELD;
public Scheduler scheduler;
public Strand parent;
public TransactionLocalContext currentTrxContext;
public Stack<TransactionLocalContext> trxContexts;
public WorkerChannelMap workerChannelMap;
public boolean cancel;
public int acquiredLockCount;

public Strand() {
this.id = -1;
this.name = null;
this.metadata = null;
this.isIsolated = false;
}

public Strand(String name, StrandMetadata metadata, Scheduler scheduler, Strand parent, boolean isIsolated,
Map<String, Object> properties, WorkerChannelMap workerChannelMap) {
this.id = nextStrandId.incrementAndGet();
Expand Down Expand Up @@ -96,20 +89,22 @@ public Strand(String name, StrandMetadata metadata, Scheduler scheduler, Strand

public void resume() {
checkStrandCancelled();
if (!isIsolated && !scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) {
scheduler.globalNonIsolatedLock.lock();
if (!this.isIsolated && this.state == State.YIELD) {
this.scheduler.globalNonIsolatedLock.lock();
this.state = State.RUNNABLE;
}
}

public void yield() {
checkStrandCancelled();
if (!isIsolated && scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) {
if (!this.isIsolated && this.state == State.RUNNABLE) {
scheduler.globalNonIsolatedLock.unlock();
this.state = State.YIELD;
}
}

public void done() {
if (!isIsolated && scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) {
if (!isIsolated && this.state == State.RUNNABLE) {
scheduler.globalNonIsolatedLock.unlock();
}
}
Expand Down Expand Up @@ -186,8 +181,20 @@ public StrandMetadata getMetadata() {
}

public void checkStrandCancelled() {
if (cancel) {
if (this.state == State.CANCELLED) {
throw ErrorUtils.createCancelledFutureError();
}
}

/**
* Maintains the Strand state.
*
* @since 2201.11.0
*/
public enum State {
RUNNABLE,
YIELD,
CANCELLED
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,31 @@ public class WorkerChannel {
private final AtomicInteger doneCount;
private final CompletableFuture<Object> resultFuture;
private final CompletableFuture<Object> receiveFuture;
private Object result;
private boolean cancel;

public WorkerChannel(String name) {
this.name = name;
this.resultFuture = new CompletableFuture<>();
this.receiveFuture = new CompletableFuture<>();
this.doneCount = new AtomicInteger(2);
this.cancel = false;
}

public Object read() {
if (cancel) {
throw ErrorUtils.createCancelledFutureError();
}
try {
result = AsyncUtils.getFutureResult(resultFuture);
return result;
return AsyncUtils.getFutureResult(resultFuture);
} finally {
receiveFuture.complete(null);
}
}

public Object getResult() {
return result;
}

public void write(Object result) {
if (cancel) {
throw ErrorUtils.createCancelledFutureError();
}
resultFuture.complete(result);
}

Expand Down Expand Up @@ -113,6 +115,10 @@ public boolean done() {
return doneCount.incrementAndGet() == 0;
}

public void cancel() {
this.cancel = true;
}

public String getName() {
return name;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,13 @@ public void completeReceiveWorkerChannels(String channelKey, Object returnValue)
channelMapLock.writeLock().unlock();
}
}

public void cancel() {
try {
channelMapLock.writeLock().lock();
channelMap.values().forEach(WorkerChannel::cancel);
} finally {
channelMapLock.writeLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ public BTypedesc getTypedesc() {

@Override
public void cancel() {
this.strand.cancel = true;
this.strand.state = Strand.State.CANCELLED;
if (this.strand.workerChannelMap != null) {
this.strand.workerChannelMap.cancel();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,6 @@ private void genJICallTerm(JIMethodCall callIns, int localVarOffset, BIRNode.BIR
if (callIns.lhsOp != null && callIns.lhsOp.variableDcl != null) {
this.storeToVar(callIns.lhsOp.variableDcl);
}
mv.visitVarInsn(ALOAD, localVarOffset);
mv.visitMethodInsn(INVOKEVIRTUAL, STRAND_CLASS, "resume", VOID_METHOD_DESC, false);
}

private void genJIConstructorTerm(JIConstructorCall callIns) {
Expand Down
Loading

0 comments on commit 57eee14

Please sign in to comment.