Skip to content

Commit

Permalink
[test] Add smoke test for Reactor Core feature Hooks.enableAutomaticC…
Browse files Browse the repository at this point in the history
…ontextPropagation() (#2922)
  • Loading branch information
violetagg authored Oct 5, 2023
1 parent 9eea373 commit b6cfd7c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check_reactor_core_3.6_snapshots.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ jobs:
distribution: 'temurin'
java-version: '8'
- name: Build with Gradle
run: ./gradlew clean check --no-daemon -PforceTransport=nio -PreactorCoreVersion='3.6.0-SNAPSHOT'
run: ./gradlew clean check --no-daemon -PforceTransport=nio -PreactorCoreVersion='3.6.0-SNAPSHOT' -PforceContextPropagationVersion='1.0.5'
11 changes: 10 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,14 @@ ext {
micrometerTracingVersion = '1.0.0' //optional baseline
micrometerDocsVersion = '1.0.0' //optional baseline

contextPropagationVersion = '1.0.0' //optional baseline
contextPropagationDefaultVersion = '1.0.0' //optional baseline
if (!project.hasProperty("forceContextPropagationVersion")) {
contextPropagationVersion = contextPropagationDefaultVersion
}
else {
contextPropagationVersion = forceContextPropagationVersion
println "Context Propagation version defined from command line: ${forceContextPropagationVersion}"
}

braveVersion = '5.16.0'
zipkinSenderVersion = '2.16.3'
Expand Down Expand Up @@ -260,6 +267,8 @@ subprojects {
systemProperty("forceTransport", forceTransport)
}
systemProperty("nettyVersionMicro", VersionNumber.parse(nettyVersion.toString()).micro)
systemProperty("reactorCoreVersionMinor", VersionNumber.parse(reactorCoreVersion.toString()).minor)
systemProperty("contextPropagationVersionMicro", VersionNumber.parse(contextPropagationVersion.toString()).micro)
scanForTestClasses = false
include '**/*Tests.*'
include '**/*Test.*'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.Http11SslContextSpec;
Expand All @@ -44,7 +45,9 @@

import java.nio.charset.Charset;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static reactor.netty.ReactorNetty.getChannelContext;

class ContextPropagationTest {
Expand Down Expand Up @@ -103,6 +106,68 @@ void testContextPropagation(HttpClient client) throws Exception {
}
}

@ParameterizedTest
@MethodSource("httpClientCombinations")
void testAutomaticContextPropagation(HttpClient client) throws Exception {
String reactorCoreVersionMinor = System.getProperty("reactorCoreVersionMinor");
String contextPropagationVersionMicro = System.getProperty("contextPropagationVersionMicro");

boolean enableAutomaticContextPropagation =
reactorCoreVersionMinor != null && !reactorCoreVersionMinor.isEmpty() && Integer.parseInt(reactorCoreVersionMinor) >= 6 && // 3.6.x
contextPropagationVersionMicro != null && !contextPropagationVersionMicro.isEmpty() && Integer.parseInt(contextPropagationVersionMicro) >= 5; // 1.0.5 or above

ssc = new SelfSignedCertificate();
Http2SslContextSpec serverCtxHttp = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
HttpServer server =
HttpServer.create()
.wiretap(true)
.httpRequestDecoder(spec -> spec.h2cMaxContentLength(256))
.handle((in, out) -> out.send(in.receive().retain()));

server = client.configuration().sslProvider() != null ?
server.secure(spec -> spec.sslContext(serverCtxHttp)).protocol(HttpProtocol.HTTP11, HttpProtocol.H2) :
server.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C);

disposableServer = server.bindNow();

try {
if (enableAutomaticContextPropagation) {
Hooks.enableAutomaticContextPropagation();
}

registry.registerThreadLocalAccessor(new TestThreadLocalAccessor());

TestThreadLocalHolder.value("First");

AtomicReference<String> threadLocal = new AtomicReference<>();
client.port(disposableServer.port())
.wiretap(true)
.post()
.uri("/")
.send(ByteBufMono.fromString(Mono.just("test")))
.responseContent()
.aggregate()
.asString()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
}
finally {
TestThreadLocalHolder.reset();
registry.removeThreadLocalAccessor(TestThreadLocalAccessor.KEY);
if (enableAutomaticContextPropagation) {
Hooks.disableAutomaticContextPropagation();
}
disposableServer.disposeNow();
}
}

static Object[] httpClientCombinations() {
Http11SslContextSpec clientCtxHttp11 =
Http11SslContextSpec.forClient()
Expand Down

0 comments on commit b6cfd7c

Please sign in to comment.