Skip to content

Commit

Permalink
Support StatusException in CoroutineContextServerInterceptor.
Browse files Browse the repository at this point in the history
Previously, there was no way for a subclass of CoroutineContextServerInterceptor to close a call and prevent further processing. The interceptor would unconditionally invoke the next handler in the chain unless prevented by an exception. If the coroutineContext method threw an exception, it would bubble up to the thread pool and show up in the logs as an error.

This change adds a handler for StatusException in CoroutineContextServerInterceptor to catch a StatusException and close the call with no further action.

Fixes #221
  • Loading branch information
Bradford Hovinen committed Apr 19, 2021
1 parent ae2d5f7 commit e61336a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.StatusException
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import io.grpc.Context as GrpcContext
Expand Down Expand Up @@ -33,7 +34,11 @@ abstract class CoroutineContextServerInterceptor : ServerInterceptor {
* server object.
*
* This function will be called each time a [call] is executed.
*
* @throws StatusException if the call should be closed with the [Status][io.grpc.Status] in the
* exception and further processing suppressed
*/
@Throws(StatusException::class)
abstract fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext

private inline fun <R> withGrpcContext(context: GrpcContext, action: () -> R): R {
Expand All @@ -49,8 +54,15 @@ abstract class CoroutineContextServerInterceptor : ServerInterceptor {
call: ServerCall<ReqT, RespT>,
headers: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> =
withGrpcContext(GrpcContext.current().extendCoroutineContext(coroutineContext(call, headers))) {
): ServerCall.Listener<ReqT> {
val coroutineContext = try {
coroutineContext(call, headers)
} catch (e: StatusException) {
call.close(e.status, Metadata())
return object: ServerCall.Listener<ReqT>() {}
}
return withGrpcContext(GrpcContext.current().extendCoroutineContext(coroutineContext)) {
next.startCall(call, headers)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package io.grpc.kotlin
import com.google.common.truth.Truth.assertThat
import io.grpc.ServerCall
import io.grpc.ServerInterceptors
import io.grpc.Status
import io.grpc.StatusException
import io.grpc.StatusRuntimeException
import io.grpc.examples.helloworld.GreeterGrpcKt.GreeterCoroutineImplBase
import io.grpc.examples.helloworld.GreeterGrpcKt.GreeterCoroutineStub
import io.grpc.examples.helloworld.HelloReply
Expand Down Expand Up @@ -97,4 +100,21 @@ class CoroutineContextServerInterceptorTest : AbstractCallsTest() {
assertThat(client.sayHello(helloRequest("")).message).isEqualTo("interceptor")
}
}
}

@Test
fun `StatusException thrown from coroutineContext closes call`() {
val interceptor = object : CoroutineContextServerInterceptor() {
override fun coroutineContext(
call: ServerCall<*, *>,
headers: GrpcMetadata
): CoroutineContext = throw StatusException(Status.INTERNAL.withDescription("An error"))
}

val channel = makeChannel(HelloReplyWithContextMessage("server"), interceptor)
val client = GreeterCoroutineStub(channel)

runBlocking {
assertThrows<StatusException> { client.sayHello(helloRequest("")) }
}
}
}

0 comments on commit e61336a

Please sign in to comment.