Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java 21] Fix runtime get hang due to invalid Strand states #43434

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bvm/ballerina-rt/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ dependencies {
dist project(path: ':ballerina-lang:query', configuration: 'distributionBirJar')
dist project(path: ':ballerina-lang:transaction', configuration: 'distributionBirJar')
dist project(path: ':ballerina-lang:regexp', configuration: 'distributionBirJar')
// dist project(path: ':metrics-extensions:ballerina-prometheus-extension', configuration: 'distributionBirJar')

// Lang libs
dist project(':ballerina-lang:internal')
Expand Down Expand Up @@ -130,6 +129,10 @@ dependencies {

}

configurations.configureEach {
transitive = false
}

jar {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
dependsOn configurations.dist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@ public abstract class Environment {
public abstract Parameter[] getFunctionPathParameters();

/**
* Mark the current executing strand as async. Execution of Ballerina code after the current
* interop will stop until given Ballerina Future is completed. However the java thread will not be blocked
* and will be reused for running other Ballerina code in the meantime. Therefore callee of this method
* must return as soon as possible to avoid starvation of Ballerina code execution.
* Yield the current execution and run some operation so other non isolated functions can run in asynchronously.
*
* @param runnable operation to be executed.
*/
public abstract void markAsync();
public abstract void yieldAndRun(Runnable runnable);

/**
* Gets an instance of Ballerina runtime.
Expand Down Expand Up @@ -105,5 +103,10 @@ public abstract class Environment {
*/
public abstract Object getStrandLocal(String key);

/**
* Gets the current environment repository.
*
* @return repository.
*/
public abstract Repository getRepository();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.ballerina.runtime.api.Repository;
import io.ballerina.runtime.api.async.StrandMetadata;
import io.ballerina.runtime.api.types.Parameter;
import io.ballerina.runtime.internal.scheduling.Scheduler;
import io.ballerina.runtime.internal.scheduling.Strand;

import java.util.Optional;
Expand Down Expand Up @@ -59,105 +58,57 @@ public BalEnvironment(Strand strand, Module currentModule, String funcName, Para
this.repository = new RepositoryImpl();
}

/**
* Returns the Ballerina function name for the corresponding external interop method.
*
* @return function name
*/
@Override
public String getFunctionName() {
return funcName;
}

/**
* Returns an array consisting of the path parameters of the resource function defined as external.
*
* @return array of {@link Parameter}
*/
@Override
public Parameter[] getFunctionPathParameters() {
return funcPathParams;
}

/**
* Mark the current executing strand as async. Execution of Ballerina code after the current
* interop will stop until given BalFuture is completed. However, the java thread will not be blocked
* and will be reused for running other Ballerina code in the meantime. Therefore, callee of this method
* must return as soon as possible to avoid starvation of ballerina code execution.WD
*/
@Override
public void markAsync() {
Scheduler.getStrand().yield();
public void yieldAndRun(Runnable runnable) {
try {
strand.yield();
runnable.run();
} finally {
strand.resume();
}
}

/**
* Gets an instance of Ballerina runtime.
*
* @return Ballerina runtime instance.
*/
@Override
public BalRuntime getRuntime() {
return strand.scheduler.runtime;
}

/**
* Gets current module {@link Module}.
*
* @return module of the environment.
*/
@Override
public Module getCurrentModule() {
return currentModule;
}

/**
* Gets the strand id. This will be generated on strand initialization.
*
* @return Strand id.
*/
@Override
public int getStrandId() {
return strand.getId();
}

/**
* Gets the strand name. This will be optional. Strand name can be either name given in strand annotation or async
* call or function pointer variable name.
*
* @return Optional strand name.
*/
@Override
public Optional<String> getStrandName() {
return strand.getName();
}

/**
* Gets {@link StrandMetadata}.
*
* @return metadata of the strand.
*/
@Override
public StrandMetadata getStrandMetadata() {
return strand.getMetadata();
}

/**
* Sets given local key value pair in strand.
*
* @param key string key
* @param value value to be store in the strand
*/

@Override
public void setStrandLocal(String key, Object value) {
strand.setProperty(key, value);
}

/**
* Gets the value stored in the strand on given key.
*
* @param key key
* @return value stored in the strand.
*/
@Override
public Object getStrandLocal(String key) {
return strand.getProperty(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static Object handleNonIsolatedStrand(Strand strand, Supplier<Boolean> co
* Used for codegen wait for future.
*/
public static Object handleWait(Strand strand, FutureValue future) {
future.strand.checkStrandCancelled();
if (future.getAndSetWaited()) {
return ErrorUtils.createWaitOnSameFutureError();
}
Expand All @@ -79,6 +80,7 @@ public static Object handleWaitAny(Strand strand, List<FutureValue> futures) {
CompletableFuture<?>[] cFutures = new CompletableFuture[futures.size()];
for (int i = 0; i < futures.size(); i++) {
FutureValue future = futures.get(i);
future.strand.checkStrandCancelled();
if (future.getAndSetWaited()) {
return ErrorUtils.createWaitOnSameFutureError();
}
Expand All @@ -98,6 +100,7 @@ public static void handleWaitMultiple(Strand strand, Map<String, FutureValue> fu
List<String> alreadyWaitedKeys = new ArrayList<>();
for (Map.Entry<String, FutureValue> entry : futureMap.entrySet()) {
FutureValue future = entry.getValue();
future.strand.checkStrandCancelled();
if (!future.getAndSetWaited()) {
cFutures.add(future.completableFuture);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.ballerina.runtime.internal.util.RuntimeUtils;
import io.ballerina.runtime.internal.values.ObjectValue;

import java.io.PrintStream;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -39,8 +38,6 @@ public class RuntimeRegistry {
private final ReentrantLock listenerLock = new ReentrantLock();
private final ReentrantLock stopHandlerLock = new ReentrantLock();

private static final PrintStream outStream = System.err;

public RuntimeRegistry(Scheduler scheduler) {
this.scheduler = scheduler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,20 @@ public static Strand getStrand() {
return daemonStrand;
}
public Object call(Module module, String functionName, Strand parentStrand, Object... args) {
parentStrand.resume();
ValueCreatorAndFunctionType result = getGetValueCreatorAndFunctionType(module, functionName);
Object[] argsWithDefaultValues = getArgsWithDefaultValues(result.valueCreator(), result.functionType(),
parentStrand, args);
return result.valueCreator().call(parentStrand, functionName, argsWithDefaultValues);
ValueCreatorAndFunctionType functionType = getGetValueCreatorAndFunctionType(module, functionName);
Object[] argsWithDefaultValues = getArgsWithDefaultValues(functionType.valueCreator(),
functionType.functionType(), parentStrand, args);
return functionType.valueCreator().call(parentStrand, functionName, argsWithDefaultValues);
}

public Object call(BObject object, String methodName, Strand parentStrand, Object... args) {
parentStrand.resume();
ObjectType objectType = (ObjectType) TypeUtils.getImpliedType(object.getOriginalType());
MethodType methodType = getObjectMethodType(methodName, objectType);
Object[] argsWithDefaultValues = getArgsWithDefaultValues(objectType, methodType, parentStrand, args);
return ((ObjectValue) object).call(parentStrand, methodName, argsWithDefaultValues);
}

public Object call(FPValue fp, Strand parentStrand, Object... args) {
parentStrand.resume();
FunctionType functionType = (FunctionType) TypeUtils.getImpliedType(TypeUtils.getType(fp));
Object[] argsWithDefaultValues = getArgsWithDefaultValues(parentStrand, args, functionType);
Object[] argsWithStrand = getArgsWithStrand(parentStrand, argsWithDefaultValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,14 @@ public class Strand {
private Map<String, Object> globalProps;

public final boolean isIsolated;
public State state = State.YIELD;
public Scheduler scheduler;
public Strand parent;
public TransactionLocalContext currentTrxContext;
public Stack<TransactionLocalContext> trxContexts;
public WorkerChannelMap workerChannelMap;
public boolean cancel;
public int acquiredLockCount;

public Strand() {
this.id = -1;
this.name = null;
this.metadata = null;
this.isIsolated = false;
}

public Strand(String name, StrandMetadata metadata, Scheduler scheduler, Strand parent, boolean isIsolated,
Map<String, Object> properties, WorkerChannelMap workerChannelMap) {
this.id = nextStrandId.incrementAndGet();
Expand Down Expand Up @@ -96,20 +89,22 @@ public Strand(String name, StrandMetadata metadata, Scheduler scheduler, Strand

public void resume() {
checkStrandCancelled();
if (!isIsolated && !scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) {
scheduler.globalNonIsolatedLock.lock();
if (!this.isIsolated && this.state == State.YIELD) {
this.scheduler.globalNonIsolatedLock.lock();
this.state = State.RUNNABLE;
}
}

public void yield() {
checkStrandCancelled();
if (!isIsolated && scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) {
if (!this.isIsolated && this.state == State.RUNNABLE) {
scheduler.globalNonIsolatedLock.unlock();
this.state = State.YIELD;
}
}

public void done() {
if (!isIsolated && scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) {
if (!isIsolated && this.state == State.RUNNABLE) {
scheduler.globalNonIsolatedLock.unlock();
}
}
Expand Down Expand Up @@ -186,8 +181,20 @@ public StrandMetadata getMetadata() {
}

public void checkStrandCancelled() {
if (cancel) {
if (this.state == State.CANCELLED) {
throw ErrorUtils.createCancelledFutureError();
}
}

/**
* Maintains the Strand state.
*
* @since 2201.11.0
*/
public enum State {
RUNNABLE,
YIELD,
CANCELLED
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,31 @@ public class WorkerChannel {
private final AtomicInteger doneCount;
private final CompletableFuture<Object> resultFuture;
private final CompletableFuture<Object> receiveFuture;
private Object result;
private boolean cancel;

public WorkerChannel(String name) {
this.name = name;
this.resultFuture = new CompletableFuture<>();
this.receiveFuture = new CompletableFuture<>();
this.doneCount = new AtomicInteger(2);
this.cancel = false;
}

public Object read() {
if (cancel) {
throw ErrorUtils.createCancelledFutureError();
}
try {
result = AsyncUtils.getFutureResult(resultFuture);
return result;
return AsyncUtils.getFutureResult(resultFuture);
} finally {
receiveFuture.complete(null);
}
}

public Object getResult() {
return result;
}

public void write(Object result) {
if (cancel) {
throw ErrorUtils.createCancelledFutureError();
}
resultFuture.complete(result);
}

Expand Down Expand Up @@ -113,6 +115,10 @@ public boolean done() {
return doneCount.incrementAndGet() == 0;
}

public void cancel() {
this.cancel = true;
}

public String getName() {
return name;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,13 @@ public void completeReceiveWorkerChannels(String channelKey, Object returnValue)
channelMapLock.writeLock().unlock();
}
}

public void cancel() {
try {
channelMapLock.writeLock().lock();
channelMap.values().forEach(WorkerChannel::cancel);
} finally {
channelMapLock.writeLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ public BTypedesc getTypedesc() {

@Override
public void cancel() {
this.strand.cancel = true;
this.strand.state = Strand.State.CANCELLED;
if (this.strand.workerChannelMap != null) {
this.strand.workerChannelMap.cancel();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,6 @@ private void genJICallTerm(JIMethodCall callIns, int localVarOffset, BIRNode.BIR
if (callIns.lhsOp != null && callIns.lhsOp.variableDcl != null) {
this.storeToVar(callIns.lhsOp.variableDcl);
}
mv.visitVarInsn(ALOAD, localVarOffset);
mv.visitMethodInsn(INVOKEVIRTUAL, STRAND_CLASS, "resume", VOID_METHOD_DESC, false);
}

private void genJIConstructorTerm(JIConstructorCall callIns) {
Expand Down
Loading
Loading