From 172dd82f8cd8680e9c372ed9755fc8693131275a Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 30 Sep 2024 22:10:10 +0300 Subject: [PATCH] Ensure the buffer is not released twice (#3448) When NettyOutbound#sendObject(java.lang.Object) is used, it is Netty's responsibility to release the buffer on success/error Fixes #3406 --- .../main/java/reactor/netty/ReactorNetty.java | 28 ++++----- .../reactor/netty/tcp/TcpServerTests.java | 57 ++++++++++++++++++- 2 files changed, 71 insertions(+), 14 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java b/reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java index 87659e4804..c88d92c565 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java +++ b/reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java @@ -23,6 +23,7 @@ import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -721,18 +722,21 @@ public void onStateChange(Connection connection, State newState) { * An appending write that delegates to its origin context and append the passed * publisher after the origin success if any. */ - static final class OutboundThen implements NettyOutbound { + static final class OutboundThen extends AtomicBoolean implements NettyOutbound { final NettyOutbound source; final Mono thenMono; static final Runnable EMPTY_CLEANUP = () -> {}; - OutboundThen(NettyOutbound source, Publisher thenPublisher) { this(source, thenPublisher, EMPTY_CLEANUP); } + // This construction is used only with ChannelOperations#sendObject + // The implementation relies on Netty's promise that Channel#writeAndFlush will release the buffer on success/error + // The onCleanup callback is invoked only in case when we are sure that the processing doesn't delegate to Netty + // because of some failure before the exchange can be continued in the thenPublisher OutboundThen(NettyOutbound source, Publisher thenPublisher, Runnable onCleanup) { this.source = source; Objects.requireNonNull(onCleanup, "onCleanup"); @@ -740,23 +744,21 @@ static final class OutboundThen implements NettyOutbound { Mono parentMono = source.then(); if (parentMono == Mono.empty()) { - if (onCleanup == EMPTY_CLEANUP) { - this.thenMono = Mono.from(thenPublisher); - } - else { - this.thenMono = Mono.from(thenPublisher) - .doOnCancel(onCleanup) - .doOnError(t -> onCleanup.run()); - } + this.thenMono = Mono.from(thenPublisher); } else { if (onCleanup == EMPTY_CLEANUP) { this.thenMono = parentMono.thenEmpty(thenPublisher); } else { - this.thenMono = parentMono.thenEmpty(thenPublisher) - .doOnCancel(onCleanup) - .doOnError(t -> onCleanup.run()); + this.thenMono = parentMono + .doFinally(signalType -> { + if ((signalType == SignalType.CANCEL || signalType == SignalType.ON_ERROR) && + compareAndSet(false, true)) { + onCleanup.run(); + } + }) + .thenEmpty(thenPublisher); } } } diff --git a/reactor-netty-core/src/test/java/reactor/netty/tcp/TcpServerTests.java b/reactor-netty-core/src/test/java/reactor/netty/tcp/TcpServerTests.java index 17d0b39de3..183848ded6 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/tcp/TcpServerTests.java +++ b/reactor-netty-core/src/test/java/reactor/netty/tcp/TcpServerTests.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.handler.codec.json.JsonObjectDecoder; import io.netty.handler.ssl.SniCompletionEvent; import io.netty.handler.ssl.SslContext; @@ -67,6 +68,8 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Publisher; import reactor.core.Exceptions; import reactor.core.publisher.Flux; @@ -84,6 +87,7 @@ import reactor.netty.SocketUtils; import reactor.netty.channel.ChannelOperations; import reactor.netty.resources.LoopResources; +import reactor.test.StepVerifier; import reactor.util.Logger; import reactor.util.Loggers; @@ -1233,4 +1237,55 @@ void testTcpServerCancelled() throws InterruptedException { assertThat(serverMessages.size()).isEqualTo(0); } } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testIssue3406(boolean singleInvocation) { + DisposableServer server = null; + Connection client = null; + + try { + server = + TcpServer.create() + .wiretap(true) + .handle((in, out) -> out.sendString(Mono.just("testIssue3406")) + .then(in.receive().then())) + .bindNow(); + + Sinks.One result = Sinks.one(); + client = + TcpClient.create() + .remoteAddress(server::address) + .wiretap(true) + .doOnConnected(conn -> + conn.addHandlerFirst( + new MessageToMessageEncoder() { + @Override + protected void encode(ChannelHandlerContext ctx, Object msg, List out) { + // This is no-op in order to force Netty to release the 'msg' + // and to throw Exception + } + })) + .handle((in, out) -> + in.receive() + .retain() + .doOnError(result::tryEmitError) + .doOnComplete(result::tryEmitEmpty) + .flatMap(b -> singleInvocation ? out.sendObject(b) : out.sendObject(b.retain()).sendObject(b))) + .connectNow(); + + result.asMono() + .as(StepVerifier::create) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + finally { + if (client != null) { + client.disposeNow(); + } + if (server != null) { + server.disposeNow(); + } + } + } }