fix memory leak of tri protocol #13973

Merged
merged 13 commits into from
May 14, 2024

Conversation

icodening
Contributor

What is the purpose of the change

fix memory leak of tri protocol

Brief changelog

Add a stream-close listener to release Netty ByteBuf instances.
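
A minimal sketch of the idea in this changelog entry, written in plain Java so it runs without Netty on the classpath. `RefCountedBuffer`, `StreamingDecoderSketch`, and `StreamSketch` are hypothetical stand-ins (for a reference-counted `ByteBuf`, the tri streaming decoder, and a stream channel with a close future); they are not Dubbo or Netty classes, and the merged change may differ in detail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a reference-counted buffer such as Netty's ByteBuf.
final class RefCountedBuffer {
    private int refCnt = 1;

    int refCnt() {
        return refCnt;
    }

    void release() {
        if (refCnt == 0) {
            throw new IllegalStateException("buffer already released");
        }
        refCnt--;
    }
}

// Hypothetical decoder that holds partial frames until a full message arrives.
final class StreamingDecoderSketch {
    private final Deque<RefCountedBuffer> accumulated = new ArrayDeque<>();
    private boolean closed;

    void accept(RefCountedBuffer buffer) {
        if (closed) {
            buffer.release(); // nothing will consume it, so free it right away
            return;
        }
        accumulated.add(buffer);
    }

    // Releases everything still buffered; safe to call more than once.
    void close() {
        closed = true;
        RefCountedBuffer buffer;
        while ((buffer = accumulated.poll()) != null) {
            buffer.release();
        }
    }
}

// Hypothetical stream that notifies its listeners exactly once when it closes.
final class StreamSketch {
    private final List<Runnable> closeListeners = new ArrayList<>();
    private boolean closed;

    void addCloseListener(Runnable listener) {
        if (closed) {
            listener.run(); // already closed: run the cleanup immediately
            return;
        }
        closeListeners.add(listener);
    }

    void close() {
        if (closed) {
            return;
        }
        closed = true;
        closeListeners.forEach(Runnable::run);
        closeListeners.clear();
    }
}
```

In this sketch, calling `stream.addCloseListener(decoder::close)` when the stream is set up means any buffers the decoder still holds are released as soon as the stream closes, even if the remote peer cancels in the middle of a message.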

Verifying this change

Checklist

  • Make sure there is a GitHub_issue field for the change (usually before you start working on it). Trivial changes like typos do not require a GitHub issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • Each commit in the pull request should have a meaningful subject line and body.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Check whether it is necessary to patch Dubbo 3 if you are working on Dubbo 2.7.
  • Write the unit tests needed to verify your logic; prefer mocks where cross-module dependencies exist. If a new feature or significant change is committed, please remember to add a sample to the dubbo samples project.
  • Add some description to dubbo-website project if you are requesting to add a feature.
  • GitHub Actions works fine on your own branch.
  • If this contribution is large, please follow the Software Donation Guide.

@oxsean
Collaborator

oxsean commented Mar 21, 2024

@icodening LGTM

@AlbumenJ
Member

image

Is this test failure related to this PR?

@BitoAgent

Code Review Agent Run Status

  • AI Based Review: Successful

Code Review Overview

  • Summary: The PR introduces changes aimed at fixing a memory leak in the tri protocol by adding stream close listeners to release Netty byte buffers. The modifications span across multiple files, introducing a new exception class for stream cancellation, enhancing server channel observers with state management to handle stream closure, and integrating these changes into the protocol handling layers. Overall, the changes are well-targeted and seem to align with the goal of improving resource management within the protocol's handling.
  • Code change type: Bug Fix, Feature Addition
  • Unit tests added: False
  • Estimated effort to review (1-5, lower is better): 2. The changes are straightforward and localized to specific components within the system, making the review process less complex.

The Bito AI Code Review Agent successfully reviewed 5 files and discovered 19 issues. Please review these issues along with suggested fixes in the Changed Files.

High-level Feedback

Add comprehensive unit tests for the new code paths, especially the new exception handling logic and the stream closure operations. Because these changes are integrated across several components, it is also important to verify thread safety and check for race conditions in the state management (e.g., the closed flag in Http2ServerChannelObserver).

Comment on lines +21 to +48
public class CancelStreamException extends RuntimeException implements ErrorCodeHolder {

    private final boolean cancelByRemote;

    private final long errorCode;

    private CancelStreamException(boolean cancelByRemote, long errorCode) {
        this.cancelByRemote = cancelByRemote;
        this.errorCode = errorCode;
    }

    public boolean isCancelByRemote() {
        return cancelByRemote;
    }

    public static CancelStreamException fromRemote(long errorCode) {
        return new CancelStreamException(true, errorCode);
    }

    public static CancelStreamException fromLocal(long errorCode) {
        return new CancelStreamException(false, errorCode);
    }

    @Override
    public long getErrorCode() {
        return errorCode;
    }
}

Suggestion: Consider making CancelStreamException a checked exception to enforce explicit handling of stream cancellation scenarios, promoting safer code practices.
Code Suggestion:

-public class CancelStreamException extends RuntimeException implements ErrorCodeHolder {
+public class CancelStreamException extends Exception implements ErrorCodeHolder {
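
To illustrate the trade-off behind this suggestion: an unchecked exception can be thrown from callbacks such as `Runnable.run()`, which declare no checked exceptions, while a checked one would have to be caught or wrapped at every such call site. The sketch below is self-contained and illustrative only. `ErrorCodeHolderSketch` is a hypothetical stand-in for Dubbo's `ErrorCodeHolder`, `CancelDemo.describe` is an invented helper, and the value 8 used in the usage note is the HTTP/2 `CANCEL` error code.

```java
// Hypothetical stand-in for Dubbo's ErrorCodeHolder interface.
interface ErrorCodeHolderSketch {
    long getErrorCode();
}

// Mirrors the shape of the exception under review. It is unchecked, so it can
// be thrown from a Runnable or similar callback without a throws clause.
final class CancelStreamExceptionSketch extends RuntimeException implements ErrorCodeHolderSketch {
    private final boolean cancelByRemote;
    private final long errorCode;

    private CancelStreamExceptionSketch(boolean cancelByRemote, long errorCode) {
        this.cancelByRemote = cancelByRemote;
        this.errorCode = errorCode;
    }

    static CancelStreamExceptionSketch fromRemote(long errorCode) {
        return new CancelStreamExceptionSketch(true, errorCode);
    }

    static CancelStreamExceptionSketch fromLocal(long errorCode) {
        return new CancelStreamExceptionSketch(false, errorCode);
    }

    boolean isCancelByRemote() {
        return cancelByRemote;
    }

    @Override
    public long getErrorCode() {
        return errorCode;
    }
}

final class CancelDemo {
    // Illustrative helper: runs a task and reports how it was cancelled, if at all.
    static String describe(Runnable task) {
        try {
            task.run();
            return "completed";
        } catch (CancelStreamExceptionSketch e) {
            String side = e.isCancelByRemote() ? "remote" : "local";
            return "cancelled by " + side + ", code " + e.getErrorCode();
        }
    }
}
```

For example, `CancelDemo.describe(() -> { throw CancelStreamExceptionSketch.fromRemote(8L); })` compiles only because the exception is unchecked; with a checked exception the lambda would not be a valid `Runnable`.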

Comment on lines +40 to +41
private boolean closed = false;

Suggestion: Ensure thread safety for the "closed" flag access and updates, considering possible concurrent modifications.
Code Suggestion:

+import java.util.concurrent.atomic.AtomicBoolean;
-private boolean closed = false;
+private AtomicBoolean closed = new AtomicBoolean(false);
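
A hedged sketch of what this suggestion implies, using only the JDK. `ObserverSketch`, `markClosed`, and the `delivered` list are hypothetical names; the real `Http2ServerChannelObserver` is not reproduced here. If the flag is only ever read and written on a single event loop thread, a plain field may already be sufficient; the atomic variant matters when cancellation and `onNext` can run on different threads.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical observer that drops messages once the stream has been closed.
final class ObserverSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final List<Object> delivered = new CopyOnWriteArrayList<>();

    // Marks the observer closed; returns true only for the first caller.
    boolean markClosed() {
        return closed.compareAndSet(false, true);
    }

    void onNext(Object data) {
        if (closed.get()) {
            return; // stream already closed: drop the message instead of writing it
        }
        delivered.add(data);
    }

    List<Object> delivered() {
        return delivered;
    }
}
```

Using `compareAndSet` rather than `set(true)` lets the caller tell whether it was the one that performed the transition, which is useful if cleanup must run exactly once.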

Comment on lines +79 to +83
if (throwable instanceof CancelStreamException) {
    if (((CancelStreamException) throwable).isCancelByRemote()) {
        closed = true;
    }
}

Suggestion: Refactor the condition inside the cancel method to reduce nesting and improve readability.
Code Suggestion:

-if (throwable instanceof CancelStreamException) {
-    if (((CancelStreamException) throwable).isCancelByRemote()) {
-        closed = true;
-    }
+if (throwable instanceof CancelStreamException && ((CancelStreamException) throwable).isCancelByRemote()) {
+    closed.set(true);
 }

Comment on lines +72 to +74
Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel);
ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed());
pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener));

Suggestion: Add error handling for the closeFuture listener to gracefully manage potential exceptions during stream closure.
Code Suggestion:

+ctx.channel().closeFuture().addListener(future -> {
+    try {
+        http2TransportListener.onStreamClosed();
+    } catch (Exception e) {
+        // Log or handle the exception
+    }
+});
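
The same idea can be sketched without Netty, assuming a `CompletableFuture<Void>` as a stand-in for the channel's close future. `SafeCloseListener`, `guarded`, and `register` are hypothetical names used only for illustration; the `onFailure` callback is where a real implementation would log.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class SafeCloseListener {
    // Wraps a cleanup action so a failure is reported instead of escaping the callback.
    static Runnable guarded(Runnable cleanup, Consumer<Throwable> onFailure) {
        return () -> {
            try {
                cleanup.run();
            } catch (RuntimeException e) {
                onFailure.accept(e);
            }
        };
    }

    // Registers the guarded cleanup on a future that completes when the stream closes.
    static void register(CompletableFuture<Void> closeFuture, Runnable cleanup, Consumer<Throwable> onFailure) {
        closeFuture.thenRun(guarded(cleanup, onFailure));
    }
}
```

Whether the extra guard is needed depends on how the surrounding framework treats exceptions thrown by listeners; the sketch only shows the wrapping pattern.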

Comment on lines +191 to +193
public void onStreamClosed() {
    // doing on event loop thread
    getStreamingDecoder().close();

Suggestion: Validate the state of getStreamingDecoder() before invoking close() to avoid potential NullPointerException.
Code Suggestion:

+if (getStreamingDecoder() != null) {
+    getStreamingDecoder().close();
+}

Comment on lines +72 to +74
Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel);
ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed());
pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener));

Scalability Issue: The addition of a closeFuture listener to handle stream closure can potentially create a scalability issue. If there are many instances of Http2TransportListener being created and not properly removed, it could lead to a memory leak as each listener holds a reference to the Http2TransportListener instance. This is particularly concerning in a high-throughput environment where connections are frequently opened and closed.
Fix: Implement a mechanism to ensure that these listeners are removed or deregistered when no longer needed, or when the associated Channel is closed. This could involve keeping track of listeners and explicitly removing them, or using weak references.
Code Suggestion:

ctx.channel().closeFuture().addListener(future -> {
    http2TransportListener.onStreamClosed();
    listeners.remove(http2TransportListener);
});

Comment on lines +79 to +83
if (throwable instanceof CancelStreamException) {
    if (((CancelStreamException) throwable).isCancelByRemote()) {
        closed = true;
    }
}

Optimization Issue: The conditional block checks if the throwable is an instance of CancelStreamException and if it is cancelled by remote, then sets the 'closed' flag to true. However, there is no mechanism to ensure that resources associated with the channel are released or cleaned up, potentially leading to resource leaks.
Fix: Ensure that all resources associated with the channel are properly released or cleaned up when the channel is closed to prevent resource leaks.
Code Suggestion:

+ if (closed) {
+    releaseAssociatedResources(); // Implement this method to clean up resources
+ }

Comment on lines 94 to 97
if (closed) {
    return;
}
super.onNext(data);

Optimization Issue: The onNext method checks if the channel is closed and returns early if it is. This could lead to missed data processing if the close flag is set prematurely. Additionally, there is no handling for the case where data arrives after the channel is marked as closed but before it is actually closed.
Fix: Implement a more robust mechanism for handling data that arrives after the channel is marked as closed but before it is actually closed. Consider queueing such data for processing or handling it in a way that ensures no data is lost.
Code Suggestion:

+ private final ConcurrentLinkedQueue<Object> pendingData = new ConcurrentLinkedQueue<>();

+ if (closed) {
+    pendingData.offer(data);
+    return;
+ }

+ while (!pendingData.isEmpty()) {
+    Object nextData = pendingData.poll();
+    super.onNext(nextData);
+ }

Comment on lines 102 to 105
if (closed) {
    return;
}
super.onError(throwable);

Optimization Issue: Similar to onNext, the onError method returns early if the channel is closed, potentially missing important error handling steps. This could lead to unhandled exceptions or errors that occur after the channel is marked as closed but before it is actually closed.
Fix: Ensure that errors occurring after the channel is marked as closed but before it is actually closed are properly logged or handled to prevent unhandled exceptions.
Code Suggestion:

+ if (closed) {
+    logError(throwable); // Implement this method to log or handle error
+    return;
+ }

Comment on lines +72 to +74
Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel);
ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed());
pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener));

Optimization Issue: Registering a listener for every channel close event inside channelRead0 might lead to a memory leak if not properly handled. This is because every time a message is read, a new listener is added to the closeFuture of the channel, potentially creating a large number of listeners for the same event, especially under high load.
Fix: Consider maintaining a single global listener for the close event per channel, or ensure that listeners are properly removed after being triggered to avoid potential memory leaks.
Code Suggestion:

private static final ChannelFutureListener CLOSE_LISTENER = future -> {
    // cleanup resources
};

// During initialization
channel.closeFuture().addListener(CLOSE_LISTENER);


sonarcloud bot commented Apr 6, 2024

@codecov-commenter

codecov-commenter commented Apr 6, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 39.09%. Comparing base (a12975a) to head (95f9543).
Report is 60 commits behind head on 3.3.

Additional details and impacted files
@@            Coverage Diff             @@
##              3.3   #13973      +/-   ##
==========================================
+ Coverage   38.55%   39.09%   +0.54%     
==========================================
  Files        1895     1730     -165     
  Lines       79272    75284    -3988     
  Branches    11528    11093     -435     
==========================================
- Hits        30560    29433    -1127     
+ Misses      44439    41701    -2738     
+ Partials     4273     4150     -123     



sonarcloud bot commented May 11, 2024

Member

@EarthChen EarthChen left a comment

LGTM

@EarthChen EarthChen merged commit 2c38cab into apache:3.3 May 14, 2024
19 checks passed

6 participants