美文网首页
Dubbo源码分析----Dispatcher和ThreadPo

Dubbo源码分析----Dispatcher和ThreadPo

作者: _六道木 | 来源:发表于2018-07-15 19:30 被阅读34次

    Dispatcher

    Dispatcher是决定事件如何派发的策略,即将哪些事件派发线程池,还是说直接在当前线程中执行。

    先看下接口的定义

    @SPI(AllDispatcher.NAME)
    public interface Dispatcher {
    
        @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // 后两个参数为兼容旧配置
        ChannelHandler dispatch(ChannelHandler handler, URL url);
    
    }
    

    Dispatcher的几个实现如下

    all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
    direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
    message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
    execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
    connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
    

    根据扩展机制,会根据URL的参数(diapatcher/dispather/channel.handler)获取对应的实现,如果没有设置,那么默认使用SPI上的实现,即all的实现

    AllDispatcher

    public class AllDispatcher implements Dispatcher {
       
        public static final String NAME = "all";
        public ChannelHandler dispatch(ChannelHandler handler, URL url) {
            return new AllChannelHandler(handler, url);
        }
    
    }
    
    public class AllChannelHandler extends WrappedChannelHandler {
        
        public AllChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
        }
    
        public void connected(Channel channel) throws RemotingException {
            ExecutorService cexecutor = getExecutorService(); 
            try{
                cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
            }catch (Throwable t)  {//ERROR}
        }
        
        public void disconnected(Channel channel) throws RemotingException {
            ExecutorService cexecutor = getExecutorService(); 
            try{
                cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
            }catch (Throwable t)  {//ERROR}
        }
    
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService cexecutor = getExecutorService();
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t)  {//ERROR}
        }
    
        public void caught(Channel channel, Throwable exception) throws RemotingException {
            ExecutorService cexecutor = getExecutorService(); 
            try{
                cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
            }catch (Throwable t) {//ERROR}
        }
    
        private ExecutorService getExecutorService() {
            ExecutorService cexecutor = executor;
            if (cexecutor == null || cexecutor.isShutdown()) { 
                cexecutor = SHARED_EXECUTOR;
            }
            return cexecutor;
        }
    }
    

    可以看到这种情况下的事件全部交由线程池处理。
    再看下构造方法,其直接调用父类WrappedChannelHandler的进行初始化

        public WrappedChannelHandler(ChannelHandler handler, URL url) {
            this.handler = handler;
            this.url = url;
            //通过扩展机制获取对象线程池
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    
            String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
            if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
                componentKey = Constants.CONSUMER_SIDE;
            }
            DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
            dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
        }
    

    ExecutionDispatcher

    /**
     * 除发送全部使用线程池处理
     * 
     * @author chao.liuc
     */
    public class ExecutionDispatcher implements Dispatcher {
        
        public static final String NAME = "execution";
    
        public ChannelHandler dispatch(ChannelHandler handler, URL url) {
            return new ExecutionChannelHandler(handler, url);
        }
    
    }
    
    public class ExecutionChannelHandler extends WrappedChannelHandler {
        
        public ExecutionChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
        }
    
        public void connected(Channel channel) throws RemotingException {
            executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
        }
    
        public void disconnected(Channel channel) throws RemotingException {
            executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
        }
    
        public void received(Channel channel, Object message) throws RemotingException {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        }
    
        public void caught(Channel channel, Throwable exception) throws RemotingException {
            executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
        }
    }
    

    与All类似(官网说:只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。感觉不是啊)

    DirectDispatcher

    /**
     * 不派发线程池。
     * 
     * @author chao.liuc
     */
    public class DirectDispatcher implements Dispatcher {
        
        public static final String NAME = "direct";
    
        public ChannelHandler dispatch(ChannelHandler handler, URL url) {
            return handler;
        }
    }
    

    所有消息都不派发到线程池,全部在 IO 线程上直接执行。 这里只是将传入的handler返回,并没有中转到线程池处理,因为在Handler这里,用的是装饰者模式,其他的Dispatcher会将Handler包装一层,这一层是派发到线程池,如果不包装那就是走回原来的流程

    MessageOnlyDispatcher

    /**
     * 只有message receive使用线程池.
     * 
     * @author chao.liuc
     */
    public class MessageOnlyDispatcher implements Dispatcher {
    
        public static final String NAME = "message";
    
        public ChannelHandler dispatch(ChannelHandler handler, URL url) {
            return new MessageOnlyChannelHandler(handler, url);
        }
    
    }
    public class MessageOnlyChannelHandler extends WrappedChannelHandler {
        
        public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
        }
    
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService cexecutor = executor;
            if (cexecutor == null || cexecutor.isShutdown()) {
                cexecutor = SHARED_EXECUTOR;
            }
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }
    
    }
    

    只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。

    ConnectionOrderedDispatcher

    public class ConnectionOrderedDispatcher implements Dispatcher {
    
        public static final String NAME = "connection";
    
        public ChannelHandler dispatch(ChannelHandler handler, URL url) {
            return new ConnectionOrderedChannelHandler(handler, url);
        }
    
    }
    public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
    
        protected final ThreadPoolExecutor connectionExecutor;
        private final int queuewarninglimit ;
        
        public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
            String threadName = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME);
            connectionExecutor = new ThreadPoolExecutor(1, 1,
                                         0L, TimeUnit.MILLISECONDS,
                                         new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                                         new NamedThreadFactory(threadName, true),
                                         new AbortPolicyWithReport(threadName, url)
                );  // FIXME 没有地方释放connectionExecutor!
            queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
        }
    
        public void connected(Channel channel) throws RemotingException {
           //....
                checkQueueLength();
                connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
           //....
        }
    
        public void disconnected(Channel channel) throws RemotingException {
            //....
                checkQueueLength();
                connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
            //....
        }
    
        public void received(Channel channel, Object message) throws RemotingException {
                    //....
            cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.RECEIVED, exception));
            //....
        }
    
        public void caught(Channel channel, Throwable exception) throws RemotingException {
            //....
            cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
            //....
        }
        
        private void checkQueueLength(){
            if (connectionExecutor.getQueue().size() > queuewarninglimit){
                logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: "+connectionExecutor.getQueue().size()+" exceed the warning limit number :"+queuewarninglimit));
            }
        }
    }
    

    这种类型的,除了事件执行线程池,还信初始化了一个线程池,这个线程池类内部只有一个线程,然后将连接断开和连上的事件放到了该线程池处理

    Dispatcher的内容比较简单,就大概过了一下。
    另外,平时使用的是all,这种将所有事件都放到线程池中处理,会出现一种情况,假设一个请求过来的时候,线程池满了,所以报错,但是all会将错误事件也放回线程池返回,如果这个时候线程池还是满了,那么这个错误信息将无法发送回去,导致consumer会一直等待超时

    ThreadPool

    在Dispatcher中初始化线程池的代码如下:

    ExecutorService executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
                           .getAdaptiveExtension().getExecutor(url);
    

    通过扩展机制去获取对应类型的ThreadPool,看下接口定义

    @SPI("fixed")
    public interface ThreadPool {
        
        @Adaptive({Constants.THREADPOOL_KEY})
        Executor getExecutor(URL url);
    
    }
    

    根据扩展机制,可以知道,默认使用的fixed这个实现,且@Adaptive上声明了threadpool这个key,证明可以配置threadpool这个属性来指定使用哪种实现,下面看下Dubbo内部的3个线程池实现

    /**
     * 此线程池启动时即创建固定大小的线程数,不做任何伸缩,来源于:<code>Executors.newFixedThreadPool()</code>
     * 
     * @see java.util.concurrent.Executors#newFixedThreadPool(int)
     * @author william.liangf
     */
    public class FixedThreadPool implements ThreadPool {
    
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
                    queues == 0 ? new SynchronousQueue<Runnable>() : 
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>() 
                                : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    
    }
    /**
     * 此线程池可伸缩,线程空闲一分钟后回收,新请求重新创建线程,来源于:<code>Executors.newCachedThreadPool()</code>
     * 
     * @see java.util.concurrent.Executors#newCachedThreadPool()
     * @author william.liangf
     */
    public class CachedThreadPool implements ThreadPool {
    
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
            return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, 
                    queues == 0 ? new SynchronousQueue<Runnable>() : 
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>() 
                                : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    
    }
    /**
     * 此线程池一直增长,直到上限,增长后不收缩。
     * 
     * @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
     */
    public class LimitedThreadPool implements ThreadPool {
    
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
                    queues == 0 ? new SynchronousQueue<Runnable>() : 
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>() 
                                : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    
    }
    
    1. FixedThreadPool:线程池默认核心线程数和最大线程数为200,通过threads属性可以配置;队列默认为0,通过queues属性可以配置,如果队列数为0,那么使用SynchronousQueue对象,小于0,则使用无界的队列LinkedBlockingQueue,否则使用有界的LinkedBlockingQueue

    2. CachedThreadPool:比FixedThreadPool多了个corethreads的属性来配置核心线程数,以及alive属性配置keepAliveTime参数,其他类似

    3.LimitedThreadPool:这种类型keepAliveTime为Long.MAX_VALUE,即基本上不会自动减少线程数量

    相关文章

      网友评论

          本文标题:Dubbo源码分析----Dispatcher和ThreadPo

          本文链接:https://www.haomeiwen.com/subject/ykaypftx.html