Skip to content

Commit

Permalink
[test] More tests for Reactor Core feature Hooks.enableAutomaticConte…
Browse files Browse the repository at this point in the history
…xtPropagation() (#2926)
  • Loading branch information
violetagg authored Oct 5, 2023
1 parent ac7f7ba commit bb8a50a
Showing 1 changed file with 70 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,24 +144,17 @@ void testAutomaticContextPropagation(HttpClient client) {

TestThreadLocalHolder.value("First");

AtomicReference<String> threadLocal = new AtomicReference<>();
client.port(disposableServer.port())
.wiretap(true)
.post()
.uri("/")
.send(ByteBufMono.fromString(Mono.just("test")))
.responseContent()
.aggregate()
.asString()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
HttpClient.ResponseReceiver<?> responseReceiver =
client.port(disposableServer.port())
.wiretap(true)
.post()
.uri("/")
.send(ByteBufMono.fromString(Mono.just("test")));

response(responseReceiver, enableAutomaticContextPropagation);
responseConnection(responseReceiver, enableAutomaticContextPropagation);
responseContent(responseReceiver, enableAutomaticContextPropagation);
responseSingle(responseReceiver, enableAutomaticContextPropagation);
}
finally {
TestThreadLocalHolder.reset();
Expand Down Expand Up @@ -213,6 +206,65 @@ static Object[] httpClientCombinations() {
};
}

static void response(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.response()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
}

static void responseConnection(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.responseConnection((res, conn) -> conn.inbound().receive().aggregate().asString())
.next()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
}

static void responseContent(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.responseContent()
.aggregate()
.asString()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
}

static void responseSingle(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.responseSingle((res, bytes) -> bytes.asString())
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
}

static void sendRequest(HttpClient client, String expectation) {
client.post()
.uri("/")
Expand Down

0 comments on commit bb8a50a

Please sign in to comment.