diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/Constants.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/Constants.java index a993c3f70fd..126319f1d9a 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/Constants.java @@ -599,6 +599,10 @@ public class Constants { public static final String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName(); + public static final String SHARE_EXECUTOR_KEY = "share.threadpool"; + + public static final String SHARED_CONSUMER_EXECUTOR_PORT = "consumer.executor.port"; + public static final String GENERIC_SERIALIZATION_NATIVE_JAVA = "nativejava"; public static final String GENERIC_SERIALIZATION_DEFAULT = "true"; diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/store/support/SimpleDataStore.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/store/support/SimpleDataStore.java index d070875acf9..3d77f9ab92c 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/store/support/SimpleDataStore.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/store/support/SimpleDataStore.java @@ -48,12 +48,12 @@ public Object get(String componentName, String key) { @Override public void put(String componentName, String key, Object value) { - Map componentData = data.get(componentName); + ConcurrentMap componentData = data.get(componentName); if (null == componentData) { data.putIfAbsent(componentName, new ConcurrentHashMap()); componentData = data.get(componentName); } - componentData.put(key, value); + componentData.putIfAbsent(key, value); } @Override diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractClient.java index ff46d8bdbf4..a3677aa81a7 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractClient.java @@ -43,6 +43,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static com.alibaba.dubbo.common.Constants.SHARED_CONSUMER_EXECUTOR_PORT; +import static com.alibaba.dubbo.common.Constants.SHARE_EXECUTOR_KEY; + /** * AbstractClient */ @@ -105,10 +108,14 @@ public AbstractClient(URL url, ChannelHandler handler) throws RemotingException + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } + boolean shouldShareExecutor = url.getParameter(SHARE_EXECUTOR_KEY, false); + String portKey = shouldShareExecutor ? SHARED_CONSUMER_EXECUTOR_PORT : Integer.toString(url.getPort()); executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class) - .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); - ExtensionLoader.getExtensionLoader(DataStore.class) - .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); + .getDefaultExtension().get(Constants.CONSUMER_SIDE, portKey); + if (!shouldShareExecutor) { + ExtensionLoader.getExtensionLoader(DataStore.class) + .getDefaultExtension().remove(Constants.CONSUMER_SIDE, portKey); + } } protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java index 71f4999789f..fd46dadca79 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java @@ -32,6 +32,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static com.alibaba.dubbo.common.Constants.SHARED_CONSUMER_EXECUTOR_PORT; +import static com.alibaba.dubbo.common.Constants.SHARE_EXECUTOR_KEY; + public class WrappedChannelHandler implements ChannelHandlerDelegate { protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class); @@ -44,17 +47,32 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate { protected final URL url; + protected DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); + public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; - executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); - String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; + String componentKey; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { componentKey = Constants.CONSUMER_SIDE; + if (url.getParameter(SHARE_EXECUTOR_KEY, false)) { + ExecutorService cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT); + if (cExecutor == null) { + cExecutor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); + dataStore.put(componentKey, SHARED_CONSUMER_EXECUTOR_PORT, cExecutor); + cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT); + } + executor = cExecutor; + } else { + executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); + dataStore.put(componentKey, Integer.toString(url.getPort()), executor); + } + } else { + componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; + executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); + dataStore.put(componentKey, Integer.toString(url.getPort()), executor); } - DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); - dataStore.put(componentKey, Integer.toString(url.getPort()), executor); } public void close() { diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/handler/WrappedChannelHandlerTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/handler/WrappedChannelHandlerTest.java index 96e147c3764..8b15368969f 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/handler/WrappedChannelHandlerTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/handler/WrappedChannelHandlerTest.java @@ -21,12 +21,12 @@ import com.alibaba.dubbo.remoting.Channel; import com.alibaba.dubbo.remoting.RemotingException; import com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler; - import junit.framework.Assert; import org.junit.Before; import org.junit.Test; import java.lang.reflect.Field; +import java.util.concurrent.ThreadPoolExecutor; import static org.junit.Assert.fail; @@ -105,6 +105,30 @@ public void test_Caught_Biz_Error() throws RemotingException { } } + @Test + public void testShareConsumerExecutor() { + URL url1 = URL.valueOf("dubbo://10.20.30.40:1234/DemoService?side=consumer&share.threadpool=true&threadpool=fixed"); + WrappedChannelHandler handler1 = new WrappedChannelHandler(new BizChannelHander(true), url1); + WrappedChannelHandler handler2 = new WrappedChannelHandler(new BizChannelHander(true), url1); + Assert.assertEquals(200, ((ThreadPoolExecutor) (handler1.getExecutor())).getMaximumPoolSize()); + Assert.assertSame(handler1.getExecutor(), handler2.getExecutor()); + + URL url2 = URL.valueOf("dubbo://10.20.30.40:1234/DemoService?side=consumer&share.threadpool=false"); + WrappedChannelHandler handler3 = new WrappedChannelHandler(new BizChannelHander(true), url2); + WrappedChannelHandler handler4 = new WrappedChannelHandler(new BizChannelHander(true), url2); + Assert.assertNotSame(handler3.getExecutor(), handler4.getExecutor()); + Assert.assertNotSame(handler3.getExecutor(), handler1.getExecutor()); + } + + @Test + public void testProviderExecutor() { + URL url1 = URL.valueOf("dubbo://10.20.30.40:1234/DemoService?side=provider"); + URL url2 = URL.valueOf("dubbo://10.20.30.40:6789/DemoService?side=provider"); + WrappedChannelHandler handler1 = new WrappedChannelHandler(new BizChannelHander(true), url1); + WrappedChannelHandler handler2 = new WrappedChannelHandler(new BizChannelHander(true), url2); + Assert.assertNotSame(handler1.getExecutor(), handler2.getExecutor()); + } + class BizChannelHander extends MockedChannelHandler { private boolean invokeWithBizError;