美文网首页分布式dubbo
Dubbo 线程池策略和线程模型

Dubbo 线程池策略和线程模型

作者: 晴天哥_王志 | 来源:发表于2019-05-27 10:03 被阅读395次

开篇

 这篇文章的目的主要是分析下Dubbo当中关于线程池的策略和线程模型,主要从源码角度出发并结合网上一些现成的文章来进行阐述。

 个人阅读过源码以后的第一感觉就是成熟的框架也是靠使用了基础的线程池来实现的,唯一的遗憾就是这篇文章没有理顺dubbo的配置和真正线程池参数之间的关系,这个后面再补充一篇类似的文章。

Dubbo线程池策略

resources目录下的com.alibaba.dubbo.common.threadpool.ThreadPool的文件

fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool

说明:

  • dubbo的线程池策略通过SPI配置文件对外提供,在com.alibaba.dubbo.common.threadpool.ThreadPool文件当中定义。
  • dubbo的线程池策略对外提供了三种策略,分别是fixed、cached、limited三类。
  • 每类策略的定义见下述源码。

CachedThreadPool

public class CachedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

说明:

  • CachedThreadPool的实现可以理解为JDK当中Executors.newCachedThreadPool()方法。

FixedThreadPool

public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

说明:

  • FixedThreadPool的实现可以理解为JDK当中Executors.newFixedThreadPool()方法。

LimitedThreadPool

public class LimitedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

说明:

  • LimitedThreadPool的实现可以理解为JDK当中Executors.newCachedThreadPool()方法。
  • LimitedThreadPool的区别在于线程池中的线程永远不会过期,因为alive时间为最大值。

NamedThreadFactory

public class NamedThreadFactory implements ThreadFactory {
    private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);

    private final AtomicInteger mThreadNum = new AtomicInteger(1);

    private final String mPrefix;

    private final boolean mDaemon;

    private final ThreadGroup mGroup;

    public NamedThreadFactory() {
        this("pool-" + POOL_SEQ.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemon) {
        mPrefix = prefix + "-thread-";
        mDaemon = daemon;
        SecurityManager s = System.getSecurityManager();
        mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return mGroup;
    }
}

说明

  • 提供了线程池中线程创建的工厂,推荐使用线程池时候一定要自定义线程池工厂,便于定位线程的用途。

Dubbo线程模型

resources目录下的com.alibaba.dubbo.remoting.Dispatcher文件

all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher

说明:

  • Dubbo线程模型在resources目录下的com.alibaba.dubbo.remoting.Dispatcher文件当中。
  • 提供5类线程模型,分别是all、direct、message、execution、connection。
  • 每类的用途见下面的介绍。

Dubbo线程模型对比

根据请求的消息类被IO线程处理还是被业务线程池处理,Dubbo提供了下面几种线程模型:

  • all : (AllDispatcher类)所有消息都派发到业务线程池,这些消息包括请求/响应/连接事件/断开事件/心跳等,这些线程模型如下图:
AllDispatcher
  • direct : (DirectDispacher类)所有消息都不派发到业务线程池,全部在IO线程上直接执行,模型如下图:
DirectDispacher
  • message : (MessageOnlyDispatcher类)只有请求响应消息派发到业务线程池,其他连接断开事件/心跳等消息,直接在IO线程上执行,模型图如下:
MessageOnlyDispatcher
  • execution:(ExecutionDispatcher类)只把请求类消息派发到业务线程池处理,但是响应和其它连接断开事件,心跳等消息直接在IO线程上执行,模型如下图:
execution
  • connection:(ConnectionOrderedDispatcher类)在IO线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池处理,模型如下图:
ConnectionOrderedDispatcher

AllDispatcher

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}


public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {}
    }

    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {}
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {}
    }

    private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}

说明:

  • AllChannelHandler的connected、disconnected、received统一通过业务线程池处理。
  • cexecutor.execute(new ChannelEventRunnable())统一提交。

ConnectionOrderedDispatcher

public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }
}


public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {}
    }

    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {}
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {}
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}

说明:

  • ConnectionOrderedDispatcher的connected和disconnected事件通过connectionExecutor实现。
  • ConnectionOrderedDispatcher的received事件通过单独的executor去实现。
  • ConnectionOrderedDispatcher的连接处理和消息处理通过不同的executor处理。

DirectDispatcher

public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}

说明:

  • DirectDispatcher内部的处理没有用到线程池,统一由IO线程去处理。

ExecutionDispatcher

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }
}

public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    }

    public void disconnected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
    }

    public void received(Channel channel, Object message) throws RemotingException {
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
    }

}

说明:

  • ExecutionDispatcher的实现和AllDispatcher几乎相同。
  • ExecutionDispatcher的连接和消息处理统一由业务线程池处理。

MessageOnlyDispatcher

public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }
}


public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

}

说明:

  • MessageOnlyDispatcher的消息处理通过业务线程池去执行。
  • MessageOnlyDispatcher的连接事件通过IO线程去执行。

参考文章

Dubbo学习笔记8:Dubbo的线程模型与线程池策略

相关文章

  • DUBBO线程模型和调度策略

    DUBBO线程模型 从官方描述来看dubbo线程模型支持业务线程和I/O线程分离,并且提供5种不同的调度策略。 拿...

  • Dubbo线程池

    Dubbo的线程模型与线程池策略 Dubbo默认的底层网络通讯使用的是Netty,服务提供方NettyServer...

  • Dubbo 线程池策略和线程模型

    开篇  这篇文章的目的主要是分析下Dubbo当中关于线程池的策略和线程模型,主要从源码角度出发并结合网上一些现成的...

  • 信号量用法:锁

    用于dubbo线程池满的拒绝策略

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

  • Dubbo线程模式和多协议与Netty线程模式详解分析

    概述 Dubbo线程模型 IO线程组:负责IO流形式监听客户端的所有行为(连接、断开、发送读、写请求) 业务线程池...

  • Dubbo线程模型和多协议

    概述 Dubbo线程模型 IO线程组:负责IO流形式监听客户端的所有行为(连接、断开、发送读、写请求) 业务线程池...

  • 线程池概述

    为什么要使用线程池? 线程池核心参数 线程池的几种拒绝策略 execute()和submit()的区别 线程池工作...

  • Dubbo学习笔记之SPI

    Dispatcher dubbo的Dispatcher策略: all 所有消息都派发到线程池,包括请求,响应,连接...

  • 3. Dubbo线程池模型

    dubbo有两种线程池,io线程池、业务线程池以netty作为io框架为例:boss线程池: 主要处理新的连接请求...

网友评论

    本文标题:Dubbo 线程池策略和线程模型

    本文链接:https://www.haomeiwen.com/subject/lpcctctx.html