Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use nodeCache to take place of treeCache. #7466

Merged
merged 10 commits into from
Apr 6, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;

import static org.apache.dubbo.common.constants.CommonConstants.DOT_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
Expand All @@ -38,29 +37,30 @@
*/

public class CacheListener implements DataListener {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is CacheListener still needed? Can we connect TargetListener(zookeeper listener) and DataListener(dubbo listener) directly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CacheListener can hold same path listeners, ZookeeperListener : ConfigurationListener -> 1 : N.
We still use CacheListener, ZookeeperListener : CacheListener -> 1 : 1, CacheListener : ConfigurationListener -> 1 : N.

CacheListener is an adapter, it holds real listeners, and adapt kinds of listener (zookeeper, nacos ...)

private static final int MIN_PATH_DEPTH = 5;

private Map<String, Set<ConfigurationListener>> keyListeners = new ConcurrentHashMap<>();
private CountDownLatch initializedLatch;
private String rootPath;

public CacheListener(String rootPath, CountDownLatch initializedLatch) {
public CacheListener(String rootPath) {
this.rootPath = rootPath;
this.initializedLatch = initializedLatch;
}

public void addListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
Set<ConfigurationListener> listeners = keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
listeners.add(configurationListener);
}

public void removeListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.get(key);
Set<ConfigurationListener> listeners = keyListeners.get(key);
if (listeners != null) {
listeners.remove(configurationListener);
}
}

public Set<ConfigurationListener> getConfigurationListeners(String key) {
return keyListeners.get(key);
}

/**
* This is used to convert a configuration nodePath into a key
* TODO doc
Expand Down Expand Up @@ -92,43 +92,19 @@ private String getGroup(String path) {

@Override
public void dataChanged(String path, Object value, EventType eventType) {
if (eventType == null) {
return;
}

if (eventType == EventType.INITIALIZED) {
initializedLatch.countDown();
return;
}

if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
return;
ConfigChangeType changeType;
if (value == null) {
changeType = ConfigChangeType.DELETED;
} else {
changeType = ConfigChangeType.MODIFIED;
}
String key = pathToKey(path);

// TODO We only care the changes happened on a specific path level, for example
// /dubbo/config/dubbo/configurators, other config changes not in this level will be ignored,
if (path.split("/").length >= MIN_PATH_DEPTH) {
String key = pathToKey(path);
ConfigChangeType changeType;
switch (eventType) {
case NodeCreated:
changeType = ConfigChangeType.ADDED;
break;
case NodeDeleted:
changeType = ConfigChangeType.DELETED;
break;
case NodeDataChanged:
changeType = ConfigChangeType.MODIFIED;
break;
default:
return;
}

ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(key, getGroup(path), (String) value, changeType);
Set<ConfigurationListener> listeners = keyListeners.get(path);
if (CollectionUtils.isNotEmpty(listeners)) {
listeners.forEach(listener -> listener.process(configChangeEvent));
}
ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(key, getGroup(path), (String) value, changeType);
Set<ConfigurationListener> listeners = keyListeners.get(path);
if (CollectionUtils.isNotEmpty(listeners)) {
listeners.forEach(listener -> listener.process(configChangeEvent));
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.config.configcenter.TreePathDynamicConfiguration;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;

import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
*
Expand All @@ -38,7 +38,6 @@ public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration
// The final root path would be: /configRootPath/"config"
private String rootPath;
private final ZookeeperClient zkClient;
private CountDownLatch initializedLatch;

private CacheListener cacheListener;
private URL url;
Expand All @@ -49,22 +48,12 @@ public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration
this.url = url;
rootPath = getRootPath(url);

initializedLatch = new CountDownLatch(1);
this.cacheListener = new CacheListener(rootPath, initializedLatch);
this.cacheListener = new CacheListener(rootPath);
this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));

zkClient = zookeeperTransporter.connect(url);
zkClient.addDataListener(rootPath, cacheListener, executor);
try {
// Wait for connection
long timeout = url.getParameter("init.timeout", 5000);
boolean isCountDown = this.initializedLatch.await(timeout, TimeUnit.MILLISECONDS);
if (!isCountDown) {
throw new IllegalStateException("Failed to receive INITIALIZED event from zookeeper, pls. check if url "
+ url + " is correct");
}
} catch (InterruptedException e) {
logger.warn("Failed to build local cache for config center (zookeeper)." + url);
boolean isConnected = zkClient.isConnected();
if (!isConnected) {
throw new IllegalStateException("Failed to connect with zookeeper, pls check if url " + url + " is correct.");
}
}

Expand All @@ -74,7 +63,7 @@ public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration
*/
@Override
public String getInternalProperty(String key) {
return zkClient.getContent(buildPathKey("",key));
return zkClient.getContent(buildPathKey("", key));
}

@Override
Expand Down Expand Up @@ -107,10 +96,15 @@ protected Collection<String> doGetConfigKeys(String groupPath) {
@Override
protected void doAddListener(String pathKey, ConfigurationListener listener) {
cacheListener.addListener(pathKey, listener);
zkClient.addDataListener(pathKey, cacheListener, executor);
}

@Override
protected void doRemoveListener(String pathKey, ConfigurationListener listener) {
cacheListener.removeListener(pathKey, listener);
Set<ConfigurationListener> configurationListeners = cacheListener.getConfigurationListeners(pathKey);
if (CollectionUtils.isNotEmpty(configurationListeners)) {
zkClient.removeDataListener(pathKey, cacheListener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static void setUp() throws Exception {
try {
setData("/dubbo/config/dubbo/dubbo.properties", "The content from dubbo.properties");
setData("/dubbo/config/dubbo/service:version:group.configurators", "The content from configurators");
setData("/dubbo/config/appname", "The content from higer level node");
setData("/dubbo/config/appname", "The content from higher level node");
setData("/dubbo/config/dubbo/appname.tag-router", "The content from appname tagrouters");
setData("/dubbo/config/dubbo/never.change.DemoService.configurators", "Never change value from configurators");
} catch (Exception e) {
Expand Down Expand Up @@ -107,6 +107,7 @@ public void testAddListener() throws Exception {
configuration.addListener("appname.tag-router", listener3);
configuration.addListener("appname.tag-router", listener4);

Thread.sleep(100);
setData("/dubbo/config/dubbo/service:version:group.configurators", "new value1");
Thread.sleep(100);
setData("/dubbo/config/dubbo/appname.tag-router", "new value2");
Expand All @@ -116,10 +117,10 @@ public void testAddListener() throws Exception {
Thread.sleep(5000);

latch.await();
Assertions.assertEquals(1, listener1.getCount("service:version:group.configurators"));
Assertions.assertEquals(1, listener2.getCount("service:version:group.configurators"));
Assertions.assertEquals(1, listener3.getCount("appname.tag-router"));
Assertions.assertEquals(1, listener4.getCount("appname.tag-router"));
Assertions.assertEquals(2, listener1.getCount("service:version:group.configurators"));
Assertions.assertEquals(2, listener2.getCount("service:version:group.configurators"));
Assertions.assertEquals(2, listener3.getCount("appname.tag-router"));
Assertions.assertEquals(2, listener4.getCount("appname.tag-router"));

Assertions.assertEquals("new value1", listener1.getValue());
Assertions.assertEquals("new value1", listener2.getValue());
Expand Down
Loading