zookeeper学习以及Dubbo中的使用

zookeeper,动物园管理者,简明扼要,设计之初主要用来管理集群。它现在的应用有很多,比如命名服务、集群管理、分布式锁、分布式队列等等。

先看下zookeeper的存储结构。

可以看到zookeeper是一个标准的文件系统,每个节点都是一个目录,在zookeeper中被称作znode,每个节点可以有子节点,每个节点可以是临时的,也可以是永久的。永久的就是不管客户端是否断开连接,节点都会一直存在。如果一个节点已经已经存在,zookeeper会按照为节点加上序号创建。比如/dir1/dir20000,/dir1/dir20001。

此外,zookeeper还有偶一个重要的功能就是监听,客户端可以选择监听某一个节点,当节点发生变化时,zookeeper就会通知监听的客户端,类似一种回调。目前很多的应用场景其实都是使用zookeeper的watch机制来实现的。

1、分布式锁

分布式锁之前在我写的分布式锁文章有介绍,可以参考: 分布式锁

由于其自身优秀的机制使得其分布式锁比Redis分布式锁更稳定,更优秀。现在的Curator已经实现了分布式锁:InterProcessMutex。主要用到了临时有序节点、watch机制等实现的。

2、统一命名服务

看上面的zookeeper的数据结构介绍,可看出每个znode的nameservice是不同的,就算是相同节点,多次创建也会使用加上序号。

3、配置管理

这个主要是为了解决多台server使用同个配置的时候,可以把配置存储在zookeeper中,每个server作为zookeeper的客户端进行节点监听。如果节点数据发生变化(更改了配置),就更新本地配置。

但我赶脚现在这个用的不多。

4、集群管理

可以实时感知集群中的某个节点出现异常。原理还是通过zookeeper的watch机制。整个集群在一个父节点上,集群中的每个节点都是zookeeper中子节点中的一个。我们对父节点进行监听,如果某个子节点挂掉,getChildren就会感知变化,就会知道哪个集群中的哪个服务器挂掉了。

Zookeeper的应用比较多,其中比较典型的就是Kafka和Dubbo,本文就说一下在Dubbo中的应用。

看一下dubbo在zookeeper中的存储示意图:

上面图形已经非常简单明了了,主要分了四层。

第一层: /dubbo,是dubbo服务的根目录;

第二层:各个service名; 例子:/dubbo/com.test.testService;

第三层:类型。分为Provider和Consumers两种;   /dubbo/com.test.testService/Providers     /dubbo/com.test.testService/Consumers.。其实现在还有一种叫做configurators

第四层:对应的URL地址。

一个url地址类似如下:

dubbo://10.21.23.20:12305/com.hbnnmall.api.promotion.PromotionService?anyhost=true&application=hbnnshop&bean.name=ServiceBean:com.hbnnmall.api.promotion.PromotionService:1.0.0&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.hbnnmall.api.promotion.PromotionService&methods=modifyPromotionProduct,getProductPromotionStock&pid=942711&register=true&release=2.7.3&revision=0.0.1-SNAPSHOT&side=provider&threads=300&timestamp=1606358051225&version=1.0.0

Dubbo还有一个监控中心,就是对第二层的节点进行Watch注册监听。如果其子节点有变化时,都可以实时感知。

看看dubbo如何使用的。

服务端:

在暴露服务时,会加载注册中心:

  private void doExportUrls() {
        List<URL> registryURLs = this.loadRegistries(true);
        Iterator var2 = this.protocols.iterator();

        while(var2.hasNext()) {
            ProtocolConfig protocolConfig = (ProtocolConfig)var2.next();
            String pathKey = URL.buildKey((String)this.getContextPath(protocolConfig).map((p) -> {
                return p + "/" + this.path;
            }).orElse(this.path), this.group, this.version);
            ProviderModel providerModel = new ProviderModel(pathKey, this.ref, this.interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }

    }

loadRegisteries用来加载注册中心的地址。如果我们在配置文件中配置了注册中心,就会加载进来。

  registry:
    #    client: curator
    #    address: zookeeper://10.231.33.228:2181?backup=10.231.33.228:2182,10.231.33.228:2183
    address: nacos://10.38.163.218:80

再看一下doExportUrlsFor1Protocol

 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = "dubbo";
        }
 //如果有注册中心地址
 if (CollectionUtils.isNotEmpty(registryURLs)) {
                    metadataReportService = registryURLs.iterator();

                    while(metadataReportService.hasNext()) {
                        URL registryURL = (URL)metadataReportService.next();
                        if (!"injvm".equalsIgnoreCase(url.getProtocol())) {
                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                            URL monitorUrl = this.loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded("monitor", monitorUrl.toFullString());
                            }

                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + this.interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }

                            String proxy = url.getParameter("proxy");
                            if (StringUtils.isNotEmpty(proxy)) {
                                registryURL = registryURL.addParameter("proxy", proxy);
                            }
                            //开始向注册中心注册协议,并发布任务

                            Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                            this.exporters.add(exporter);
                        }
                    }
                } else {
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    this.exporters.add(exporter);
                }

                metadataReportService = null;
                MetadataReportService metadataReportService;
                if ((metadataReportService = this.getMetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }

这几句话很关键:

此时的url:registry://10.13.63.29:2181/org.apache.dubbo.registry.RegistryService?application=hbnnshop&backup=10.138.63.292182,10.138.63.292183&client=curator&dubbo=2.0.2&logger=slf4j&pid=966146&registry=zookeeper&release=2.7.3&timestamp=1606372768387, dubbo version: 2.7.3, current host: 192.168.200.1

上面url包含了所有的信息。后续的处理也是按照这个url处理的。 registry表示要向注册中心注册,registry=zookeeper表示了注册中心是zookeeper,也可以是nacos。

registry://10.13.63.28:1180/org.apache.dubbo.registry.RegistryService?application=hbnnshop&client=curator&dubbo=2.0.2&logger=slf4j&pid=967520&registry=nacos&release=2.7.3&timestamp=1606373185329, dubbo version: 2.7.3, current host: 192.168.200.1

Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);

看下ServiceConfig的初时化操作:

    /**
     * The {@link Protocol} implementation with adaptive functionality,it will be different in different scenarios.
     * A particular {@link Protocol} implementation is determined by the protocol attribute in the {@link URL}.
     * For example:
     *
     * <li>when the url is registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=dubbo-sample,
     * then the protocol is <b>RegistryProtocol</b></li>
     *
     * <li>when the url is dubbo://224.5.6.7:1234/org.apache.dubbo.config.api.DemoService?application=dubbo-sample, then
     * the protocol is <b>DubboProtocol</b></li>
     * <p>
     * Actually,when the {@link ExtensionLoader} init the {@link Protocol} instants,it will automatically wraps two
     * layers, and eventually will get a <b>ProtocolFilterWrapper</b> or <b>ProtocolListenerWrapper</b>
     */
    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    /**
     * A {@link ProxyFactory} implementation that will generate a exported service proxy,the JavassistProxyFactory is its
     * default implementation
     */
    private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

它会调用private static final Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(),生成一个代理对象。

Dubbo在运行时会使用javassit生成一个Protocol的代理:

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker invoker) throws com.alibaba.dubbo.rpc.RpcException {
        if (invoker == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (invoker.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = invoker.getUrl();
        //扩展名,默认为dubbo协议
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(invoker);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class type, com.alibaba.dubbo.common.URL url) throws com.alibaba.dubbo.rpc.RpcException {
        if (type == null) throw new IllegalArgumentException("url == null");
        //扩展名,
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(type, url);
    }
}

通过url的registry://  前缀可以获取到对应的Protocol实现类,即RegisteryProtocol,会调用其export方法。

不过生成的代理对象会将实际的invoker进行包装,类似这种:ProtocolListenerWrapper(ProtocolFilterWrapper(QosProtocolWrapper(DubboProtocol.refer)

ProtocolFilterWrapper是一个过滤器wrapper,在执行真正的invoker之前,首先会执行一串的filter,可以看一下代码:

  @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //如果是注册协议,直接执行export方法
        if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        //在真正执行协议的export方法前,会执行buildInvokerchain。
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }


  private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
      //这个地方就是获取到所有的filter,然后遍历执行。
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                            // onError callback
                            if (filter instanceof ListenableFilter) {
                                Filter.Listener listener = ((ListenableFilter) filter).listener();
                                if (listener != null) {
                                    listener.onError(e, invoker, invocation);
                                }
                            }
                            throw e;
                        }
                        return asyncResult;
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }

        return new CallbackRegistrationInvoker<>(last, filters);
    }

所有的filter的实现也都是在META-INF/dubbo.internal中体现。

dubbo会调用RegisteryProtocol的export方法(从哪调用?这个地方我还没弄明白):

  @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        //通过url获取对应的注册中心,nacos,zookeeper等等
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
           //将url发送到注册中心
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

完整的一个调用链条如下:

-org.apache.dubbo.registry.integration.RegistryProtocol#register

-org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry

-org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry

-org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry

-org.apache.dubbo.registry.support.FailbackRegistry#register

-org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister

-org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient#create

-org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperClient#createPersist && createEphemeral

现在终于看到了ZookeeperRegistery,其实就是使用ZK客户端进行操作。

 public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
       //增加了一个监听
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }

其注册的具体操作如下:

  @Override
    public void doRegister(URL url) {
        try {
            //默认创建临时节点
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } 


 @Override
    public void create(String path, boolean ephemeral) {
        if (!ephemeral) {
            if (checkExists(path)) {
                return;
            }
        }
        int i = path.lastIndexOf('/');
         //根目录是永久节点,递归调用
        if (i > 0) {
            create(path.substring(0, i), false);
        }
        if (ephemeral) {
            createEphemeral(path);
        } else {
            createPersistent(path);
        }
    }

上面是服务Provider向注册中心注册的过程,再看一下消费方的服务发现过程。

调用链:

subscribe(URL):172, RegistryDirectory (org.apache.dubbo.registry.integration), RegistryDirectory.java
doRefer(Cluster, Registry, Class, URL):411, RegistryProtocol (org.apache.dubbo.registry.integration), RegistryProtocol.java
refer(Class, URL):392, RegistryProtocol (org.apache.dubbo.registry.integration), RegistryProtocol.java
refer(Class, URL):70, QosProtocolWrapper (org.apache.dubbo.qos.protocol), QosProtocolWrapper.java
refer(Class, URL):128, ProtocolFilterWrapper (org.apache.dubbo.rpc.protocol), ProtocolFilterWrapper.java
refer(Class, URL):69, ProtocolListenerWrapper (org.apache.dubbo.rpc.protocol), ProtocolListenerWrapper.java
refer(Class, URL):-1, Protocol$Adaptive (org.apache.dubbo.rpc), Protocol$Adaptive.java
createProxy(Map):396, ReferenceConfig (org.apache.dubbo.config), ReferenceConfig.java
init():329, ReferenceConfig (org.apache.dubbo.config), ReferenceConfig.java
get():250, ReferenceConfig (org.apache.dubbo.config), ReferenceConfig.java

同样,也是通过SPI自适应机制,通过Protocol$Adaptive代理获取到对应的protocol实现类。

    /**
     * The {@link Protocol} implementation with adaptive functionality,it will be different in different scenarios.
     * A particular {@link Protocol} implementation is determined by the protocol attribute in the {@link URL}.
     * For example:
     *
     * <li>when the url is registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=dubbo-sample,
     * then the protocol is <b>RegistryProtocol</b></li>
     *
     * <li>when the url is dubbo://224.5.6.7:1234/org.apache.dubbo.config.api.DemoService?application=dubbo-sample, then
     * the protocol is <b>DubboProtocol</b></li>
     * <p>
     * Actually,when the {@link ExtensionLoader} init the {@link Protocol} instants,it will automatically wraps two
     * layers, and eventually will get a <b>ProtocolFilterWrapper</b> or <b>ProtocolListenerWrapper</b>
     */
    private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

如果配置了注册中心,就会调用RegisterProtocol实现类,消费方调用的是refer方法。源代码:

 @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = URLBuilder.from(url)
                .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
                .removeParameter(REGISTRY_KEY)
                .build();
       //根据url获取到对应的registery。
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
       //执行doRefer方法
        return doRefer(cluster, registry, type, url);
    }


  private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
       //获取订阅url
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {            //进行消费者注册
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            registry.register(directory.getRegisteredConsumerUrl());
        }
        //订阅服务提供者
        directory.buildRouterChain(subscribeUrl);
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

上面的refer做了两个工作,一个是注册消费者,一个是订阅服务提供者。订阅主要是为了当服务提供者节点有变化时,注册中心可以及时通知消费者。

上面会调用RegistryDirectory的subscribe方法。

 public void subscribe(URL url) {
        setConsumerUrl(url);
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        registry.subscribe(url, this);
    }

最终会调用具体注册机的doSubscribe,看下ZK的。

@Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //第一次订阅,会执行一遍notify进行当前provider url的更新
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

首先会从缓存中拿listener,如果没有的话,会创建一个listener:

if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));

上面listener执行的notify操作。

 /**
     * Notify changes from the Provider side.
     *
     * @param url      consumer side url
     * @param listener listener
     * @param urls     provider latest urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((CollectionUtils.isEmpty(urls))
                && !ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        // keep every provider's category.
        Map<String, List<URL>> result = new HashMap<>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            //调用dubbo的NotifyListener的notify方法
            listener.notify(categoryList);
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            saveProperties(url);
        }
    }

上面的listener是dubbo定义的notifylistener,实现了ZK客户端和dubbo NotifyListener的转换。

当有节点变化时,注册中心会通知所有的listener,最终会调用RegistryDirectort的notify。

    @Override
    public synchronized void notify(List<URL> urls) {
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(url -> {
                    if (UrlUtils.isConfigurator(url)) {
                        return CONFIGURATORS_CATEGORY;
                    } else if (UrlUtils.isRoute(url)) {
                        return ROUTERS_CATEGORY;
                    } else if (UrlUtils.isProvider(url)) {
                        return PROVIDERS_CATEGORY;
                    }
                    return "";
                }));

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        //根据最新的providerurl进行覆盖更新。
        refreshOverrideAndInvoker(providerURLs);
    }

几个问题;

dubbo如何生成代理的,该如何查看代理类代码?日志的打印输出。

参考资料:

Zookeeper分布式服务框架

zookeeper在Dubbo中的应用

Zookeeper系列(一) Zookeeper的原理学习

ZooKeeper学习笔记及应用场景梳理

ZooKeeper概念详解,最全整理

Dubbo源码解析(七)注册中心——zookeeper

dubbo 源码分析 服务注册(一)

Dubbo源码分析系列之Invoker及服务注册

Dubbo中订阅和通知解析

--------EOF---------
微信分享/微信扫码阅读