Skip to content

Commit

Permalink
core: fix jumpstream listener not set when closing stream inline (#8290)
Browse files Browse the repository at this point in the history
* Revert "core: rollback executor supplier, needs investigation (#8289)"

This reverts commit 1b57d48.

* set jumpStream listener before close stream
  • Loading branch information
YifeiZhuang committed Jun 28, 2021
1 parent 66faf10 commit 7644350
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 43 deletions.
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/ForwardingServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public T executor(@Nullable Executor executor) {
return thisT();
}

@Override
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
delegate().callExecutor(executorSupplier);
return thisT();
}

@Override
public T addService(ServerServiceDefinition service) {
delegate().addService(service);
Expand Down
24 changes: 24 additions & 0 deletions api/src/main/java/io/grpc/ServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,30 @@ public static ServerBuilder<?> forPort(int port) {
*/
public abstract T executor(@Nullable Executor executor);


/**
* Allows for defining a way to provide a custom executor to handle the server call.
* This executor is the result of calling
* {@link ServerCallExecutorSupplier#getExecutor(ServerCall, Metadata)} per RPC.
*
* <p>It's an optional parameter. If it is provided, the {@link #executor(Executor)} would still
* run necessary tasks before the {@link ServerCallExecutorSupplier} is ready to be called, then
* it switches over.
*
* <p>If it is provided, {@link #directExecutor()} optimization is disabled. But if calling
* {@link ServerCallExecutorSupplier} returns null, the server call is still handled by the
* default {@link #executor(Executor)} as a fallback.
*
* @param executorSupplier the server call executor provider
* @return this
* @since 1.39.0
*
* */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8274")
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
return thisT();
}

/**
* Adds a service implementation to the handler registry.
*
Expand Down
34 changes: 34 additions & 0 deletions api/src/main/java/io/grpc/ServerCallExecutorSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.util.concurrent.Executor;
import javax.annotation.Nullable;

/**
* Defines what executor handles the server call, based on each RPC call information at runtime.
* */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8274")
public interface ServerCallExecutorSupplier {

/**
* Returns an executor to handle the server call.
* It should never throw. It should return null to fallback to the default executor.
* */
@Nullable
<ReqT, RespT> Executor getExecutor(ServerCall<ReqT, RespT> call, Metadata metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.grpc.HandlerRegistry;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
Expand Down Expand Up @@ -67,6 +68,12 @@ public T directExecutor() {
return thisT();
}

@Override
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
delegate().callExecutor(executorSupplier);
return thisT();
}

@Override
public T executor(@Nullable Executor executor) {
delegate().executor(executor);
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/io/grpc/internal/SerializingExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private static AtomicHelper getAtomicHelper() {
private static final int RUNNING = -1;

/** Underlying executor that all submitted Runnable objects are run on. */
private final Executor executor;
private Executor executor;

/** A list of Runnables to be run in order. */
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
Expand All @@ -76,6 +76,15 @@ public SerializingExecutor(Executor executor) {
this.executor = executor;
}

/**
* Only call this from this SerializingExecutor Runnable, so that the executor is immediately
* visible to this SerializingExecutor executor.
* */
public void setExecutor(Executor executor) {
Preconditions.checkNotNull(executor, "'executor' must not be null.");
this.executor = executor;
}

/**
* Runs the given runnable strictly after all Runnables that were submitted
* before it, and using the {@code executor} passed to the constructor. .
Expand Down Expand Up @@ -118,7 +127,8 @@ private void schedule(@Nullable Runnable removable) {
public void run() {
Runnable r;
try {
while ((r = runQueue.poll()) != null) {
Executor oldExecutor = executor;
while (oldExecutor == executor && (r = runQueue.poll()) != null ) {
try {
r.run();
} catch (RuntimeException e) {
Expand Down
151 changes: 110 additions & 41 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
Expand All @@ -46,6 +47,7 @@
import io.grpc.InternalServerInterceptors;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
Expand Down Expand Up @@ -125,6 +127,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
private final InternalChannelz channelz;
private final CallTracer serverCallTracer;
private final Deadline.Ticker ticker;
private final ServerCallExecutorSupplier executorSupplier;

/**
* Construct a server.
Expand Down Expand Up @@ -159,6 +162,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");
channelz.addServer(this);
this.executorSupplier = builder.executorSupplier;
}

/**
Expand Down Expand Up @@ -469,11 +473,11 @@ private void streamCreatedInternal(
final Executor wrappedExecutor;
// This is a performance optimization that avoids the synchronization and queuing overhead
// that comes with SerializingExecutor.
if (executor == directExecutor()) {
if (executorSupplier != null || executor != directExecutor()) {
wrappedExecutor = new SerializingExecutor(executor);
} else {
wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
stream.optimizeForDirectExecutor();
} else {
wrappedExecutor = new SerializingExecutor(executor);
}

if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
Expand All @@ -499,52 +503,120 @@ private void streamCreatedInternal(

final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(
wrappedExecutor, executor, stream, context, tag);
wrappedExecutor, executor, stream, context, tag);
stream.setListener(jumpListener);
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. Callbacks can still be triggered, but they will be
// queued.

final class StreamCreated extends ContextRunnable {
StreamCreated() {
final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create();
// Run in serializing executor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. MethodLookup() and HandleServerCall() are proactively
// queued before any callbacks are queued at serializing executor.
// MethodLookup() runs on the default executor.
// When executorSupplier is enabled, MethodLookup() may set/change the executor in the
// SerializingExecutor before it finishes running.
// Then HandleServerCall() and callbacks would switch to the executorSupplier executor.
// Otherwise, they all run on the default executor.

final class MethodLookup extends ContextRunnable {
MethodLookup() {
super(context);
}

@Override
public void runInContext() {
PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag);
PerfMark.startTask("ServerTransportListener$MethodLookup.startCall", tag);
PerfMark.linkIn(link);
try {
runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag);
PerfMark.stopTask("ServerTransportListener$MethodLookup.startCall", tag);
}
}

private void runInternal() {
ServerStreamListener listener = NOOP_LISTENER;
ServerMethodDefinition<?, ?> wrapMethod;
ServerCallParameters<?, ?> callParams;
try {
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
if (method == null) {
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
}
if (method == null) {
Status status = Status.UNIMPLEMENTED.withDescription(
"Method not found: " + methodName);
"Method not found: " + methodName);
// TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
// memory as a map whose key is the method name, this would allow a misbehaving
// client to blow up the server in-memory stats storage by sending large number of
// distinct unimplemented method
// names. (https://github.com/grpc/grpc-java/issues/2285)
jumpListener.setListener(NOOP_LISTENER);
stream.close(status, new Metadata());
context.cancel(null);
future.cancel(false);
return;
}
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx, tag);
wrapMethod = wrapMethod(stream, method, statsTraceCtx);
callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag);
future.set(callParams);
} catch (Throwable t) {
jumpListener.setListener(NOOP_LISTENER);
stream.close(Status.fromThrowable(t), new Metadata());
context.cancel(null);
future.cancel(false);
throw t;
}
}

private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor(
final ServerMethodDefinition<ReqT, RespT> methodDef,
final ServerStream stream,
final Metadata headers,
final Context.CancellableContext context,
final Tag tag) {
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>(
stream,
methodDef.getMethodDescriptor(),
headers,
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer,
tag);
if (executorSupplier != null) {
Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
if (switchingExecutor != null) {
((SerializingExecutor)wrappedExecutor).setExecutor(switchingExecutor);
}
}
return new ServerCallParameters<>(call, methodDef.getServerCallHandler());
}
}

final class HandleServerCall extends ContextRunnable {
HandleServerCall() {
super(context);
}

@Override
public void runInContext() {
PerfMark.startTask("ServerTransportListener$HandleServerCall.startCall", tag);
PerfMark.linkIn(link);
try {
runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$HandleServerCall.startCall", tag);
}
}

private void runInternal() {
ServerStreamListener listener = NOOP_LISTENER;
if (future.isCancelled()) {
return;
}
try {
listener = startWrappedCall(methodName, Futures.getDone(future), headers);
} catch (Throwable ex) {
stream.close(Status.fromThrowable(ex), new Metadata());
context.cancel(null);
throw new IllegalStateException(ex);
} finally {
jumpListener.setListener(listener);
}
Expand All @@ -568,7 +640,8 @@ public void cancelled(Context context) {
}
}

wrappedExecutor.execute(new StreamCreated());
wrappedExecutor.execute(new MethodLookup());
wrappedExecutor.execute(new HandleServerCall());
}

private Context.CancellableContext createContext(
Expand All @@ -593,9 +666,8 @@ private Context.CancellableContext createContext(
}

/** Never returns {@code null}. */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {
private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream,
ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) {
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
statsTraceCtx.serverCallStarted(
new ServerCallInfoImpl<>(
Expand All @@ -609,34 +681,31 @@ private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
ServerMethodDefinition<?, ?> wMethodDef = binlog == null
? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag);
return wMethodDef;
}

private final class ServerCallParameters<ReqT, RespT> {
ServerCallImpl<ReqT, RespT> call;
ServerCallHandler<ReqT, RespT> callHandler;

public ServerCallParameters(ServerCallImpl<ReqT, RespT> call,
ServerCallHandler<ReqT, RespT> callHandler) {
this.call = call;
this.callHandler = callHandler;
}
}

private <WReqT, WRespT> ServerStreamListener startWrappedCall(
String fullMethodName,
ServerMethodDefinition<WReqT, WRespT> methodDef,
ServerStream stream,
Metadata headers,
Context.CancellableContext context,
Tag tag) {

ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(
stream,
methodDef.getMethodDescriptor(),
headers,
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer,
tag);

ServerCall.Listener<WReqT> listener =
methodDef.getServerCallHandler().startCall(call, headers);
if (listener == null) {
ServerCallParameters<WReqT, WRespT> params,
Metadata headers) {
ServerCall.Listener<WReqT> callListener =
params.callHandler.startCall(params.call, headers);
if (callListener == null) {
throw new NullPointerException(
"startCall() returned a null listener for method " + fullMethodName);
"startCall() returned a null listener for method " + fullMethodName);
}
return call.newServerStreamListener(listener);
return params.call.newServerStreamListener(callListener);
}
}

Expand Down
Loading

0 comments on commit 7644350

Please sign in to comment.