Dubbo线程模型

作者: 此鱼不得水 | 来源:发表于2017-12-22 16:12 被阅读994次

线程是每个应用都必须关系的事情,毕竟任何服务器的资源都是有限的,服务线程过少的容易发生阻塞,服务线程过多的话上下文切换的开销又会影响效率,所以合适的线程模型对于一个高性能的应用来说必不可少。Dubbo作为一个带有服务治理功能的RPC框架,在线程模型上也有自己的处理,今天就让我们一起来看一下Dubbo的线程模型。

下面我们要看一下默认情况下的线程模型:
首先明确一个基本概念:IO线程和业务线程的区别

  • IO线程:配置在netty连接点的用于处理网络数据的线程,主要处理编解码等直接与网络数据打交道的事件。
  • 业务线程:用于处理具体业务逻辑的线程,可以理解为自己在provider上写的代码所执行的线程环境。

Dubbo默认采用的是长链接的方式,即默认情况下一个consumer和一个provider之间只会建立一条链接,这种情况下IO线程的工作就是编码和解码数据,监听具体的数据请求,直接通过Channel发布数据等等;二业务线程就是处理IO线程处理之后的数据,业务线程并不知道任何跟网络相关的内容,只是纯粹的处理业务逻辑,在业务处理逻辑的时候往往存在复杂的逻辑,所以业务线程池的配置往往都要比IO线程池的配置大很多。

Dubbo中线程相关参数的含义

  • iothreads:指定IO线程池(worker)的线程数量,默认情况下为CPU个数+1,因为这个线程的工作内容比较简单,所以一般情况下我们不会去配置这个值,除非IO线程的响应速度明显拖慢了整个工程的响应,IO线程的默认类型是CacheThreadPool,一分钟的线程死亡时间。

  • threadpool:业务线程的具体线程类型,默认采用的fixed线程池,即线程数量一定的线程池,这种线程池的好处就是不会频繁创建线程线程,适合线业务比较密集的应用。因为这个数据只管关系到服务的并发情况,所以在需要的时候可以适当调整该数量来增加工程的并发。

  • threads:该参数就是业务线程池的核心线程数配置,默认情况下为200。如果空间有条件的话可以适当地提升该数量,例如提升至400或者500都是可以的。

  • queues:该数量指定来在初始化业务线程池时候是否需要排队队列,如果不设置的话,业务线程池的排队队列是SynchronousQueue,即不允许业务事件排队,如果线程池没有空闲线程之后会直接排除异常信息。但是如果配置来queues之后则会使用LinkedBlockingQueue作为排队队列,queues则代表队列的初始队列。因为queues的配置直接关系到排队,所以在一般情况下建议不要配置,因为线程池满的情况下一般期望是直接失败,然后调用其他的机器,而不是再次队列继续等待,继续等待不仅可能会拉低响应时间,而且很有可能会超时。

  • acceptes:我们知道threadpool,threads和queues都是控制业务线程池的字段,而acceptes就是控制IO线程池的字段。这个字段标示着服务端可接受的最大长连接数,默认情况下为不限制,但是有时候为来保护服务器防止连接数过多导致请求失败率过高,则可以考虑设置该字段为一个定值。

  • connections:既然服务端可以设置最大接收的连接数,那么客户端也可以设置与服务端建立的连接数。connections可以配置在reference上表示要同对应的服务器建立的长链接数量,默认为只建立一条链接,如果配置来connections的话则会建立N条长链接以提供消费者的吞吐量。但是有一点需要注意是如果conenctions的数量配置大于服务端的accepts的话,超出的部分会直接报错,表示不支持更多的链接,该值不宜配置过多,因为如果多个消费者都配置来该值的话很容易到值服务端的accepts超过预期数量而报错。

  • dispatcher:这个字段代表的是IO线程池和业务线程池的边界,具体有这么几种类型,下面我们一一详细看看:

    • all:所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。对应的是AllChannelHandler(具体这个Handler的处理位置以及他的作用,见前几篇博客,这里不再强调)
    public class AllChannelHandler extends WrappedChannelHandler {
      
      public AllChannelHandler(ChannelHandler handler, URL url) {
          super(handler, url);
      }
      
      //链接事件通过线程池处理
      public void connected(Channel channel) throws RemotingException {
          ExecutorService cexecutor = getExecutorService(); 
          try{
              cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
          }catch (Throwable t) {
              throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
          }
      }
      //链接断开事件通过线程池处理
      public void disconnected(Channel channel) throws RemotingException {
          ExecutorService cexecutor = getExecutorService(); 
          try{
              cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
          }catch (Throwable t) {
              throw new ExecutionException("disconnect event", channel, getClass()+" error when process disconnected event ." , t);
          }
      }
      //数据接收事件通过线程池处理
      public void received(Channel channel, Object message) throws RemotingException {
          ExecutorService cexecutor = getExecutorService();
          try {
              cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
          } catch (Throwable t) {
              throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
          }
      }
      //异常事件通过线程池梳理
      public void caught(Channel channel, Throwable exception) throws RemotingException {
          ExecutorService cexecutor = getExecutorService(); 
          try{
              cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
          }catch (Throwable t) {
              throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t);
          }
      }
    
      private ExecutorService getExecutorService() {
          ExecutorService cexecutor = executor;
          if (cexecutor == null || cexecutor.isShutdown()) { 
              cexecutor = SHARED_EXECUTOR;
          }
          return cexecutor;
      }
      }
    
    • direct:所有消息都不派发到线程池,全部在IO线程上直接执行。(这种做法在绝大多数情况下都不合理,因为毕竟业务逻辑相关对IO事件都是复杂的)。具体的实现方式就是在装饰者的层级上直接下调,不再包装线程池。
    • message:只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO线程上执行。
      /**
       * 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO线程上执行
       */
      public class MessageOnlyChannelHandler extends WrappedChannelHandler {
      
      public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
          super(handler, url);
      }
      //接收到消息时候触发,无论是服务端接收到请求数据还是客户端接收到返回数据
      public void received(Channel channel, Object message) throws RemotingException {
          ExecutorService cexecutor = executor;
          if (cexecutor == null || cexecutor.isShutdown()) {
              cexecutor = SHARED_EXECUTOR;
          }
          try {
              cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
          } catch (Throwable t) {
              throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
          }
      }
    
      }
    

    具体的做法其实很简单,相对于all来说只不过只会会将received事件在线程池中处理,其他的一概以默认方式处理(IO线程池)。

    • execution:官方的说法是:只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在IO线程上执行。相对于message来说,限制的更死了,也就是只有服务端的业务逻辑才会执行在业务线程池中执行。消费端如果收到的消息之后,处理逻辑还是IO线程上执行。但是实际情况是我看到的代码显示execution与all的处理逻辑几乎一样,并没有体现出官方的说法。 具体实现如下:
      public class ExecutionChannelHandler extends WrappedChannelHandler {
      
      public ExecutionChannelHandler(ChannelHandler handler, URL url) {
          super(handler, url);
      }
      //处理链接建立事件
      public void connected(Channel channel) throws RemotingException {
          executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
      }
      //处理链接断开事件
      public void disconnected(Channel channel) throws RemotingException {
          executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
      }
      //处理数据收到的事件
      public void received(Channel channel, Object message) throws RemotingException {
          executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
      }
      //处理异常事件
      public void caught(Channel channel, Throwable exception) throws RemotingException {
          executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
      }
      }
      
    

    有可能是我的理解还不到位,对于其其他的用处没有理解到,如果这里有问题的话还请大家指出。

    • connection:在IO线程上,将连接建立以及断开事件放入队列,有序逐个执行,其它消息派发到线程池。具体实现如下:
      public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
    
      protected final ThreadPoolExecutor connectionExecutor;
      private final int queuewarninglimit ;
      
      public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
          super(handler, url);
          String threadName = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME);
          //初始化一个单独处理链接建立和断开的无界队列连接池
          connectionExecutor = new ThreadPoolExecutor(1, 1,
                                       0L, TimeUnit.MILLISECONDS,
                                       new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                                       new NamedThreadFactory(threadName, true),
                                       new AbortPolicyWithReport(threadName, url)
              );  
          //预警排队数量
          queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
      }
      
      //连接建立事件直接在单独的线程池中处理
      public void connected(Channel channel) throws RemotingException {
          try{
              checkQueueLength();
              connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
          }catch (Throwable t) {
              throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
          }
      }
      //连接断开事件直接在单独的线程池中处理
      public void disconnected(Channel channel) throws RemotingException {
          try{
              checkQueueLength();
              connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
          }catch (Throwable t) {
              throw new ExecutionException("disconnected event", channel, getClass()+" error when process disconnected event ." , t);
          }
      }
      //数据接收事件还是在业务线程池中处理
      public void received(Channel channel, Object message) throws RemotingException {
          ExecutorService cexecutor = executor;
          if (cexecutor == null || cexecutor.isShutdown()) {
              cexecutor = SHARED_EXECUTOR;
          }
          try {
              cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
          } catch (Throwable t) {
              throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
          }
      }
      //异常事件还是在业务线程池中处理
      public void caught(Channel channel, Throwable exception) throws RemotingException {
          ExecutorService cexecutor = executor;
          if (cexecutor == null || cexecutor.isShutdown()) { 
              cexecutor = SHARED_EXECUTOR;
          } 
          try{
              cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
          }catch (Throwable t) {
              throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t);
          }
      }
      //检查排队数量是否大于预警数量(默认为1000),如果炒过的话就打WARNING日志
      private void checkQueueLength(){
          if (connectionExecutor.getQueue().size() > queuewarninglimit){
              logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: "+connectionExecutor.getQueue().size()+" exceed the warning limit number :"+queuewarninglimit));
          }
      }
      }
      
    

    这种dispatcher的意义就在于将将连接事件与IO线程池和业务线程池分开处理,是其不会相互干扰。假如在网络不稳定的环境下,不会因为频繁的网络抖动影响实际的业务处理效率。


关于dubbo线程模型的内容应该都已经讲完了,具体怎么配置还要根据实际的业务场景。

相关文章

  • DUBBO线程模型和调度策略

    DUBBO线程模型 从官方描述来看dubbo线程模型支持业务线程和I/O线程分离,并且提供5种不同的调度策略。 拿...

  • dubbo线程模型

    这里写dubbo线程模型,主要是讲服务消费者和服务提供者这两大核心的线程模型。 一 dubbo服务提供者线程模型。...

  • Ali Dubbo

    遗留问题:线程模型?? Dubbo配置的例子: dubbo/dubbo-samples JSR 303: Bean...

  • Dubbo线程模型2

    转自 https://ifeve.com/dubbo-threadmodel/ Dubbo剖析-线程模型 一、前言...

  • Dubbo之线程模型

    以netty传输层实现为例 Dubbo的线程模型其实就是netty的线程模型,主从Refactor模型。 主Ref...

  • Dubbo线程池

    Dubbo的线程模型与线程池策略 Dubbo默认的底层网络通讯使用的是Netty,服务提供方NettyServer...

  • dubbo线程模型

    https://www.processon.com/mindmap/5b62750ce4b08d36229ac023

  • Dubbo线程模型

    线程是每个应用都必须关系的事情,毕竟任何服务器的资源都是有限的,服务线程过少的容易发生阻塞,服务线程过多的话上下文...

  • Dubbo 消息派发的线程模型

    今天我们要介绍的是Dubbo消息派发的时候,使用的线程模型,Dubbo版本2.8.4。那么什么是Dubbo的消息派...

  • dubbo 线程模型浅析

      dubbo线程模型分为IO线程和服务调用处理线程,IO线程主要是netty的线程,可以在protocol标签中...

网友评论

    本文标题:Dubbo线程模型

    本文链接:https://www.haomeiwen.com/subject/euhxgxtx.html