Dubbo NettyServer Message Dispatch Strategies

Author: 晴天哥_王志 | Published: 2019-10-19 09:50

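 This article analyzes the message dispatch strategies of Dubbo's NettyServer. It covers how handlers are wrapped and invoked, and then walks through every dispatch strategy that NettyServer supports.

 The first half explains how the handlers are wrapped and invoked; the relationships are shown in the figure below. The focus is on how MultiMessageHandler => HeartbeatHandler => AllChannelHandler are wrapped and on the matching call path.

 The second half analyzes each dispatch strategy supported by Dubbo's NettyServer and how it handles events, which also involves Dubbo's business thread pool.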

[Figure: Handler wrapping flow]

ChannelHandler wrapping process

  • The article on Dubbo Protocol service export already covered how NettyServer is created. Here we look at how the ChannelHandler is wrapped during that creation.
public class NettyServer extends AbstractServer implements Server {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    private Map<String, Channel> channels; // <ip:port, channel>

    private ServerBootstrap bootstrap;

    private org.jboss.netty.channel.Channel channel;

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}
  • In NettyServer, the call to watch is ChannelHandlers.wrap(), the entry point for wrapping the ChannelHandler. The handler passed in here is a DecodeHandler.
public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}
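  • ChannelHandlers.wrap() builds the chain MultiMessageHandler => HeartbeatHandler => AllChannelHandler.
  • The AllChannelHandler is obtained through the SPI adaptive extension (a dynamically generated adapter class): ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
  • The implementation of Dispatcher$Adaptive is shown next. Internally it uses SPI to obtain the AllChannelHandler.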
public class Dispatcher$Adaptive implements Dispatcher {
    public ChannelHandler dispatch(ChannelHandler channelHandler, URL uRL) {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }

        URL uRL2 = uRL;
        String string = uRL2.getParameter("dispatcher",
                uRL2.getParameter("dispather",
                    uRL2.getParameter("channel.handler", "all")));

        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append(
                    "Failed to get extension (org.apache.dubbo.remoting.Dispatcher) name from url (")
                                                              .append(uRL2.toString())
                                                              .append(") use keys([dispatcher, dispather, channel.handler])")
                                                              .toString());
        }

        Dispatcher dispatcher = (Dispatcher) ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                                            .getExtension(string);

        return dispatcher.dispatch(channelHandler, uRL);
    }
}


public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}
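  • Dispatcher$Adaptive reads the extension name from the URL and then loads the matching Dispatcher.
  • uRL2.getParameter("dispatcher", uRL2.getParameter("dispather", uRL2.getParameter("channel.handler", "all"))) still accepts the historical misspelling dispather. The default value is all, so an AllDispatcher is returned.
  • That completes the analysis of how the ChannelHandler is wrapped.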
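
The lookup order described above can be sketched as follows. This is a minimal illustration, not Dubbo code: DispatcherNameSketch, its getParameter helper and the Map standing in for the URL are hypothetical, and the helper assumes that an absent or empty value falls back to the default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the URL parameter lookup; this is not Dubbo's URL class.
class DispatcherNameSketch {

    // Returns the parameter value, or the default when the key is absent or empty (assumed rule).
    static String getParameter(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    // Same precedence as the nested calls in Dispatcher$Adaptive:
    // "dispatcher" first, then the misspelled "dispather", then "channel.handler", else "all".
    static String resolveDispatcherName(Map<String, String> params) {
        return getParameter(params, "dispatcher",
                getParameter(params, "dispather",
                        getParameter(params, "channel.handler", "all")));
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(resolveDispatcherName(params)); // no key set
        params.put("dispather", "message");
        System.out.println(resolveDispatcherName(params)); // legacy key only
        params.put("dispatcher", "direct");
        System.out.println(resolveDispatcherName(params)); // correctly spelled key wins
    }
}
```

Because the innermost call supplies "all", the outer calls always receive a non-null default, which is why the null check in the generated class is effectively a safety net.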

Handler analysis

public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

    public MultiMessageHandler(ChannelHandler handler) {
        super(handler);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }
}
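  • MultiMessageHandler is a subclass of AbstractChannelHandlerDelegate and overrides received().
  • The other methods, such as connected() and disconnected(), are inherited from AbstractChannelHandlerDelegate.
  • The handler held by MultiMessageHandler is the HeartbeatHandler, so every call is forwarded to the corresponding HeartbeatHandler method.
    The handler field itself is declared in the parent class AbstractChannelHandlerDelegate.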
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);

    public static final String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";

    public static final String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";

    public HeartbeatHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        setReadTimestamp(channel);
        setWriteTimestamp(channel);
        handler.connected(channel);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        clearReadTimestamp(channel);
        clearWriteTimestamp(channel);
        handler.disconnected(channel);
    }
}
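  • HeartbeatHandler is a subclass of AbstractChannelHandlerDelegate and overrides methods such as connected() and disconnected().
  • The handler held by HeartbeatHandler is the AllChannelHandler, so every call is forwarded to the corresponding AllChannelHandler method.
  • The handler field of HeartbeatHandler is declared in the parent class AbstractChannelHandlerDelegate.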
public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate {
    // the wrapped handler that every call is delegated to
    protected ChannelHandler handler;

    protected AbstractChannelHandlerDelegate(ChannelHandler handler) {
        Assert.notNull(handler, "handler == null");
        this.handler = handler;
    }

    @Override
    public ChannelHandler getHandler() {
        if (handler instanceof ChannelHandlerDelegate) {
            return ((ChannelHandlerDelegate) handler).getHandler();
        }
        return handler;
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        handler.connected(channel);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        handler.disconnected(channel);
    }

    @Override
    public void sent(Channel channel, Object message) throws RemotingException {
        handler.sent(channel, message);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        handler.received(channel, message);
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        handler.caught(channel, exception);
    }
}
  • AbstractChannelHandlerDelegate implements the ChannelHandlerDelegate interface.
  • AbstractChannelHandlerDelegate holds a ChannelHandler handler field.
  • The handler used by HeartbeatHandler and MultiMessageHandler is the one declared in AbstractChannelHandlerDelegate.
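
To make the delegation concrete, here is a small self-contained sketch of the same decorator pattern. It is an illustration under simplifying assumptions, not Dubbo's classes: every name in DelegateChainSketch is hypothetical, only received() is modeled, and a java.util.List stands in for MultiMessage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DelegateChainSketch {

    // Simplified handler contract: only received() is modeled here.
    interface Handler {
        void received(Object message);
    }

    // Base delegate: stores the wrapped handler and forwards to it by default.
    abstract static class Delegate implements Handler {
        protected final Handler handler;

        Delegate(Handler handler) {
            this.handler = handler;
        }

        // Peels nested delegates until the innermost non-delegate handler is reached.
        Handler getHandler() {
            return handler instanceof Delegate ? ((Delegate) handler).getHandler() : handler;
        }

        @Override
        public void received(Object message) {
            handler.received(message);
        }
    }

    // Outer layer: unpacks a batch (modeled as a List) into single messages.
    static class MultiMessage extends Delegate {
        MultiMessage(Handler handler) {
            super(handler);
        }

        @Override
        public void received(Object message) {
            if (message instanceof List) {
                for (Object item : (List<?>) message) {
                    handler.received(item);
                }
            } else {
                handler.received(message);
            }
        }
    }

    // Middle layer: inherits the default forwarding behavior unchanged.
    static class PassThrough extends Delegate {
        PassThrough(Handler handler) {
            super(handler);
        }
    }

    // Innermost handler: records what it receives.
    static class Recorder implements Handler {
        final List<Object> seen = new ArrayList<>();

        @Override
        public void received(Object message) {
            seen.add(message);
        }
    }

    public static void main(String[] args) {
        Recorder recorder = new Recorder();
        Delegate chain = new MultiMessage(new PassThrough(recorder));
        chain.received(Arrays.asList("a", "b"));             // batch is split into two calls
        chain.received("c");                                 // single message is forwarded as-is
        System.out.println(recorder.seen);
        System.out.println(chain.getHandler() == recorder);  // unwrapping reaches the innermost handler
    }
}
```

Each layer only overrides the events it cares about and inherits plain forwarding for the rest, which is why the subclasses shown above are so short.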
public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() 
                                           + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() +
                 " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information 
            //can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, 
            // and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + 
                                  ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}


public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
    protected static final ExecutorService SHARED_EXECUTOR = 
       Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
                          .getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
}
  • AllChannelHandler extends WrappedChannelHandler and overrides methods such as connected().
  • The executor used by AllChannelHandler is declared in WrappedChannelHandler.
  • ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)
    creates the executor.
  • ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension() returns ThreadPool$Adaptive.

public class ThreadPool$Adaptive implements ThreadPool {
    public Executor getExecutor(URL uRL) {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL uRL2 = uRL;
        String string = uRL2.getParameter("threadpool", "fixed");
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().
             append("Failed to get extension (org.apache.dubbo.common.threadpool.ThreadPool) name from url (")
             .append(uRL2.toString()).append(") use keys([threadpool])").toString());
        }
        ThreadPool threadPool = (ThreadPool)ExtensionLoader.getExtensionLoader(ThreadPool.class)
                   .getExtension(string);
        return threadPool.getExecutor(uRL);
    }
}
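  • ThreadPool$Adaptive obtains the ThreadPool through ExtensionLoader.getExtensionLoader(ThreadPool.class).getExtension(name).
  • With the default name fixed, the ThreadPool returned is a FixedThreadPool.
  • The executor in WrappedChannelHandler is the business thread pool, and it is created by FixedThreadPool.

At this point we know that the default dispatch strategy is all (AllDispatcher, which returns an AllChannelHandler) and that the executor inside AllChannelHandler comes from FixedThreadPool. Next we look at the dispatch strategies themselves.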
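
The practical consequence of a fixed-size business pool is that it can be exhausted. The sketch below shows this with the JDK only. It assumes a pool whose core and maximum sizes are equal and which hands tasks off without queueing; that configuration is an assumption made for illustration and is not shown in the excerpts above. FixedPoolSketch and its methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FixedPoolSketch {

    // Builds a pool with a fixed number of threads and no task queue (assumed configuration).
    static ThreadPoolExecutor newFixedPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
    }

    // Returns true when a task submitted to a fully busy pool is rejected.
    static boolean rejectsWhenBusy() {
        ThreadPoolExecutor pool = newFixedPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();               // keep the only worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();                       // the worker is now occupied
            try {
                pool.execute(() -> { });
                return false;                      // accepted: the pool was not exhausted
            } catch (RejectedExecutionException e) {
                return true;                       // rejected: the case a dispatching handler must deal with
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();                   // let the blocked worker finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected when busy: " + rejectsWhenBusy());
    }
}
```

This is the situation behind the RejectedExecutionException branch in AllChannelHandler.received() shown earlier.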

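Message dispatch strategies

  • The dispatch strategies of Dubbo's NettyServer are shown in the figure below, each with a short description.
  • We then go through each strategy at the source level.
[Figure: message dispatch strategies]
The org.apache.dubbo.remoting.Dispatcher file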

all=org.apache.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=org.apache.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=org.apache.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=org.apache.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=org.apache.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
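name | handler | description
all | AllChannelHandler | Dispatches all requests and events such as connect to the thread pool specified by the ThreadPool extension point. If that pool has been shut down or does not exist, a default cached thread pool is used instead.
connection | ConnectionOrderedChannelHandler | Dispatches all requests and exceptions to the thread pool specified by the ThreadPool extension point. Connect and disconnect events go to a pool with a single worker thread and run one at a time in the order they arrive.
direct | (none; the handler is returned unwrapped) | Uses no extra threads. All events and requests are handled directly on the receiving (IO) thread.
execution | ExecutionChannelHandler | Dispatches only request messages to the business thread pool. Responses, connect and disconnect events, heartbeats and other messages run directly on the IO thread.
message | MessageOnlyChannelHandler | Dispatches only received messages (requests and responses) to the thread pool specified by the ThreadPool extension point. Events are handled directly on the receiving (IO) thread.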

WrappedChannelHandler

public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
    protected static final ExecutorService SHARED_EXECUTOR = 
       Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    // the core business thread pool (ExecutorService)
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
             .getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

    public ExecutorService getExecutorService() {
        return executor == null || executor.isShutdown() ? SHARED_EXECUTOR : executor;
    }
}
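  • WrappedChannelHandler holds the business thread pool, executor.
  • The business thread pool is obtained through the adaptive extension and, by default, is created by FixedThreadPool.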
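
The fallback rule in getExecutorService() is easy to check in isolation. The following sketch reproduces only that selection logic with JDK executors; ExecutorFallbackSketch, SHARED and choose are hypothetical names used for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorFallbackSketch {

    // Shared fallback pool, playing the role of SHARED_EXECUTOR in the excerpt above.
    static final ExecutorService SHARED = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "SharedHandlerSketch");
        t.setDaemon(true);   // daemon threads do not keep the JVM alive
        return t;
    });

    // Same selection rule as getExecutorService(): fall back when missing or shut down.
    static ExecutorService choose(ExecutorService executor) {
        return executor == null || executor.isShutdown() ? SHARED : executor;
    }

    public static void main(String[] args) {
        ExecutorService business = Executors.newFixedThreadPool(2);
        System.out.println(choose(business) == business); // a live pool is used as-is
        business.shutdown();
        System.out.println(choose(business) == SHARED);   // a shut-down pool is replaced
        System.out.println(choose(null) == SHARED);       // a missing pool is replaced
    }
}
```

The check runs on every dispatch, so events that arrive while the business pool is shutting down still have somewhere to run.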

AllDispatcher

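  • all: (AllDispatcher) every message is dispatched to the business thread pool, including requests, responses, connect events, disconnect events and heartbeats. The threading model is shown below:
[Figure: AllDispatcher threading model]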
public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}
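  • AllDispatcher dispatches all requests and events to the thread pool specified by the ThreadPool extension point. If that pool has been shut down or does not exist, a default cached thread pool is used instead.
  • AllDispatcher returns an AllChannelHandler.
  • AllDispatcher is Dubbo's default dispatch strategy.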
public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + 
                    " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + 
                  " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information 
            //can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and 
            //causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + 
                                       ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}
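  • AllChannelHandler overrides the methods of WrappedChannelHandler.
  • AllChannelHandler dispatches all requests and events to the thread pool specified by the ThreadPool extension point. If that pool has been shut down or does not exist, a default cached thread pool is used instead.
  • AllChannelHandler handles every request and event, including connection events and the actual message processing.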
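
The received() method above has one special case worth isolating: when the pool rejects the task and the request expects a reply, the handler answers with an error instead of leaving the consumer to time out. The sketch below models only that branch. Request, Channel and the message text are hypothetical simplifications, not Dubbo's types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

class RejectionReplySketch {

    // Hypothetical request model: an id plus a flag saying whether a reply is expected.
    static class Request {
        final long id;
        final boolean twoWay;

        Request(long id, boolean twoWay) {
            this.id = id;
            this.twoWay = twoWay;
        }
    }

    // Collects replies; a real channel would write them to the network.
    static class Channel {
        final List<String> sent = new ArrayList<>();

        void send(String response) {
            sent.add(response);
        }
    }

    // Dispatches to the executor; on rejection, two-way requests get an error reply.
    static void received(Executor executor, Channel channel, Object message, Runnable work) {
        try {
            executor.execute(work);
        } catch (RuntimeException t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.twoWay) {
                    channel.send("request " + request.id + ": thread pool exhausted");
                    return;
                }
            }
            throw new IllegalStateException("error when processing received event", t);
        }
    }

    public static void main(String[] args) {
        // An executor that always rejects, standing in for an exhausted pool.
        Executor alwaysFull = task -> {
            throw new RejectedExecutionException("pool is full");
        };
        Channel channel = new Channel();
        received(alwaysFull, channel, new Request(7L, true), () -> { });
        System.out.println(channel.sent);
        try {
            received(alwaysFull, channel, new Request(8L, false), () -> { });
        } catch (IllegalStateException e) {
            System.out.println("one-way request: " + e.getMessage());
        }
    }
}
```

One-way requests have nobody waiting for an answer, so for them the failure is simply propagated to the caller.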

MessageOnlyDispatcher

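  • message: (MessageOnlyDispatcher) only request and response messages are dispatched to the business thread pool. Connect and disconnect events, heartbeats and other messages run directly on the IO thread. The model is shown below:
[Figure: MessageOnlyDispatcher threading model]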
public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }
}
  • MessageOnlyDispatcher returns a MessageOnlyChannelHandler.
public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

}
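  • MessageOnlyChannelHandler dispatches only received messages to the thread pool specified by the ThreadPool extension point. Connection and other events are handled directly on the receiving (IO) thread.
  • MessageOnlyChannelHandler overrides received(), which handles every incoming message.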
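
The effect of overriding only received() can be observed by recording which thread handles each event. The sketch below does that with a reduced two-method handler interface. All names are hypothetical, and it assumes the base class forwards events that are not overridden directly on the calling thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MessageOnlySketch {

    // Reduced handler contract with one connection event and one message event.
    interface Handler {
        void connected();
        void received(Object message);
    }

    // Innermost handler: remembers which thread handled each kind of event.
    static class Recording implements Handler {
        final Map<String, String> threadOf = new ConcurrentHashMap<>();

        @Override
        public void connected() {
            threadOf.put("connected", Thread.currentThread().getName());
        }

        @Override
        public void received(Object message) {
            threadOf.put("received", Thread.currentThread().getName());
        }
    }

    // Same shape as the "message" strategy: only received() is handed to the pool.
    static class MessageOnly implements Handler {
        private final Handler handler;
        private final ExecutorService executor;

        MessageOnly(Handler handler, ExecutorService executor) {
            this.handler = handler;
            this.executor = executor;
        }

        @Override
        public void connected() {
            handler.connected();                                // stays on the calling thread
        }

        @Override
        public void received(Object message) {
            executor.execute(() -> handler.received(message)); // moves to the pool
        }
    }

    // Fires one event of each kind from the calling thread and returns the thread names seen.
    static Map<String, String> run() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "business-1"));
        Recording recording = new Recording();
        Handler handler = new MessageOnly(recording, pool);
        handler.connected();
        handler.received("ping");
        pool.shutdown();                                        // already submitted work still runs
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return recording.threadOf;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the calling thread plays the part of the Netty IO thread.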

ExecutionDispatcher

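  • execution: (ExecutionDispatcher) only request messages are dispatched to the business thread pool. Responses, connect and disconnect events, heartbeats and other messages run directly on the IO thread. The model is shown below:
[Figure: ExecutionDispatcher threading model]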
public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }

}
  • ExecutionDispatcher returns an ExecutionChannelHandler.
/**
 * Only request message will be dispatched to thread pool. Other messages like response, connect, disconnect,
 * heartbeat will be directly executed by I/O thread.
 */
public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        if (message instanceof Request) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening, but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {
                    Request request = (Request) message;
                    if (request.isTwoWay()) {
                        String msg = "Server side(" + url.getIp() + "," + url.getPort()
                                + ") thread pool is exhausted, detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        } else {
            handler.received(channel, message);
        }
    }
}
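  • ExecutionChannelHandler dispatches only request messages to the business thread pool. Responses, connect and disconnect events, heartbeats and other messages run directly on the IO thread.
  • ExecutionChannelHandler overrides received(): only messages of type Request go to the pool, and everything else is passed straight to the wrapped handler.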
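
The type check in received() is the whole strategy, so it can be sketched in a few lines. Request and Response below are hypothetical marker classes, and the Consumer stands in for the wrapped handler; this is an illustration of the branching, not Dubbo's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ExecutionRoutingSketch {

    // Hypothetical marker types standing in for request and response messages.
    static class Request { }

    static class Response { }

    // Same branching as the "execution" strategy: requests go to the pool, the rest run inline.
    static void received(Object message, ExecutorService executor, Consumer<Object> handler) {
        if (message instanceof Request) {
            executor.execute(() -> handler.accept(message));
        } else {
            handler.accept(message);
        }
    }

    // Sends one request and one response and returns the handling thread per message type.
    static Map<String, String> run() {
        Map<String, String> threadOf = new ConcurrentHashMap<>();
        Consumer<Object> handler =
                m -> threadOf.put(m.getClass().getSimpleName(), Thread.currentThread().getName());
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "business-1"));
        received(new Request(), pool, handler);
        received(new Response(), pool, handler);
        pool.shutdown();                         // already submitted work still runs
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadOf;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Handling responses inline keeps the pool free for request work, at the cost of running response processing on the thread that read the message.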

ConnectionOrderedDispatcher

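  • connection: (ConnectionOrderedDispatcher) the IO thread puts connect and disconnect events into a queue, where they are executed one at a time in order. Other messages are dispatched to the business thread pool. The model is shown below:
[Figure: ConnectionOrderedDispatcher threading model]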
public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }
}
  • ConnectionOrderedDispatcher returns a ConnectionOrderedChannelHandler.
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() 
                 + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            // fix, reject exception can not be sent to consumer 
            // because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + 
                         ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " 
               + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}
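  • With ConnectionOrderedChannelHandler, the IO thread puts connect and disconnect events into a queue, where they are executed one at a time in order. Other messages are dispatched to the business thread pool.
  • ConnectionOrderedChannelHandler holds two thread pools: an IO thread pool (connectionExecutor) and the business thread pool. This IO thread pool is distinct from NettyServer's worker thread pool.
  • connectionExecutor is a thread pool with a single worker thread, which is what makes the events run in order.
  • ConnectionOrderedChannelHandler overrides received(), connected() and the other event methods.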
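
The ordering guarantee comes purely from using one worker thread with a FIFO queue. A minimal sketch with the JDK, using hypothetical names and plain strings in place of channel events:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConnectionOrderSketch {

    // Submits alternating connect/disconnect events and returns the order in which they ran.
    static List<String> run(int events) {
        // One worker thread plus a FIFO queue, the same shape as connectionExecutor above.
        ThreadPoolExecutor connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < events; i++) {
            final String event = (i % 2 == 0 ? "connected-" : "disconnected-") + i;
            connectionExecutor.execute(() -> order.add(event));
        }
        connectionExecutor.shutdown();           // queued events still run before termination
        try {
            connectionExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run(6));
    }
}
```

With more than one worker thread the queue would still be FIFO, but two events could run concurrently and finish out of order, so the single thread is essential.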

DirectDispatcher

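  • direct: (DirectDispatcher) no message is dispatched to the business thread pool; everything runs directly on the IO thread. The model is shown below:
[Figure: DirectDispatcher threading model]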
public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}
  • DirectDispatcher has no thread pool of its own. It returns the handler unwrapped, so every request and event is handled by NettyServer's worker threads.
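
As a recap, the five strategies can be condensed into a lookup of which events reach the business thread pool. The sketch below is derived from the handler code quoted in this article only. The enum names are hypothetical, heartbeats are left out, and for connection the connect and disconnect events go to the separate single-thread executor rather than staying on the IO thread.

```java
import java.util.EnumSet;
import java.util.Set;

class DispatchMatrixSketch {

    // Event kinds seen by a dispatching handler (REQUEST and RESPONSE both arrive via received()).
    enum Event { CONNECTED, DISCONNECTED, REQUEST, RESPONSE, CAUGHT }

    // Each strategy lists the events it hands to the business thread pool.
    enum Strategy {
        ALL(EnumSet.allOf(Event.class)),
        MESSAGE(EnumSet.of(Event.REQUEST, Event.RESPONSE)),
        EXECUTION(EnumSet.of(Event.REQUEST)),
        CONNECTION(EnumSet.of(Event.REQUEST, Event.RESPONSE, Event.CAUGHT)),
        DIRECT(EnumSet.noneOf(Event.class));

        private final Set<Event> toBusinessPool;

        Strategy(Set<Event> toBusinessPool) {
            this.toBusinessPool = toBusinessPool;
        }

        boolean usesBusinessPool(Event event) {
            return toBusinessPool.contains(event);
        }
    }

    public static void main(String[] args) {
        // Prints one line per strategy with the events that go to the business pool.
        for (Strategy strategy : Strategy.values()) {
            StringBuilder line = new StringBuilder(strategy.name()).append(':');
            for (Event event : Event.values()) {
                if (strategy.usesBusinessPool(event)) {
                    line.append(' ').append(event);
                }
            }
            System.out.println(line);
        }
    }
}
```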
