diff --git a/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache b/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache index 525eb8d634b..6f19d17fb1c 100644 --- a/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache +++ b/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache @@ -30,6 +30,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor; import org.apache.dubbo.rpc.model.ServiceDescriptor; import org.apache.dubbo.rpc.model.StubMethodDescriptor; import org.apache.dubbo.rpc.model.StubServiceDescriptor; +import org.apache.dubbo.rpc.service.Destroyable; import org.apache.dubbo.rpc.stub.BiStreamMethodHandler; import org.apache.dubbo.rpc.stub.ServerStreamMethodHandler; import org.apache.dubbo.rpc.stub.StubInvocationUtil; @@ -130,13 +131,18 @@ public final class {{className}} { {{/biStreamingWithoutClientStreamMethods}} } - public static class {{interfaceClassName}}Stub implements {{interfaceClassName}}{ + public static class {{interfaceClassName}}Stub implements {{interfaceClassName}}, Destroyable { private final Invoker<{{interfaceClassName}}> invoker; public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}> invoker) { this.invoker = invoker; } + @Override + public void $destroy() { + invoker.destroy(); + } + {{#unaryMethods}} {{#javaDoc}} {{{javaDoc}}} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java index 80aa710a28f..2fea60ce9d7 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java @@ -61,13 +61,17 @@ public final void increase() { /** * Increments the reference count by 1. */ - public final AbstractConnectionClient retain() { + public final boolean retain() { long oldCount = COUNTER_UPDATER.getAndIncrement(this); if (oldCount <= 0) { COUNTER_UPDATER.getAndDecrement(this); - throw new AssertionError("This instance has been destroyed"); + logger.info( + "Retain failed, because connection " + remote + + " has been destroyed but not yet removed, will create a new one instead." + + " Check logs below to confirm that this connection finally gets removed to make sure there's no potential memory leak!"); + return false; } - return this; + return true; } /** @@ -77,6 +81,7 @@ public boolean release() { long remainingCount = COUNTER_UPDATER.decrementAndGet(this); if (remainingCount == 0) { + logger.info("Destroying connection to {}, because the reference count reaches 0", remote); destroy(); return true; } else if (remainingCount <= -1) { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java index 3f840696710..10f002df619 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java @@ -17,6 +17,8 @@ package org.apache.dubbo.remoting.api.connection; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; +import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.Constants; import org.apache.dubbo.rpc.model.FrameworkModel; @@ -26,6 +28,9 @@ import java.util.function.Consumer; public class SingleProtocolConnectionManager implements ConnectionManager { + private static final ErrorTypeAwareLogger logger = + LoggerFactory.getErrorTypeAwareLogger(SingleProtocolConnectionManager.class); + public static final String NAME = "single"; private final ConcurrentMap connections = new ConcurrentHashMap<>(16); @@ -42,21 +47,34 @@ public AbstractConnectionClient connect(URL url, ChannelHandler handler) { throw new IllegalArgumentException("url == null"); } return connections.compute(url.getAddress(), (address, conn) -> { + String transport = url.getParameter(Constants.TRANSPORTER_KEY, "netty4"); if (conn == null) { - String transport = url.getParameter(Constants.TRANSPORTER_KEY, "netty4"); - ConnectionManager manager = frameworkModel - .getExtensionLoader(ConnectionManager.class) - .getExtension(transport); - final AbstractConnectionClient connectionClient = manager.connect(url, handler); - connectionClient.addCloseListener(() -> connections.remove(address, connectionClient)); - return connectionClient; + return createAbstractConnectionClient(url, handler, address, transport); } else { - conn.retain(); + boolean shouldReuse = conn.retain(); + if (!shouldReuse) { + logger.info("Trying to create a new connection for {}.", address); + return createAbstractConnectionClient(url, handler, address, transport); + } return conn; } }); } + private AbstractConnectionClient createAbstractConnectionClient( + URL url, ChannelHandler handler, String address, String transport) { + ConnectionManager manager = + frameworkModel.getExtensionLoader(ConnectionManager.class).getExtension(transport); + final AbstractConnectionClient connectionClient = manager.connect(url, handler); + connectionClient.addCloseListener(() -> { + logger.info( + "Remove closed connection (with reference count==0) for address {}, a new one will be created for upcoming RPC requests routing to this address.", + address); + connections.remove(address, connectionClient); + }); + return connectionClient; + } + @Override public void forEachConnection(Consumer connectionConsumer) { connections.values().forEach(connectionConsumer);