A brief introduction to using Dubbo, its startup flow, and its request/response flow

I. Usage

1. Server side

Dubbo configuration:

@Configuration
public class DubboConfiguration {

    @Value("${dubbo.protocol.port}")
    private int port;


    @Value("${server.port}")
    private String httpGateWayPort;

    //
    @Bean
    public ApplicationConfig applicationConfig() {
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName("nacos-provider-test");
        applicationConfig.setRegistry(registryConfig());
        return applicationConfig;
    }

    @Bean
    public RegistryConfig registryConfig() {
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("nacos://127.0.0.1:80");
//        registryConfig.setAddress("zookeeper://127.0.0.1:2181");
        return registryConfig;
    }


    @Bean
    public ProtocolConfig protocolConfig() {
        ProtocolConfig protocolConfig = new ProtocolConfig();
        protocolConfig.setPort(port);
        protocolConfig.setTransporter("netty4");
        protocolConfig.setThreadpool("fixed");
        protocolConfig.setThreads(500);
        return protocolConfig;
    }

    @Bean
    public GateWayProcessor gateWayProcessor() {
        return new GateWayProcessor(String.valueOf(port));
    }

}

The configuration defines the application config, the registry, the protocol config, and so on.

Exposing the service:


@Service(interfaceClass = BankStatmentService.class,version = "1.0.0")
public class BankStatmentServiceImpl implements BankStatmentService {

}

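With the configuration and definitions above, once Spring Boot starts, the Dubbo service also starts and registers itself with the Nacos registry.

II. Startup flow

An earlier article of mine gave a brief introduction to this: Java NIO and Netty

Dubbo can be integrated with Spring in two ways: XML configuration and annotations.

XML is the traditional way of configuring Spring. Spring has a mechanism called the extensible XML Schema mechanism for defining and configuring beans. It lets developers write custom XML bean parsers and integrate both the parser and the resulting beans into the Spring IoC container.

Dubbo provides its own implementation of Spring's NamespaceHandler interface: DubboNamespaceHandler. Its init method: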

public void init() {
        this.registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        this.registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        this.registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        this.registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        this.registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        this.registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        this.registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        this.registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        this.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        this.registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }

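The other way is annotations, which is what most Spring Boot projects use today: @Service and @Reference. The corresponding processors are ServiceAnnotationBeanPostProcessor and ReferenceAnnotationBeanPostProcessor. The main purpose of both is to generate the corresponding beans.

First, the server side (everything below is about the annotation approach):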

@Service(interfaceClass = GoodService.class,version = "1.0.0")

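Overall flow:

1. Dubbo creates the bean based on the annotation;

2. Through the event listener mechanism, the instantiated ServiceBean is triggered to export the service;

3. The service is registered with the registry.

Now for the service startup flow in detail (Dubbo 2.7.3).

1. Initializing ServiceBean

During initialization, Spring runs a number of processors at various points, such as before and after bean instantiation, mainly to support custom behavior. ServiceAnnotationBeanPostProcessor implements BeanDefinitionRegistryPostProcessor. For the details of the Spring container startup flow, see: https://blog.csdn.net/qq_35190492/article/details/110383213

ServiceAnnotationBeanPostProcessor's postProcessBeanDefinitionRegistry method registers all ServiceBeans, that is, the classes annotated with @Service. It scans the specified packages, which the application can configure: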

    @EnableDubbo(scanBasePackages = "com.hbnnmall.hbnnmallproduct.service")

Part of the bean registration code:

private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {

        DubboClassPathBeanDefinitionScanner scanner =
                new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);

        BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);

        scanner.setBeanNameGenerator(beanNameGenerator);
 
        // Add an include filter for the @Service annotation
        scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));

        /**
         * Add the compatibility for legacy Dubbo's @Service
         *
         * The issue : https://github.com/apache/dubbo/issues/4330
         * @since 2.7.3
         */
        scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class));

        for (String packageToScan : packagesToScan) {

        ......
          }

}

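2. Starting the Dubbo service

ServiceBean implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, and others. ApplicationListener is an event mechanism, and its onApplicationEvent method is the entry point for starting the Dubbo service: after the beans are instantiated, Spring Boot publishes events to all listeners. Setting a breakpoint and debugging shows that the event triggering this method is:

org.springframework.context.event.ContextRefreshedEvent. The events Spring currently provides include:

  • ContextRefreshedEvent: the Spring application context is ready;
  • ContextStartedEvent: the Spring application context has started;
  • ContextStoppedEvent: the Spring application context has stopped;
  • ContextClosedEvent: the Spring application context has closed.
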
public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!this.isExported() && !this.isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + this.getInterface());
            }

            this.export();
        }

    }

onApplicationEvent directly calls ServiceConfig's export method, which drives the service export flow.
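The "export once, when the context is ready" idea can be sketched without Spring or Dubbo. This is only an illustration: ContextReady, Listener, ServiceBeanSketch, and publish are hypothetical stand-ins, not real framework classes, and the guard only mirrors the isExported check shown in onApplicationEvent.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins; none of these are Spring or Dubbo classes.
class ExportOnRefreshSketch {

    /** Hypothetical marker event meaning "the application context is ready". */
    static final class ContextReady { }

    interface Listener {
        void onEvent(ContextReady event);
    }

    /** Plays the role of ServiceBean: exports once when the context is ready. */
    static final class ServiceBeanSketch implements Listener {
        private final String interfaceName;
        private boolean exported;
        private int exportCount;

        ServiceBeanSketch(String interfaceName) {
            this.interfaceName = interfaceName;
        }

        @Override
        public void onEvent(ContextReady event) {
            // Same kind of guard as in onApplicationEvent: skip if already exported.
            if (!exported) {
                export();
            }
        }

        private void export() {
            exported = true;
            exportCount++;
        }

        String interfaceName() { return interfaceName; }

        int exportCount() { return exportCount; }
    }

    /** Plays the role of the container publishing the event to every listener. */
    static void publish(List<Listener> listeners, ContextReady event) {
        for (Listener listener : listeners) {
            listener.onEvent(event);
        }
    }

    public static void main(String[] args) {
        ServiceBeanSketch bean = new ServiceBeanSketch("com.example.GoodService");
        List<Listener> listeners = new ArrayList<>();
        listeners.add(bean);
        publish(listeners, new ContextReady());
        publish(listeners, new ContextReady()); // a second event must not re-export
        System.out.println(bean.interfaceName() + " exported " + bean.exportCount() + " time(s)");
    }
}
```

The guard matters because a context can be refreshed more than once, and exporting twice would try to open the same server and register the same URL again.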

First, a diagram of the overall service export flow (from the web):

public synchronized void export(ClassLoader classLoader) {
        if (this.provider != null) {
            if (this.export == null) {
                this.export = this.provider.getExport();
            }

            if (this.delay == null) {
                this.delay = this.provider.getDelay();
            }
        }

        if (this.export == null || this.export) {
            if (this.delay != null && this.delay > 0) {
                delayExportExecutor.schedule(this::doExport, (long)this.delay, TimeUnit.MILLISECONDS);
            } else {
                this.doExport2(classLoader);
            }

        }
    }

First come the various environment and configuration checks.

         completeCompoundConfigs();
        // Config Center should always being started first.
        startConfigCenter();
        checkDefault();
        checkProtocol();
        checkApplication();
        // if protocol is not injvm checkRegistry
        if (!isOnlyInJvm()) {
            checkRegistry();
        }
        this.refresh();
        checkMetadataReport();

doExportUrls loops over protocols and calls doExportUrlsFor1Protocol() for each one.

   private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

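If we configure multiple protocols, the service is exported once per protocol.

The most important part of doExportUrlsFor1Protocol:

The invoker is obtained through a proxy factory. The ProxyFactory itself is obtained through the Dubbo SPI mechanism.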

   Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);

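For an extension interface, Dubbo dynamically generates an Adaptive extension class. When one of its extension methods is called, the generated class reads the parameter key specified by the user in the URL and calls ExtensionLoader.getExtensionLoader(XXXClass).getExtension(key) to obtain the concrete implementation.

The URL looks like this: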

provider://10.239.33.161:12348/com.mi.youpin.finance.service.BankStatmentService?anyhost=true&application=nacos-provider-test&category=configurators&check=false&dubbo=2.0.2&dubbo_version=2.7.0_0.0.3_2019-01-08&generic=false&interface=com.msi.y.finance.service.BankStatmentService&methods=getMerchantCFBankstatment,getMerchantDaishouBankstatment&pid=9764&revision=1.0.0&side=provider&threadpool=fixed&threads=500&timestamp=1581036916787&transporter=netty4&version=1.0.0, dubbo version: 2.7.0-msi-SNAPSHOT, current host: 10.239.33.161 [AbstractRegistry.java : 380] [WARN  ] [com.alibaba.nacos.naming.client.listener]

private static final Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

getAdaptiveExtension() involves a lot of code.

For protocol, it generates the adaptive extension class Protocol$Adaptive from the Protocol interface.

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy()" +
                " of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol" +
                ".getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension =
                (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension =
                (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
}

As the generated Protocol$Adaptive code shows, in the end the implementation class is still looked up by key.
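The selection logic of the generated class can be reduced to a few lines. The sketch below uses java.net.URI and a plain map instead of Dubbo's URL and ExtensionLoader, so every name in it (AdaptiveSelectionSketch, EXTENSIONS, ADAPTIVE) is an illustrative assumption rather than Dubbo API.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Stand-in for the adaptive lookup; not Dubbo's ExtensionLoader.
class AdaptiveSelectionSketch {

    interface Protocol {
        String export(URI url);
    }

    // Extension name -> implementation, the role ExtensionLoader plays in Dubbo.
    static final Map<String, Protocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("dubbo", url -> "DubboProtocol exported " + url.getPath());
        EXTENSIONS.put("registry", url -> "RegistryProtocol exported " + url.getPath());
    }

    /** Mirrors the generated adaptive class: pick the extension from the URL, then delegate. */
    static final Protocol ADAPTIVE = url -> {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        // Fall back to "dubbo" when the URL carries no protocol, as the generated code does.
        String extName = url.getScheme() == null ? "dubbo" : url.getScheme();
        Protocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.export(url);
    };

    public static void main(String[] args) {
        System.out.println(ADAPTIVE.export(URI.create("registry://127.0.0.1:80/demo.RegistryService")));
        System.out.println(ADAPTIVE.export(URI.create("dubbo://127.0.0.1:12348/demo.HelloService")));
    }
}
```

The caller only ever holds the adaptive object; which implementation runs is decided per call by the URL, which is why the same protocol.export(...) call can reach different protocols for different URLs.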

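Now let's look at how service registration and discovery is implemented, starting with the registration URL of the exported service. The URL matters a great deal: it is the decoupled data carrier passed as a parameter between layers, and it makes it easier for Dubbo to swap out implementations. The URL also carries many auxiliary parameters (for example timeout, version, organization) that are used for service governance. The registration URL looks like this: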

  Register dubbo service com.finance.service.IFinanceTest url dubbo://10.23.50.31:12348/com.finance.service.IFinanceTest?anyhost=true&application=nacos-provider-test&bind.ip=10.23.50.31&bind.port=12348&dubbo=2.0.2&dubbo_version=2.7.0_0.0.3_2019-01-08&generic=false&interface=com.finance.service.IFinanceTest&methods=listBs&pid=7296&revision=1.0.0&side=provider&timestamp=1581128490232&version=1.0.0 to registry registry://10.238.63.218:80/org.apache.dubbo.registry.RegistryService?application=nacos-provider-test&dubbo=2.0.2&dubbo_version=2.7.0_0.0.3_2019-01-08&pid=7296&registry=nacos&timestamp=1581128490185, dubbo version: 2.7.0-SNAPSHOT, current host: 10.23.50.31
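To make the "URL as data carrier" point concrete, here is a small sketch that pulls the protocol, address, path, and parameters out of a shortened version of the provider URL shown in the log. It uses java.net.URI rather than Dubbo's own URL class, and queryParams is a hypothetical helper that does no percent-decoding.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

class UrlParamsSketch {

    /** Splits the raw query string into key/value pairs (no percent-decoding). */
    static Map<String, String> queryParams(URI url) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = url.getRawQuery();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        URI url = URI.create("dubbo://10.23.50.31:12348/com.finance.service.IFinanceTest"
                + "?anyhost=true&application=nacos-provider-test&side=provider&version=1.0.0");
        Map<String, String> params = queryParams(url);
        System.out.println("protocol = " + url.getScheme());
        System.out.println("address  = " + url.getHost() + ":" + url.getPort());
        System.out.println("service  = " + url.getPath().substring(1));
        System.out.println("version  = " + params.get("version"));
    }
}
```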

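When a registry URL is present, RegistryProtocol's export method is called.

From the registry:// scheme of the URL above, the matching protocol can be resolved, as already described for Protocol$Adaptive. Here that is RegistryProtocol, whose export method runs next.

In version 2.7.3, steps such as starting the server happen here.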

  @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

RegistryProtocol.export first obtains two URLs: the registry address and the provider address.

final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
This statement exports the concrete invoker.

 private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }
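The computeIfAbsent call is what guarantees that a given service is exported locally only once. A minimal sketch of that behaviour, using strings in place of Invoker and Exporter objects (BOUNDS and EXPORT_CALLS are illustrative names, not Dubbo fields):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class LocalExportCacheSketch {

    // cache key -> exporter; strings stand in for the real exporter objects
    static final ConcurrentMap<String, String> BOUNDS = new ConcurrentHashMap<>();

    // counts how often the expensive export step actually ran
    static final AtomicInteger EXPORT_CALLS = new AtomicInteger();

    static String doLocalExport(String cacheKey) {
        return BOUNDS.computeIfAbsent(cacheKey, key -> {
            EXPORT_CALLS.incrementAndGet(); // stands in for protocol.export(invokerDelegate)
            return "exporter-for-" + key;
        });
    }

    public static void main(String[] args) {
        String first = doLocalExport("dubbo://127.0.0.1:12348/demo.HelloService");
        String second = doLocalExport("dubbo://127.0.0.1:12348/demo.HelloService");
        System.out.println(first == second);     // same cached exporter
        System.out.println(EXPORT_CALLS.get());  // the export step ran once
    }
}
```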

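Likewise, protocol.export inside doLocalExport goes through Dubbo's adaptive extension and is resolved from providerUrl, which can carry any of several protocols. For dubbo, DubboProtocol's export is called, and this is where the Netty server is actually started and the service opened.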

  @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }

Once the local server is up, the service has to be registered with the registry.

Registry registry = this.getRegistry(originInvoker) obtains the actual registry implementation.

Its implementation:

 private Registry getRegistry(Invoker<?> originInvoker) {
        URL registryUrl = this.getRegistryUrl(originInvoker);
        return this.registryFactory.getRegistry(registryUrl);
    }

RegistryFactory is again an SPI extension point.

@SPI("dubbo")
public interface RegistryFactory {
    @Adaptive({"protocol"})
    Registry getRegistry(URL var1);
}

Next, AbstractRegistryFactory.getRegistry is called.

 public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName()).addParameter("interface", RegistryService.class.getName()).removeParameters(new String[]{"export", "refer"});
        String key = url.toServiceStringWithoutResolving();
        LOCK.lock();

        Registry var4;
        try {
            // Look up the registry instance by key
            Registry registry = (Registry)REGISTRIES.get(key);
            if (registry == null) {
                registry = this.createRegistry(url);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }

                REGISTRIES.put(key, registry);
                var4 = registry;
                return var4;
            }

            var4 = registry;
        } finally {
            LOCK.unlock();
        }

        return var4;
    }

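That is how the registry implementation is obtained; in this article's example it is NacosRegistry. Once we have it, the exported URL is registered with the registry.

registry.register(registedProviderUrl); // register the URL of the currently exported service with the registry

This register method is overridden by subclasses.

Inheritance chain: NacosRegistry -> FailbackRegistry -> AbstractRegistry

FailbackRegistry overrides it: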

public void register(URL url) {
        super.register(url);
        this.removeFailedRegistered(url);
        this.removeFailedUnregistered(url);

        try {
            this.doRegister(url);
        } catch (Exception var6) {
            Throwable t = var6;
            boolean check = this.getUrl().getParameter("check", true) && url.getParameter("check", true) && !"consumer".equals(url.getProtocol());
            boolean skipFailback = var6 instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = var6.getCause();
                }

                throw new IllegalStateException("Failed to register " + url + " to registry " + this.getUrl().getAddress() + ", cause: " + ((Throwable)t).getMessage(), (Throwable)t);
            }

            this.logger.error("Failed to register " + url + ", waiting for retry, cause: " + var6.getMessage(), var6);
            this.addFailedRegistered(url);
        }

    }
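The failback idea in this method (fail fast when check is on, otherwise remember the URL and retry later) can be sketched as follows. The retry method is my assumption about what the "waiting for retry" side looks like; the article does not show it, and none of these names are Dubbo classes.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of a failback registry; not the Dubbo implementation.
class FailbackRegisterSketch {

    /** Stand-in for the subclass hook doRegister. */
    interface Registrar {
        void doRegister(String url) throws Exception;
    }

    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Registrar registrar;
    private final boolean check;

    FailbackRegisterSketch(Registrar registrar, boolean check) {
        this.registrar = registrar;
        this.check = check;
    }

    void register(String url) {
        failedRegistered.remove(url);
        try {
            registrar.doRegister(url);
        } catch (Exception e) {
            if (check) {
                // check=true: surface the failure to the caller immediately
                throw new IllegalStateException("Failed to register " + url + ", cause: " + e.getMessage(), e);
            }
            // check=false: remember the URL so a later retry can pick it up
            failedRegistered.add(url);
        }
    }

    /** Assumed retry step: re-registers failed URLs and returns how many still fail. */
    int retry() {
        for (String url : failedRegistered) {
            try {
                registrar.doRegister(url);
                failedRegistered.remove(url);
            } catch (Exception ignored) {
                // still failing; keep it for the next round
            }
        }
        return failedRegistered.size();
    }

    int failedCount() {
        return failedRegistered.size();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // This registrar fails on the first call only, simulating a registry that is briefly down.
        FailbackRegisterSketch registry = new FailbackRegisterSketch(url -> {
            if (calls[0]++ == 0) {
                throw new Exception("registry unavailable");
            }
        }, false);
        registry.register("dubbo://127.0.0.1:12348/demo.HelloService");
        System.out.println("failed after register: " + registry.failedCount());
        System.out.println("failed after retry: " + registry.retry());
    }
}
```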

FailbackRegistry.register calls the subclass's doRegister; here that is NacosRegistry's doRegister.

  public void doRegister(URL url) {
        final String serviceName = this.getServiceName(url);
        final Instance instance = this.createInstance(url);
        this.execute(new NacosRegistry.NamingServiceCallback() {
            public void callback(NamingService namingService) throws NacosException {
                namingService.registerInstance(serviceName, instance);
            }
        });
    }

doRegister first creates an instance:

 private Instance createInstance(URL url) {
        String category = url.getParameter("category", "providers");
        URL newURL = url.addParameter("category", category);
        newURL = newURL.addParameter("protocol", url.getProtocol());
        String ip = NetUtils.getLocalHost();
        int port = newURL.getParameter("bind.port", url.getPort());
        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(port);
        instance.setMetadata(new HashMap(newURL.getParameters()));
        return instance;
    }

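The instance contains the category, protocol, IP, port, metadata, and so on. The metadata holds things like the Dubbo version.

After the instance is created, it is registered with the registry: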

 public void registerInstance(String serviceName, Instance instance) throws NacosException {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setDom(serviceName);
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        this.beatReactor.addBeatInfo(serviceName, beatInfo);
        this.serverProxy.registerService(serviceName, instance);
    }



   public void registerService(String serviceName, Instance instance) throws NacosException {
        LogUtils.LOG.info("REGISTER-SERVICE", "registering service " + serviceName + " with instance:" + instance);
        Map<String, String> params = new HashMap(8);
        params.put("tenant", this.namespace);
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));
        params.put("serviceName", serviceName);
        params.put("clusterName", instance.getClusterName());
        this.reqAPI("/nacos/v1/ns/instance", params, (String)"PUT");
    }

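I won't go into the registration details; in short, it sends an HTTP request.

Once the registry receives the service registration, it notifies the subscribers (that is, the consumers), which then have to react to the notification.

Now for the consumer side.

Consumer startup flow:

Similarly, a ReferenceBean instance is generated from the @Reference annotation. The bean class implements InitializingBean, so afterPropertiesSet is executed when it is initialized.

Main steps:

1. ReferenceAnnotationBeanPostProcessor instantiates and registers the ReferenceBean;

2. ReferenceBean loads its various configs when it is instantiated;

3. A consumer proxy is created, which performs the actual service calls.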

       ref = createProxy(map);

        String serviceKey = URL.buildKey(interfaceName, group, version);
        ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
        initialized = true;

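Creating the proxy mainly involves the following:

1. Decide whether to use a provider in the same JVM (local call);

2. For a remote call, check the registry;

3. Build the invoker. If there are multiple providers, build a ClusterInvoker, Dubbo's wrapper for cluster fault tolerance. It supports different fault-tolerance strategies such as failover and failfast; for details see: Dubbo Cluster

Building the Invoker calls DubboProtocol's refer: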

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        this.optimizeSerialization(url);
        DubboInvoker<T> invoker = new DubboInvoker(serviceType, url, this.getClients(url), this.invokers);
        this.invokers.add(invoker);
        return invoker;
    }

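4. Register the consumer with the registry;

5. Watch the providers, configurators, and routers nodes of the referenced service, and refresh the invoker list dynamically as the registry data changes. Watching is done by the concrete Registry; ZooKeeper, for example, can watch changes to child nodes.

6. Convert all matching provider URLs under the providers node into Invokers (each invoker wraps a client);

7. Create the proxy.

When we make a remote call, we directly call the invoke method of the invoker proxy. If it is a Cluster invoker, the call goes through the configured implementation class. By default Dubbo uses Failover, which uses the load-balancing component to select a provider and then performs the remote call.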
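A failover invoker can be sketched in a few lines. This is a simplification under stated assumptions: round-robin stands in for Dubbo's pluggable load-balancing component, the retry count is a constructor parameter rather than Dubbo configuration, and FailoverSketch and Invoker here are illustrative types, not Dubbo's.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative failover sketch; not Dubbo's FailoverClusterInvoker.
class FailoverSketch {

    interface Invoker {
        String invoke(String request);
    }

    private final List<Invoker> invokers;
    private final int retries;
    private final AtomicInteger next = new AtomicInteger();

    FailoverSketch(List<Invoker> invokers, int retries) {
        this.invokers = invokers;
        this.retries = retries;
    }

    /** Tries one provider, and on failure moves on to the next, up to retries extra attempts. */
    String invoke(String request) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            Invoker chosen = select();
            try {
                return chosen.invoke(request);
            } catch (RuntimeException e) {
                last = e; // remember the failure and fail over to another provider
            }
        }
        throw new IllegalStateException("Failed after " + (retries + 1) + " attempts", last);
    }

    /** Round-robin selection; floorMod keeps the index valid even if the counter overflows. */
    private Invoker select() {
        return invokers.get(Math.floorMod(next.getAndIncrement(), invokers.size()));
    }

    public static void main(String[] args) {
        Invoker broken = request -> { throw new IllegalStateException("provider down"); };
        Invoker healthy = request -> "hello, " + request;
        FailoverSketch cluster = new FailoverSketch(Arrays.asList(broken, healthy), 2);
        System.out.println(cluster.invoke("dubbo"));
    }
}
```

From the caller's point of view the cluster invoker is just another invoker, which is why the proxy does not need to know how many providers sit behind it.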

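III. Request/response flow

A basic diagram of request handling:

First, the call process on the consumer side.

Suppose we call a remote method named sayHello. First a Proxy is created (by calling ReferenceConfig.createProxy), and then Protocol's refer method is called.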

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        this.optimizeSerialization(url);
        DubboInvoker<T> invoker = new DubboInvoker(serviceType, url, this.getClients(url), this.invokers);
        this.invokers.add(invoker);
        return invoker;
    }

DubboInvoker wraps the request, sends it, and returns the response.

protected Result doInvoke(Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation)invocation;
        String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment("path", this.getUrl().getPath());
        inv.setAttachment("version", this.version);
        ExchangeClient currentClient;
        if (this.clients.length == 1) {
            currentClient = this.clients[0];
        } else {
            currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
        }

        try {
            boolean isAsync = RpcUtils.isAsync(this.getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
            boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
            int timeout = this.getUrl().getMethodParameter(methodName, "timeout", 1000);
            int clientTimeout = Integer.parseInt(invocation.getAttachment("timeout", "-1"));
            if (clientTimeout > 0) {
                timeout = clientTimeout;
            }

            if (isOneway) {
                boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture((Future)null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                FutureAdapter<Object> futureAdapter = new FutureAdapter(future);
                RpcContext.getContext().setFuture(futureAdapter);
                Object result;
                if (isAsyncFuture) {
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }

                return (Result)result;
            } else {
                RpcContext.getContext().setFuture((Future)null);
                // Block until the response arrives
                return (Result)currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException var13) {
            throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var13.getMessage(), var13);
        } catch (RemotingException var14) {
            throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var14.getMessage(), var14);
        }
    }
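The synchronous branch of doInvoke blocks on the future returned by request. The idea behind that future is to correlate each response with its request by id. Below is a minimal sketch of the pattern using CompletableFuture; PendingRequestsSketch, Call, request, and received are hypothetical names, and no real channel is involved.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative request/response correlation; not Dubbo's DefaultFuture.
class PendingRequestsSketch {

    /** What the caller gets back: the request id and a future for the response. */
    static final class Call {
        final long id;
        final CompletableFuture<String> future;

        Call(long id, CompletableFuture<String> future) {
            this.id = id;
            this.future = future;
        }
    }

    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Business thread: registers a pending future; a real client would also write to the channel. */
    Call request(String payload) {
        long id = ids.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        return new Call(id, future);
    }

    /** I/O thread: a response with this id arrived, so wake up whoever is waiting on it. */
    void received(long id, String result) {
        CompletableFuture<String> future = pending.remove(id);
        if (future != null) {
            future.complete(result);
        }
    }

    public static void main(String[] args) throws Exception {
        PendingRequestsSketch client = new PendingRequestsSketch();
        Call call = client.request("sayHello");
        // Simulate the I/O thread delivering the response a little later.
        new Thread(() -> client.received(call.id, "hello")).start();
        // Synchronous call style: block with a timeout, as the sync branch of doInvoke does.
        System.out.println(call.future.get(1000, TimeUnit.MILLISECONDS));
    }
}
```

An asynchronous call simply hands the future back to the caller instead of blocking on it, which is the difference between the isAsync branch and the final else branch.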

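I originally planned to write up the detailed request/response flow myself, but the official site already covers it thoroughly, so there is no need to repeat it here. See the official documentation on the service invocation process: Dubbo service invocation process. Netty is covered in detail in my earlier article.

I'll just describe it briefly here, starting with a good diagram found online:

Simplified flow:

1. When a request is made, the business thread hands it to an I/O thread through NettyChannel and gets back a ResponseFuture (the key to asynchronous handling, similar to Netty's ChannelFuture). The user thread then calls DefaultFuture's get method to wait for the response object.

2. The main I/O thread dispatches events, handing tasks to the I/O worker threads.

3. The worker thread wraps the event as a task and submits it to the business thread pool. The business thread first calls DecodeHandler (which decodes the request data) and then HeaderExchangeHandler, which calls the next handler, wraps the response, and returns the result. The next handler is DubboProtocol's anonymous handler, requestHandler, which calls the service through invoke.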

Here is the code of HeaderExchangeHandler.received:

public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        HeaderExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);

        try {
            if (message instanceof Request) {
                Request request = (Request)message;
                if (request.isEvent()) {
                    this.handlerEvent(channel, request);
                } else if (request.isTwoWay()) {
                    // Handle the request
                    this.handleRequest(exchangeChannel, request);
                } else {
                    this.handler.received(exchangeChannel, request.getData());
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response)message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = this.handler.telnet(channel, (String)message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                this.handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }

    }

received then calls handleRequest:

void handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        Object data;
        if (req.isBroken()) {
            data = req.getData();
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable)data);
            } else {
                msg = data.toString();
            }

            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus((byte)40);
            channel.send(res);
        } else {
            data = req.getData();

            try {
                // This calls reply on the next handler (the anonymous handler DubboProtocol.requestHandler)
                CompletableFuture<Object> future = this.handler.reply(channel, data);
                if (future.isDone()) {
                    res.setStatus((byte)20);
                    res.setResult(future.get());
                    channel.send(res);
                    return;
                }

                future.whenComplete((result, t) -> {
                    try {
                        try {
                            if (t == null) {
                                res.setStatus((byte)20);
                                res.setResult(result);
                            } else {
                                res.setStatus((byte)70);
                                res.setErrorMessage(StringUtils.toString(t));
                            }
                            // Send the response back through the channel
                            channel.send(res);
                        } catch (RemotingException var8) {
                            logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + var8);
                        }

                    } finally {
                        ;
                    }
                });
            } catch (Throwable var6) {
                res.setStatus((byte)70);
                res.setErrorMessage(StringUtils.toString(var6));
                channel.send(res);
            }

        }
    }
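Stripped of the decompiler noise, handleRequest maps three outcomes to three status bytes: 40 for a request that could not be decoded, 20 for success, and 70 when the handler fails. The sketch below shows that mapping with CompletableFuture; it returns the response instead of sending it on a channel, and the constant names are my own labels for the byte values that appear in the decompiled code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative sketch of the status mapping; not Dubbo's HeaderExchangeHandler.
class HandleRequestSketch {

    // Labels for the byte values used in the decompiled handleRequest.
    static final byte OK = 20;
    static final byte BAD_REQUEST = 40;
    static final byte SERVICE_ERROR = 70;

    static final class Response {
        final long id;
        byte status;
        Object result;
        String errorMessage;

        Response(long id) {
            this.id = id;
        }
    }

    /** Builds the response for one request; the real code sends it through the channel instead. */
    static CompletableFuture<Response> handle(long id, Object data, boolean broken,
                                              Function<Object, CompletableFuture<Object>> handler) {
        Response res = new Response(id);
        if (broken) {
            // The request could not be decoded, so the handler is never called.
            res.status = BAD_REQUEST;
            res.errorMessage = "Fail to decode request due to: " + data;
            return CompletableFuture.completedFuture(res);
        }
        CompletableFuture<Object> future;
        try {
            future = handler.apply(data);
        } catch (RuntimeException e) {
            // The handler failed before producing a future.
            res.status = SERVICE_ERROR;
            res.errorMessage = e.toString();
            return CompletableFuture.completedFuture(res);
        }
        // Works for both an already completed future and one that completes later.
        return future.handle((result, error) -> {
            if (error == null) {
                res.status = OK;
                res.result = result;
            } else {
                res.status = SERVICE_ERROR;
                res.errorMessage = error.toString();
            }
            return res;
        });
    }

    public static void main(String[] args) {
        Response ok = handle(1L, "sayHello", false,
                data -> CompletableFuture.<Object>completedFuture("hello")).join();
        System.out.println(ok.status + " " + ok.result);
    }
}
```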

handleRequest then calls requestHandler.reply:

 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            } else {
                Invocation inv = (Invocation)message;
                Invoker<?> invoker = DubboProtocol.this.getInvoker(channel, inv);
                boolean hasMethod;
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get("_isCallBackServiceInvoke"))) {
                    String methodsStr = (String)invoker.getUrl().getParameters().get("methods");
                    hasMethod = false;
                    if (methodsStr != null && methodsStr.contains(",")) {
                        String[] methods = methodsStr.split(",");
                        String[] var8 = methods;
                        int var9 = methods.length;

                        for(int var10 = 0; var10 < var9; ++var10) {
                            String method = var8[var10];
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    } else {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    }

                    if (!hasMethod) {
                        DubboProtocol.this.logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }

                RpcContext rpcContext = RpcContext.getContext();
                hasMethod = invoker.getUrl().getMethodParameter(inv.getMethodName(), "async", false);
                if (hasMethod) {
                    CompletableFuture<Object> future = new CompletableFuture();
                    rpcContext.setAsyncContext(new AsyncContextImpl(future));
                }

                rpcContext.setRemoteAddress(channel.getRemoteAddress());
                Result result = invoker.invoke(inv);
                return result instanceof AsyncRpcResult ? ((AsyncRpcResult)result).getResultFuture().thenApply((r) -> {
                    return r;
                }) : CompletableFuture.completedFuture(result);
            }
        }

4. Once the business thread pool has wrapped the result, the response is returned.

  protected Result doInvoke(Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation)invocation;
        String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment("path", this.getUrl().getPath());
        inv.setAttachment("version", this.version);
        ExchangeClient currentClient;
        if (this.clients.length == 1) {
            currentClient = this.clients[0];
        } else {
            currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
        }

        try {
            boolean isAsync = RpcUtils.isAsync(this.getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
            boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
            int timeout = this.getUrl().getMethodParameter(methodName, "timeout", 1000);
            int clientTimeout = Integer.parseInt(invocation.getAttachment("timeout", "-1"));
            if (clientTimeout > 0) {
                timeout = clientTimeout;
            }

            if (isOneway) {
                boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture((Future)null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                FutureAdapter<Object> futureAdapter = new FutureAdapter(future);
                RpcContext.getContext().setFuture(futureAdapter);
                Object result;
                if (isAsyncFuture) {
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }

                return (Result)result;
            } else {
                RpcContext.getContext().setFuture((Future)null);
                return (Result)currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException var13) {
            throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var13.getMessage(), var13);
        } catch (RemotingException var14) {
            throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var14.getMessage(), var14);
        }
    }
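The three branches of doInvoke above (one-way, asynchronous, synchronous) can be sketched without any Dubbo types. This is only an illustration under stated assumptions, not Dubbo's implementation: `InvokeModes`, `Mode`, and the `transport` function are hypothetical names, and `CompletableFuture` stands in for Dubbo's `ResponseFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical client that mirrors the one-way / async / sync branching.
class InvokeModes {
    enum Mode { ONEWAY, ASYNC, SYNC }

    // Assumed transport: sends the request and returns a future for the response.
    private final Function<String, CompletableFuture<String>> transport;

    InvokeModes(Function<String, CompletableFuture<String>> transport) {
        this.transport = transport;
    }

    CompletableFuture<String> invoke(String request, Mode mode, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<String> pending = transport.apply(request);
        switch (mode) {
            case ONEWAY:
                // Fire and forget: hand back an empty result without waiting.
                return CompletableFuture.completedFuture(null);
            case ASYNC:
                // Return immediately; the caller decides when to wait.
                return pending;
            default:
                // SYNC: block the calling thread until the response or the timeout.
                String response = pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return CompletableFuture.completedFuture(response);
        }
    }
}
```

In the synchronous branch the calling thread itself waits on the future, which matches the `currentClient.request(inv, timeout).get()` call in the excerpt.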

Sending the message:

HeaderExchangeChannel#send =>
NettyChannel#send => 
NioSocketChannel#writeAndFlush(message) 

5. Once the IO thread receives the message, it executes the following method of HeaderExchangeHandler:

 static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }

    }

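6. When the response object arrives, the user thread is woken up and uses the invocation ID to pick up its own response object. The user thread obtains the response by blocking in DefaultFuture.get.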

public class DefaultFuture implements ResponseFuture {

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Response response;

    public static void received(Channel channel, Response response) {
        try {
            // Look up (and remove) the DefaultFuture for this invocation ID in FUTURES
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                // Hand the response over to the future
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at ...");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // Store the response object
            response = res;
            if (done != null) {
                // Wake up the user thread
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}

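As the code above shows, the user thread waits and is woken up through the Condition interface.

Once the response message arrives, signal wakes the user thread, which then reads the response.

The consumer does not have to worry about picking up someone else's response, because every message carries a globally unique ID.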
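The wait/wake-up handshake described above can be reproduced in a few lines of plain Java. The sketch below is a simplified stand-in, not Dubbo's DefaultFuture: `BlockingResult`, `get`, and `deliver` are hypothetical names, and it uses `signalAll` plus an explicit `received` flag, where the excerpt above uses `signal`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for DefaultFuture: a user thread waits, an IO thread delivers.
class BlockingResult<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private T value;          // guarded by lock
    private boolean received; // guarded by lock

    // Called by the user thread; blocks until deliver() runs or the timeout elapses.
    T get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // Re-check the flag in a loop to tolerate spurious wake-ups.
            while (!received) {
                if (remainingNanos <= 0) {
                    throw new TimeoutException("no response within " + timeoutMillis + " ms");
                }
                remainingNanos = done.awaitNanos(remainingNanos);
            }
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Called by the IO thread once the response has been decoded.
    void deliver(T response) {
        lock.lock();
        try {
            value = response;
            received = true;
            done.signalAll(); // wake every thread waiting in get()
        } finally {
            lock.unlock();
        }
    }
}

class BlockingResultDemo {
    public static void main(String[] args) throws Exception {
        BlockingResult<String> result = new BlockingResult<>();
        new Thread(() -> result.deliver("pong")).start(); // plays the IO thread
        System.out.println(result.get(1000));             // user thread blocks here
    }
}
```

Because `get` checks the flag before waiting, the outcome is the same whether `deliver` runs before or after the user thread starts blocking.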

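One more point worth covering is how the DefaultFuture is looked up. Quoting the official documentation: when a DefaultFuture is created, it is given a Request object. The DefaultFuture takes the invocation ID from that Request and stores the <invocation ID, DefaultFuture> mapping in a static Map named FUTURES, which is a ConcurrentHashMap. When a thread in the thread pool receives a Response object, it uses the invocation ID in the Response to fetch the matching DefaultFuture from FUTURES and then sets the Response on that DefaultFuture.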

Let's take a look at FUTURES.

public class DefaultFuture implements ResponseFuture {
    // Map of DefaultFuture instances, keyed by invocation ID.
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap();
  
    private final long id;
    private final Channel channel;
    private final Request request;
    private final int timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done;
    private final long start;
    private volatile long sent;
    private volatile Response response;
    private volatile ResponseCallback callback;

    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.done = this.lock.newCondition();
        this.start = System.currentTimeMillis();
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter("timeout", 1000);
        FUTURES.put(this.id, this);
        CHANNELS.put(this.id, channel);
    }

Looking up the DefaultFuture:

 public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = (DefaultFuture)FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")).format(new Date()) + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }

    }
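The register-then-remove pattern around FUTURES can be sketched on its own. This is a minimal illustration and not Dubbo's code: `PendingCalls` and its methods are hypothetical names, `CompletableFuture<String>` replaces DefaultFuture, and an `AtomicLong` is assumed as the source of invocation IDs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical registry that correlates responses with requests by ID.
class PendingCalls {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Each request gets its own ID, unique within this registry.
    long newRequestId() {
        return nextId.getAndIncrement();
    }

    // Called when the request is sent: remember the future under its ID.
    CompletableFuture<String> register(long requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called when a response arrives: remove the entry and complete the future.
    // Returns false if no caller is waiting (for example, it already timed out).
    boolean received(long requestId, String payload) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        future.complete(payload);
        return true;
    }
}

class PendingCallsDemo {
    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        long first = calls.newRequestId();
        long second = calls.newRequestId();
        CompletableFuture<String> f1 = calls.register(first);
        CompletableFuture<String> f2 = calls.register(second);

        // Responses may come back in any order; the ID routes each one correctly.
        calls.received(second, "reply-to-second");
        calls.received(first, "reply-to-first");

        System.out.println(f1.join());
        System.out.println(f2.join());
    }
}
```

Removing the entry when the response arrives keeps the map from growing without bound, and it is also why a late response finds no future and only triggers the warning seen in `received`.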

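References:

Dubbo Analysis, Part 7: Network Communication Summary

Dubbo Source Code Study, Part 4 (Consumer and Provider Communication)

The Dubbo Service Invocation Process

Dubbo Source Code: Network Communication

Dubbo Adaptive Extension Mechanism: How the Adaptive Annotation Works

Details of Dubbo Service Discovery

Official website introduction

Spring XML Schema Extension Mechanism

Dubbo Source Code: Spring Integration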
