Dubbo源代码实现二:服务调用的动态代理和负载均衡

张晓 2016-07-30

疑惑一:为什么在Spring中我们能像注入普通本地服务JavaBean一样注入远程的Dubbo服务Bean?

我们知道,Dubbo将服务调用封装成普通的Spring的Bean,于是我们可以像使用本地的Spring Bean一样,来调用远端的Dubbo服务,并有LoadBalance和Failover的功能。现在,我们从源码的角度来看看,Dubbo是如何做到这点的。

我们知道,要成为Dubbo服务的消费者,需要在Spring的xml文件中配置dubbo:reference节点,如下:

<dubbo:reference id="configRemoteService"

interface="com.manzhizhen.biz.ConfigRemoteService"

version="1.0.0" filter="DataVerifyDubboConsumerFilter,LogDubboConsumerFilter" />

registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); 

我们看看 DubboBeanDefinitionParser中是如何将dubbo:reference元素转换成BeanDefinition对象的,下面给出部分源码:

public class DubboBeanDefinitionParser implements BeanDefinitionParser {

    private final Class<?> beanClass;

    private final boolean required;

    public DubboBeanDefinitionParser(Class<?> beanClass, boolean required) {

        this.beanClass = beanClass;

        this.required = required;

    }

    public BeanDefinition parse(Element element, ParserContext parserContext) {

        return parse(element, parserContext, beanClass, required);

    }

   

    @SuppressWarnings("unchecked")

    private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {

        RootBeanDefinition beanDefinition = new RootBeanDefinition();

        beanDefinition.setBeanClass(beanClass);

        beanDefinition.setLazyInit(false);

        String id = element.getAttribute("id");

   /** 下面略*/

我们直接看parse操作,beanDefinition.setBeanClass(beanClass);直接将传入的beanClass作为BeanDefinition要创建的Spring Bean的类型,于是乎,我们知道了,所有的Dubbo消费者Bean都是ReferenceBean类型的对象,interface属性中配置的接口只是让ReferenceBean对象知道Dubbo的服务提供方提供的方法签名而已。但问题立马来了,既然是ReferenceBean类型的对象,但我们确实使用的不应该是interface配置的类型的实例吗?,比如我们都是这样使用这个Bean的:

@Autowired

privateConfigRemoteService configRemoteService;

它似乎就是ConfigRemoteService类型,而不是ReferenceBean类型,于是我们想到了,Dubbo肯定在某个地方创建了一个实现了该interface(比如上面的com.manzhizhen.biz.ConfigRemoteService)类型的对象,一般来说,Dubbo的消费者模块中只有服务提供者模块暴露的API,所以,需要能在Java体系中的动态创建对象,你一定立马想到了动态代理!

ReferenceConfig是ReferenceBean的父类,它存储了一些配置信息,我们看看ReferenceConfig中几个比较重要的成员变量:

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

// 接口类型

private String               interfaceName;

private Class<?>             interfaceClass;

// 接口代理类引用

private transient volatile T ref;

private transient volatile Invoker<?> invoker;

这里我们直接看到了代理工厂proxyFactory,还有表示interface配置的类型的interfaceName和interfaceClass,而ref正是用来存储代理工厂proxyFactory创建出来的代理类!不信我们可以先看看ReferenceConfig中的几个方法:

public synchronized T get() {

    if (destroyed){

        throw new IllegalStateException("Already destroyed!");

    }

   if (ref == null) {

      init();

   }

   return ref;

}

private void init() {

if (initialized) {

         return;

}

/** 此处有省略*/

     if (ProtocolUtils.isGeneric(getGeneric())) {

         interfaceClass = GenericService.class;

     } else {

         try {

               interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()

           .getContextClassLoader());

         } catch (ClassNotFoundException e) {

               throw new IllegalStateException(e.getMessage(), e);

         }

         checkInterfaceAndMethods(interfaceClass, methods);

     }

/** 此处有大量省略*/

    //attributes通过系统context进行存储.

    StaticContext.getSystemContext().putAll(attributes);

    ref = createProxy(map);

}

private T createProxy(Map<String, String> map) {

       /** 此处有大量省略*/

       Boolean c = check;

       if (c == null && consumer != null) {

           c = consumer.isCheck();

       }

       if (c == null) {

           c = true; // default true

       }

       if (c && ! invoker.isAvailable()) {

           throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());

       }

       if (logger.isInfoEnabled()) {

           logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());

       }

       // 创建服务代理

       return (T) proxyFactory.getProxy(invoker);

   }

这里先简单介绍下Dubbo中的Invoker和Invocation两个接口,Invoker定义了Dubbo服务调用者的基本操作行为,而Invocation描述调用过程中的方法信息,可理解成调用的上下文信息,我们直接看看这两个接口的定义:

public interface Invoker<T> extends Node {

    /**

     * get service interface.

     *

     * @return service interface.

     */

    Class<T> getInterface();

    /**

     * invoke.

     *

     * @param invocation

     * @return result

     * @throws RpcException

     */

    Result invoke(Invocation invocation) throws RpcException;

}

public interface Invocation {

       /** 为了缩短篇幅,常规的注释这里省略了*/

   String getMethodName();

   Class<?>[] getParameterTypes();

   Object[] getArguments();

   Map<String, String> getAttachments();

  

   String getAttachment(String key);

  

   String getAttachment(String key, String defaultValue);

   Invoker<?> getInvoker();

}

可以看到,这两个接口的定义有点类似于反射API中的相关定义,没错,调用就是调用,简单可扩展即可。我们知道可以在Dubbo中通过SPI的方式定义些拦截器(Filter),而拦截器中就可以使用Invoker#invoke来进行递归调用了。

可见ReferenceConfig#get方法直接将代理工厂创建的代理对象返回,而该方法最终会被ReferenceConfig的子类ReferenceBean的getObject方法调用:

public Object getObject() throws Exception {

    return get();

}

我们知道,在Spring容器中,会利用BeanDefinition对象信息来初始化创建之后的Bean对象,但如果你想插手“创建对象”这一步,有两种方法:1.使用工厂方法模式来指定某个工厂方法创建Bean。2. Bean的类实现FactoryBean接口,没错,Dubbo中就是用第二种方法来插手Bean对象创建这一步,所以显然ReferenceBean实现了FactoryBean接口,而getObject()正是FactoryBean接口中定义的,这也就是为什么我们可以直接用interface属性中配置的类型来直接使用该Dubbo消费者对象的原因了。这样就解释了我们上面抛出的问题。

那么,Dubbo也像Spring容器中那样默认对接口实现使用JDK代理,而对非接口实现使用CGLib方式来代理吗?我们上面又看到,代理对象是以Invoker对象为基础创建的(return (T) proxyFactory.getProxy(invoker);),在Dubbo中,定义了一个代理工厂接口ProxyFactory:

@SPI("javassist")

public interface ProxyFactory {

    @Adaptive({Constants.PROXY_KEY})

    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    @Adaptive({Constants.PROXY_KEY})

    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

抽象类AbstractProxyFactory实现了它的getProxy方法:

public abstract class AbstractProxyFactory implements ProxyFactory {

    public <T> T getProxy(Invoker<T> invoker) throws RpcException {

        Class<?>[] interfaces = null;

        String config = invoker.getUrl().getParameter("interfaces");

        if (config != null && config.length() > 0) {

            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);

            if (types != null && types.length > 0) {

                interfaces = new Class<?>[types.length + 2];

                interfaces[0] = invoker.getInterface();

                interfaces[1] = EchoService.class;

                for (int i = 0; i < types.length; i ++) {

                    interfaces[i + 1] = ReflectUtils.forName(types[i]);

                }

            }

        }

        if (interfaces == null) {

            interfaces = new Class<?>[] {invoker.getInterface(), EchoService.class};

        }

        return getProxy(invoker, interfaces);

    }

   

    public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);

}

可以看出,AbstractProxyFactory#getProxy的通用实现,其实主要是将该代理需要实现的接口给确定好,该接口存储在invoker对象的url的interfaces参数中,是一个字符串,用逗号分隔,我们不需要关心interfaces中存储的到底是哪些接口,只需要知道的是,invoker.getInterface()会加入到其中,而另一个重载的getProxy方法显然是丢给子类来实现了。AbstractProxyFactory有两个实现类:JdkProxyFactory和JavassistProxyFactory,顾名思义,一个是使用JDK的动态代理,一个是使用Javaassist来实现动态代理,我们直接看JdkProxyFactory的实现:

public class JdkProxyFactory extends AbstractProxyFactory {

    @SuppressWarnings("unchecked")

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {

        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));

    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {

        return new AbstractProxyInvoker<T>(proxy, type, url) {

            @Override

            protected Object doInvoke(T proxy, String methodName,

                                      Class<?>[] parameterTypes,

                                      Object[] arguments) throws Throwable {

                Method method = proxy.getClass().getMethod(methodName, parameterTypes);

                return method.invoke(proxy, arguments);

            }

        };

    }

}

在JdkProxyFactory的getProxy方法中我们看到了熟悉的类:InvokerInvocationHandler,很显然它实现了JDK中的InvocationHandler接口:

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler){

        this.invoker = handler;

    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        String methodName = method.getName();

        Class<?>[] parameterTypes = method.getParameterTypes();

        if (method.getDeclaringClass() == Object.class) {

            return method.invoke(invoker, args);

        }

        if ("toString".equals(methodName) && parameterTypes.length == 0) {

            return invoker.toString();

        }

        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {

            return invoker.hashCode();

        }

        if ("equals".equals(methodName) && parameterTypes.length == 1) {

            return invoker.equals(args[0]);

        }

        return invoker.invoke(new RpcInvocation(method, args)).recreate();

    }

}

也像我们预期那样,invoke操作是由invoker对象来完成的,这里的recreate是Result中的方法,只是对结果进行“再处理”,如果没异常就返回,有异常则再次抛出。

所以,看到这我们就明白了,在Dubbo中,没有使用CGLib进行代理,而是使用JDK和Javassist来进行动态代理!我们知道,动态代理是无法用反射做的,只能靠动态生成字节码,这就需要使用字节码工具包,比如asm和Javassist等,在Spring3.2.2之前版本的源码中,我们可以看到是有单独spring-asm的模块的,但在Spring3.2.2版本开始,就没有spring-asm模块了,不是不使用了,而是spring-asm已经整合到spring-core中了,可见asm在Spring中的地位(CGLib使用的就是asm),至于Dubbo为什么不使用CGLib,当我们选择动态代理实现时,无非考虑的是如下因素:

  1. 使用的难易程度。
  2. 功能,比如JDK的代理只能通过接口实现,而CGLib则没这个限制。
  3. 生成的字节码的效率,包括创建的效率和运行的效率。
  4. 生成的字节码大小,为什么说这也比较重要?这和生成的Class和其实例的大小相关。

那么,接下来的问题就是,什么时候会使用JDK的动态代理,而什么时候使用Javassist的动态代理呢?在dubbo-rpc-api的模块中,有个com.alibaba.dubbo.rpc.ProxyFactory文件,里面定义了代理的几种方式:

stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper

jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory

javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory

StubProxyFactoryWrapper是存根(stub)的代理工厂,主要用于暴露服务提供者的本地服务给远端消费者来调用。可以直接通过xml的配置来手动选择代理,比如:

<dubbo:provider proxy="jdk" />或<dubbo:consumer proxy="jdk" />

如果不配置,由于ProxyFactory接口上有@SPI("javassist")注解,所以默认是使用Javassist来实现动态代理!

具体细节可以深入源码去了解,这里不多做介绍。

疑惑二:当有多个服务提供者存在时,Dubbo是怎么做负载均衡的?

我们想知道消费者选择提供者时,是怎样选择远端的服务者的?轮询还是权重?

我们先探讨负载均衡LoadBalance,在Dubbo的源码中,它是在dubbo-cluster模块中来实现的,我们看看dubbo-cluster模块中com.alibaba.dubbo.rpc.cluster.LoadBalance文件中的内容:

random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance

roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance

leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance

consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance

这下我们一看就知道Dubbo中有四种LB的方式:随机、轮询、最少活跃和一致哈希。在这之前,我们先来看看负载均衡的接口LoadBalance:

@SPI(RandomLoadBalance.NAME)

public interface LoadBalance {

   /**

    * select one invoker in list.

    *

    * @param invokers invokers.

    * @param url refer url

    * @param invocation invocation.

    * @return selected invoker.

    */

    @Adaptive("loadbalance")

   <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

接口LoadBalance 的定义说明,LoadBalance 的实现只是在一个服务提供的调用者列表(invokers)中选出一个调用者即可,默认的负载方式是随机负载均衡(@SPI(RandomLoadBalance.NAME)),我们也可以指定使用哪种负载均衡:

<dubbo:reference interface="xxx" loadbalance="roundrobin"/>  或  <dubbo:service interface="xxx" loadbalance="roundrobin" />

肯定有人会问,普通的随机负载均衡和轮询的负载均衡方式效果不是一样的吗?确实,如果是普通的随机,理论上来说和轮询的方式效果一致,甚至有时候还不如轮询的好,因为这和采取的随机数的随机程度和随机数生成开销两种因素有关,但如果在随机过程中加入权重这一属性的话,随机的优势不言而喻了,比如可以做到“预热”功能,给刚上的服务器在某段时间内分配比其他服务器更少的请求,让刚上的服务器能先“热热身”,这一点,普通轮询方式是很难做到的。没错,Dubbo默认的随机负载方式就加入了权重这一因数,权重精确到某个接口的某个方法,这也是我们所期望的,其默认“预热”时间是10分钟。

Dubbo中有个抽象类AbstractLoadBalance,它实现了获取某个接口某个方法的权重值,预热部分的计算也包含在其中,这里代码就不贴出来了,之所以Dubbo这样做,是因为其他负载形式都用到权重因子。

我们这里简单描述下各种负载均衡的实现方式:

随机负载均衡(RandomLoadBalance):先统计所有服务器上该接口方法的权重总和,然后对这个总和随机nextInt一下,看生成的随机数落到哪个段内,就调哪个服务器上的该服务。

轮询负载均衡(RoundRobinLoadBalance):如果所有服务器的该接口方法的权重一样,则直接内部的序列计数器(sequences)+1然后对服务器的数量进行取模来决定调用哪个服务器上的服务;如果服务器的该接口方法的权重不一样(就是说存在预热中的服务器),则找到其中最大的权重,然后将内部的权重计数器(weightSequences)+1并对该最大权重数取模,然后再找出权重比该取模后的值大服务器列表,最后通过内部的序列计数器(sequences)+1然后对服务器列表数量进行取模来决定调用哪个服务器上的服务。

最少活跃负载均衡(LeastActiveLoadBalance):每个接口和接口方法都对应一个RpcStatus对象,记录了他们的活跃数、失败数等等相关统计信息,此种负载均衡方式是在活跃数最低的服务器中对其权重的总和取模来看结果是在哪个权重段中,则选择该服务器来调用,活跃数就像并发量降级中的计数器一样,开始调用时活跃数+1,调用结束时活跃数-1,所以活跃值越大,表明该提供者服务器的该接口方法耗时越长,而消费能力强的提供者接口往往活跃值很低。最少活跃负载均衡保证了“慢”提供者能接收到更少的服务器调用。

一致哈希负载均衡(ConsistentHashLoadBalance):一致性哈希算法的负载均衡保证了同样的请求(参数)将会落到同一台服务器上,这在某些场景是非常有用的,Dubbo中默认采用了160个虚拟节点,因为Dubbo的请求URL中除了我们使用的参数,还有些额外的系统调用参数,比如timestamp、loadbalance、pid和application等,有人可定会问,Dubbo会对URL中哪些参数进行hash,Dubbo默认除了对我们接口所有参数进行hash外,还会加上这些额外参数,因为有timestamp,这是不是也意味着在Dubbo的重试调用时timestamp不变?

上述的四种负载均衡,除了一致性哈希,其他三种都依赖了接口方法的权重统计,借助权重的不同,随机负载均衡就能做到动态调整的效果,Dubbo中的轮询负载方式,也利用了权重,那么有人会问,Dubbo中的随机和轮询是不是差别不大?是的,笔者也这样认为,相似的效果,也有着相似的缺点,Dubbo中的随机和轮询负载都没有考虑到提供者服务器消费服务的能力,如果相差很大,“慢”提供者有可能被“快”提供给者给拖垮,其根本原因也是这两种负载均衡的加权因子考虑的不是服务耗时。最少活跃的负载均衡就很巧妙的解决了此问题,而且它不是直接通过统计服务调用的耗时,而是采用统计调用差(活跃数)。一致性哈希特别适用于有缓存的系统,这样缓存命中率会比较高。

一句话总结本文,虽然动态代理和负载均衡没有直接的关系,但因为看源码看到这里了,所以写篇文章“纪念”一下,带着问题看源码往往就是这种效果。可以看出,Dubbo的设计者,在很多细节上都下足了功夫,从动态代理的选型和负载的多种实现方式就可以看出来,好的软件框架,不仅需要有好的架构设计,技术细节实现起来也不能马虎。本来想写更多,但有人和我说过,技术博文不要太长… …

相关推荐