zookeeper学习以及Dubbo中的使用
zookeeper,动物园管理者,简明扼要,设计之初主要用来管理集群。它现在的应用有很多,比如命名服务、集群管理、分布式锁、分布式队列等等。
先看下zookeeper的存储结构。
可以看到zookeeper是一个标准的文件系统,每个节点都是一个目录,在zookeeper中被称作znode,每个节点可以有子节点,每个节点可以是临时的,也可以是永久的。永久的就是不管客户端是否断开连接,节点都会一直存在。如果一个节点已经已经存在,zookeeper会按照为节点加上序号创建。比如/dir1/dir20000,/dir1/dir20001。
此外,zookeeper还有偶一个重要的功能就是监听,客户端可以选择监听某一个节点,当节点发生变化时,zookeeper就会通知监听的客户端,类似一种回调。目前很多的应用场景其实都是使用zookeeper的watch机制来实现的。
1、分布式锁
分布式锁之前在我写的分布式锁文章有介绍,可以参考: 分布式锁 。
由于其自身优秀的机制使得其分布式锁比Redis分布式锁更稳定,更优秀。现在的Curator已经实现了分布式锁:InterProcessMutex。主要用到了临时有序节点、watch机制等实现的。
2、统一命名服务
看上面的zookeeper的数据结构介绍,可看出每个znode的nameservice是不同的,就算是相同节点,多次创建也会使用加上序号。
3、配置管理
这个主要是为了解决多台server使用同个配置的时候,可以把配置存储在zookeeper中,每个server作为zookeeper的客户端进行节点监听。如果节点数据发生变化(更改了配置),就更新本地配置。
但我赶脚现在这个用的不多。
4、集群管理
可以实时感知集群中的某个节点出现异常。原理还是通过zookeeper的watch机制。整个集群在一个父节点上,集群中的每个节点都是zookeeper中子节点中的一个。我们对父节点进行监听,如果某个子节点挂掉,getChildren就会感知变化,就会知道哪个集群中的哪个服务器挂掉了。
Zookeeper的应用比较多,其中比较典型的就是Kafka和Dubbo,本文就说一下在Dubbo中的应用。
看一下dubbo在zookeeper中的存储示意图:
上面图形已经非常简单明了了,主要分了四层。
第一层: /dubbo,是dubbo服务的根目录;
第二层:各个service名; 例子:/dubbo/com.test.testService;
第三层:类型。分为Provider和Consumers两种; /dubbo/com.test.testService/Providers /dubbo/com.test.testService/Consumers.。其实现在还有一种叫做configurators
第四层:对应的URL地址。
一个url地址类似如下:
dubbo://10.21.23.20:12305/com.hbnnmall.api.promotion.PromotionService?anyhost=true&application=hbnnshop&bean.name=ServiceBean:com.hbnnmall.api.promotion.PromotionService:1.0.0&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.hbnnmall.api.promotion.PromotionService&methods=modifyPromotionProduct,getProductPromotionStock&pid=942711®ister=true&release=2.7.3&revision=0.0.1-SNAPSHOT&side=provider&threads=300×tamp=1606358051225&version=1.0.0
Dubbo还有一个监控中心,就是对第二层的节点进行Watch注册监听。如果其子节点有变化时,都可以实时感知。
看看dubbo如何使用的。
服务端:
在暴露服务时,会加载注册中心:
private void doExportUrls() {
List<URL> registryURLs = this.loadRegistries(true);
Iterator var2 = this.protocols.iterator();
while(var2.hasNext()) {
ProtocolConfig protocolConfig = (ProtocolConfig)var2.next();
String pathKey = URL.buildKey((String)this.getContextPath(protocolConfig).map((p) -> {
return p + "/" + this.path;
}).orElse(this.path), this.group, this.version);
ProviderModel providerModel = new ProviderModel(pathKey, this.ref, this.interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
loadRegisteries用来加载注册中心的地址。如果我们在配置文件中配置了注册中心,就会加载进来。
registry: # client: curator # address: zookeeper://10.231.33.228:2181?backup=10.231.33.228:2182,10.231.33.228:2183 address: nacos://10.38.163.218:80
再看一下doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = "dubbo";
}
//如果有注册中心地址
if (CollectionUtils.isNotEmpty(registryURLs)) {
metadataReportService = registryURLs.iterator();
while(metadataReportService.hasNext()) {
URL registryURL = (URL)metadataReportService.next();
if (!"injvm".equalsIgnoreCase(url.getProtocol())) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = this.loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded("monitor", monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + this.interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
String proxy = url.getParameter("proxy");
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter("proxy", proxy);
}
//开始向注册中心注册协议,并发布任务
Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);
}
}
} else {
Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);
}
metadataReportService = null;
MetadataReportService metadataReportService;
if ((metadataReportService = this.getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
这几句话很关键:
此时的url:registry://10.13.63.29:2181/org.apache.dubbo.registry.RegistryService?application=hbnnshop&backup=10.138.63.292182,10.138.63.292183&client=curator&dubbo=2.0.2&logger=slf4j&pid=966146®istry=zookeeper&release=2.7.3×tamp=1606372768387, dubbo version: 2.7.3, current host: 192.168.200.1
上面url包含了所有的信息。后续的处理也是按照这个url处理的。 registry表示要向注册中心注册,registry=zookeeper表示了注册中心是zookeeper,也可以是nacos。
registry://10.13.63.28:1180/org.apache.dubbo.registry.RegistryService?application=hbnnshop&client=curator&dubbo=2.0.2&logger=slf4j&pid=967520®istry=nacos&release=2.7.3×tamp=1606373185329, dubbo version: 2.7.3, current host: 192.168.200.1
Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);
看下ServiceConfig的初时化操作:
/**
* The {@link Protocol} implementation with adaptive functionality,it will be different in different scenarios.
* A particular {@link Protocol} implementation is determined by the protocol attribute in the {@link URL}.
* For example:
*
* <li>when the url is registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=dubbo-sample,
* then the protocol is <b>RegistryProtocol</b></li>
*
* <li>when the url is dubbo://224.5.6.7:1234/org.apache.dubbo.config.api.DemoService?application=dubbo-sample, then
* the protocol is <b>DubboProtocol</b></li>
* <p>
* Actually,when the {@link ExtensionLoader} init the {@link Protocol} instants,it will automatically wraps two
* layers, and eventually will get a <b>ProtocolFilterWrapper</b> or <b>ProtocolListenerWrapper</b>
*/
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
/**
* A {@link ProxyFactory} implementation that will generate a exported service proxy,the JavassistProxyFactory is its
* default implementation
*/
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
它会调用private static final Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(),生成一个代理对象。
Dubbo在运行时会使用javassit生成一个Protocol的代理:
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker invoker) throws com.alibaba.dubbo.rpc.RpcException {
if (invoker == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (invoker.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = invoker.getUrl();
//扩展名,默认为dubbo协议
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(invoker);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class type, com.alibaba.dubbo.common.URL url) throws com.alibaba.dubbo.rpc.RpcException {
if (type == null) throw new IllegalArgumentException("url == null");
//扩展名,
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(type, url);
}
}
通过url的registry:// 前缀可以获取到对应的Protocol实现类,即RegisteryProtocol,会调用其export方法。
不过生成的代理对象会将实际的invoker进行包装,类似这种:ProtocolListenerWrapper(ProtocolFilterWrapper(QosProtocolWrapper(DubboProtocol.refer)
ProtocolFilterWrapper是一个过滤器wrapper,在执行真正的invoker之前,首先会执行一串的filter,可以看一下代码:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//如果是注册协议,直接执行export方法
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
//在真正执行协议的export方法前,会执行buildInvokerchain。
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
//这个地方就是获取到所有的filter,然后遍历执行。
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
所有的filter的实现也都是在META-INF/dubbo.internal中体现。
dubbo会调用RegisteryProtocol的export方法(从哪调用?这个地方我还没弄明白):
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
//通过url获取对应的注册中心,nacos,zookeeper等等
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
//将url发送到注册中心
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
完整的一个调用链条如下:
-org.apache.dubbo.registry.integration.RegistryProtocol#register
-org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry
-org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
-org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry
-org.apache.dubbo.registry.support.FailbackRegistry#register
-org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
-org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient#create
-org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperClient#createPersist && createEphemeral
现在终于看到了ZookeeperRegistery,其实就是使用ZK客户端进行操作。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
//增加了一个监听
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
其注册的具体操作如下:
@Override
public void doRegister(URL url) {
try {
//默认创建临时节点
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
//根目录是永久节点,递归调用
if (i > 0) {
create(path.substring(0, i), false);
}
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
上面是服务Provider向注册中心注册的过程,再看一下消费方的服务发现过程。
调用链:
subscribe(URL):172, RegistryDirectory (org.apache.dubbo.registry.integration), RegistryDirectory.java
doRefer(Cluster, Registry, Class, URL):411, RegistryProtocol (org.apache.dubbo.registry.integration), RegistryProtocol.java
refer(Class, URL):392, RegistryProtocol (org.apache.dubbo.registry.integration), RegistryProtocol.java
refer(Class, URL):70, QosProtocolWrapper (org.apache.dubbo.qos.protocol), QosProtocolWrapper.java
refer(Class, URL):128, ProtocolFilterWrapper (org.apache.dubbo.rpc.protocol), ProtocolFilterWrapper.java
refer(Class, URL):69, ProtocolListenerWrapper (org.apache.dubbo.rpc.protocol), ProtocolListenerWrapper.java
refer(Class, URL):-1, Protocol$Adaptive (org.apache.dubbo.rpc), Protocol$Adaptive.java
createProxy(Map):396, ReferenceConfig (org.apache.dubbo.config), ReferenceConfig.java
init():329, ReferenceConfig (org.apache.dubbo.config), ReferenceConfig.java
get():250, ReferenceConfig (org.apache.dubbo.config), ReferenceConfig.java
同样,也是通过SPI自适应机制,通过Protocol$Adaptive代理获取到对应的protocol实现类。
/**
* The {@link Protocol} implementation with adaptive functionality,it will be different in different scenarios.
* A particular {@link Protocol} implementation is determined by the protocol attribute in the {@link URL}.
* For example:
*
* <li>when the url is registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=dubbo-sample,
* then the protocol is <b>RegistryProtocol</b></li>
*
* <li>when the url is dubbo://224.5.6.7:1234/org.apache.dubbo.config.api.DemoService?application=dubbo-sample, then
* the protocol is <b>DubboProtocol</b></li>
* <p>
* Actually,when the {@link ExtensionLoader} init the {@link Protocol} instants,it will automatically wraps two
* layers, and eventually will get a <b>ProtocolFilterWrapper</b> or <b>ProtocolListenerWrapper</b>
*/
private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
如果配置了注册中心,就会调用RegisterProtocol实现类,消费方调用的是refer方法。源代码:
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
//根据url获取到对应的registery。
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
//执行doRefer方法
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
//获取订阅url
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { //进行消费者注册
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
//订阅服务提供者
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
上面的refer做了两个工作,一个是注册消费者,一个是订阅服务提供者。订阅主要是为了当服务提供者节点有变化时,注册中心可以及时通知消费者。
上面会调用RegistryDirectory的subscribe方法。
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
最终会调用具体注册机的doSubscribe,看下ZK的。
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//第一次订阅,会执行一遍notify进行当前provider url的更新
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
首先会从缓存中拿listener,如果没有的话,会创建一个listener:
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
上面listener执行的notify操作。
/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
//调用dubbo的NotifyListener的notify方法
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}
}
上面的listener是dubbo定义的notifylistener,实现了ZK客户端和dubbo NotifyListener的转换。
当有节点变化时,注册中心会通知所有的listener,最终会调用RegistryDirectort的notify。
@Override
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
//根据最新的providerurl进行覆盖更新。
refreshOverrideAndInvoker(providerURLs);
}
几个问题;
dubbo如何生成代理的,该如何查看代理类代码?日志的打印输出。
参考资料:
微信分享/微信扫码阅读