Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix netty memory leak #14127

Merged
merged 3 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,13 @@ protected final HttpOutputMessage buildMessage(Object data) throws Throwable {
data = ((HttpResult<?>) data).getBody();
}
HttpOutputMessage outputMessage = encodeHttpOutputMessage(data);
preOutputMessage(outputMessage);
responseEncoder.encode(outputMessage.getBody(), data);
try {
preOutputMessage(outputMessage);
responseEncoder.encode(outputMessage.getBody(), data);
} catch (Throwable t) {
outputMessage.close();
throw t;
}
return outputMessage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
*/
package org.apache.dubbo.remoting.http12;

import java.io.IOException;
import java.io.InputStream;

public interface HttpInputMessage {
public interface HttpInputMessage extends AutoCloseable {

InputStream getBody();

@Override
default void close() throws IOException {
getBody().close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package org.apache.dubbo.remoting.http12;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public interface HttpOutputMessage {
public interface HttpOutputMessage extends AutoCloseable {

HttpOutputMessage EMPTY_MESSAGE = new HttpOutputMessage() {

Expand All @@ -32,4 +33,9 @@ public OutputStream getBody() {
};

OutputStream getBody();

@Override
default void close() throws IOException {
getBody().close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.remoting.http12.HttpInputMessage;
import org.apache.dubbo.remoting.http12.RequestMetadata;

import java.io.IOException;
import java.io.InputStream;

public class DefaultHttp1Request implements Http1Request {
Expand Down Expand Up @@ -52,4 +53,9 @@ public String method() {
public String path() {
return httpMetadata.path();
}

@Override
public void close() throws IOException {
httpInputMessage.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.remoting.http12.HttpInputMessage;
import org.apache.dubbo.remoting.http12.HttpMetadata;

import java.io.IOException;
import java.io.InputStream;

public class DefaultHttp1Response implements HttpMetadata, HttpInputMessage {
Expand All @@ -42,4 +43,9 @@ public InputStream getBody() {
public HttpHeaders headers() {
return httpMetadata.headers();
}

@Override
public void close() throws IOException {
httpInputMessage.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

import org.apache.dubbo.remoting.http12.HttpOutputMessage;

import java.io.IOException;
import java.io.OutputStream;

import io.netty.buffer.ByteBufOutputStream;

public class Http1OutputMessage implements HttpOutputMessage {

private final OutputStream outputStream;
Expand All @@ -32,4 +35,12 @@ public Http1OutputMessage(OutputStream outputStream) {
public OutputStream getBody() {
return outputStream;
}

@Override
public void close() throws IOException {
if (outputStream instanceof ByteBufOutputStream) {
((ByteBufOutputStream) outputStream).buffer().release();
}
outputStream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package org.apache.dubbo.remoting.http12.h2;

import java.io.IOException;
import java.io.OutputStream;

import io.netty.buffer.ByteBufOutputStream;

public class Http2OutputMessageFrame implements Http2OutputMessage {

private final OutputStream body;
Expand All @@ -42,6 +45,14 @@ public OutputStream getBody() {
return body;
}

@Override
public void close() throws IOException {
if (body instanceof ByteBufOutputStream) {
((ByteBufOutputStream) body).buffer().release();
}
body.close();
}

@Override
public boolean isEndStream() {
return endStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ public void onData(MESSAGE message) {
doOnData(message);
} catch (Throwable t) {
logError(t);
onError(t);
onError(message, t);
} finally {
onFinally(message);
}
});
}
Expand Down Expand Up @@ -184,6 +186,18 @@ protected void onError(Throwable throwable) {
throw new HttpStatusException(HttpStatus.INTERNAL_SERVER_ERROR.getCode(), throwable);
}

protected void onError(MESSAGE message, Throwable throwable) {
onError(throwable);
}

protected void onFinally(MESSAGE message) {
try {
message.close();
} catch (Exception e) {
onError(e);
}
}

protected RpcInvocation buildRpcInvocation(RpcInvocationBuildContext context) {
MethodDescriptor methodDescriptor = context.getMethodDescriptor();
if (methodDescriptor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.MethodMetadata;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
Expand Down Expand Up @@ -122,6 +123,19 @@ protected RpcInvocation onBuildRpcInvocationCompletion(RpcInvocation invocation)
return invocation;
}

@Override
protected void onError(Http2InputMessage message, Throwable throwable) {
try {
message.close();
} catch (Exception e) {
throwable.addSuppressed(e);
}
onError(throwable);
}

@Override
protected void onFinally(Http2InputMessage message) {}

@Override
protected GrpcStreamingDecoder getStreamingDecoder() {
return (GrpcStreamingDecoder) super.getStreamingDecoder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ protected Executor initializeExecutor(Http2Header metadata) {
return new SerializingExecutor(executorSupport.getExecutor(metadata));
}

@Override
protected void doOnMetadata(Http2Header metadata) {
if (metadata.isEndStream()) {
if (!HttpMethods.supportBody(metadata.method())) {
Expand Down Expand Up @@ -164,7 +165,7 @@ protected void onMetadataCompletion(Http2Header metadata) {
@Override
protected void onDataCompletion(Http2InputMessage message) {
if (message.isEndStream()) {
serverCallListener.onComplete();
finefuture marked this conversation as resolved.
Show resolved Hide resolved
getStreamingDecoder().close();
}
}

Expand Down
Loading