Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix triple client connection shareing race condition #14718

Merged
merged 7 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.model.StubMethodDescriptor;
import org.apache.dubbo.rpc.model.StubServiceDescriptor;
import org.apache.dubbo.rpc.service.Destroyable;
import org.apache.dubbo.rpc.stub.BiStreamMethodHandler;
import org.apache.dubbo.rpc.stub.ServerStreamMethodHandler;
import org.apache.dubbo.rpc.stub.StubInvocationUtil;
Expand Down Expand Up @@ -130,13 +131,18 @@ public final class {{className}} {
{{/biStreamingWithoutClientStreamMethods}}
}

public static class {{interfaceClassName}}Stub implements {{interfaceClassName}}{
public static class {{interfaceClassName}}Stub implements {{interfaceClassName}}, Destroyable {
private final Invoker<{{interfaceClassName}}> invoker;

public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}> invoker) {
this.invoker = invoker;
}

@Override
public void $destroy() {
invoker.destroy();
}

{{#unaryMethods}}
{{#javaDoc}}
{{{javaDoc}}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,17 @@ public final void increase() {
/**
* Increments the reference count by 1.
*/
public final AbstractConnectionClient retain() {
public final boolean retain() {
long oldCount = COUNTER_UPDATER.getAndIncrement(this);
if (oldCount <= 0) {
COUNTER_UPDATER.getAndDecrement(this);
throw new AssertionError("This instance has been destroyed");
logger.info(
"Retain failed, because connection " + remote
+ " has been destroyed but not yet removed, will create a new one instead."
+ " Check logs below to confirm that this connection finally gets removed to make sure there's no potential memory leak!");
return false;
}
return this;
return true;
}

/**
Expand All @@ -77,6 +81,7 @@ public boolean release() {
long remainingCount = COUNTER_UPDATER.decrementAndGet(this);

if (remainingCount == 0) {
logger.info("Destroying connection to {}, because the reference count reaches 0", remote);
destroy();
return true;
} else if (remainingCount <= -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.dubbo.remoting.api.connection;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.model.FrameworkModel;
Expand All @@ -26,6 +28,9 @@
import java.util.function.Consumer;

public class SingleProtocolConnectionManager implements ConnectionManager {
private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(SingleProtocolConnectionManager.class);

public static final String NAME = "single";

private final ConcurrentMap<String, AbstractConnectionClient> connections = new ConcurrentHashMap<>(16);
Expand All @@ -42,21 +47,34 @@ public AbstractConnectionClient connect(URL url, ChannelHandler handler) {
throw new IllegalArgumentException("url == null");
}
return connections.compute(url.getAddress(), (address, conn) -> {
String transport = url.getParameter(Constants.TRANSPORTER_KEY, "netty4");
if (conn == null) {
String transport = url.getParameter(Constants.TRANSPORTER_KEY, "netty4");
ConnectionManager manager = frameworkModel
.getExtensionLoader(ConnectionManager.class)
.getExtension(transport);
final AbstractConnectionClient connectionClient = manager.connect(url, handler);
connectionClient.addCloseListener(() -> connections.remove(address, connectionClient));
return connectionClient;
return createAbstractConnectionClient(url, handler, address, transport);
} else {
conn.retain();
boolean shouldReuse = conn.retain();
if (!shouldReuse) {
logger.info("Trying to create a new connection for {}.", address);
return createAbstractConnectionClient(url, handler, address, transport);
}
return conn;
}
});
}

private AbstractConnectionClient createAbstractConnectionClient(
URL url, ChannelHandler handler, String address, String transport) {
ConnectionManager manager =
frameworkModel.getExtensionLoader(ConnectionManager.class).getExtension(transport);
final AbstractConnectionClient connectionClient = manager.connect(url, handler);
connectionClient.addCloseListener(() -> {
logger.info(
"Remove closed connection (with reference count==0) for address {}, a new one will be created for upcoming RPC requests routing to this address.",
address);
connections.remove(address, connectionClient);
});
return connectionClient;
}

@Override
public void forEachConnection(Consumer<AbstractConnectionClient> connectionConsumer) {
connections.values().forEach(connectionConsumer);
Expand Down
Loading