SOFABolt Source Code Analysis 13 - Connection Event Handling

Author: 原水寒 | Published: 2018-10-15 17:13

    Classes related to Connection event handling

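    • ConnectionEventType: defines the three Connection-related event types
    • ConnectionEventHandler: the Connection event handler; it handles two kinds of events
        • Events defined by Netty, for example connect and channelActive
        • Events defined by SOFABolt, whose types are given by ConnectionEventType
    • RpcConnectionEventHandler: a ConnectionEventHandler subclass that overrides channelInactive
    • ConnectionEventListener: the Connection event listener; for each ConnectionEventType it stores the list of ConnectionEventProcessor instances that handle it
    • ConnectionEventProcessor: the interface implemented by the processors that actually handle Connection events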

    Basic principle

    1. Implement ConnectionEventProcessor to write a custom event processor class
    2. Add the custom event processor to ConnectionEventListener
    3. When a ConnectionEventType event is triggered, ConnectionEventHandler notifies ConnectionEventListener, which looks up the list of custom processors registered for that ConnectionEventType and calls onEvent on each of them
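
The three steps above can be modeled in a few lines without SOFABolt or Netty on the classpath. The sketch below is only an illustration: `EventType`, `EventProcessor` and `EventListener` are hypothetical stand-ins for ConnectionEventType, ConnectionEventProcessor and ConnectionEventListener, the Connection argument is omitted, and dispatch is synchronous for simplicity.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class EventDispatchSketch {

    /** Hypothetical stand-in for ConnectionEventType (only two values modeled). */
    public enum EventType { CONNECT, CLOSE }

    /** Hypothetical stand-in for ConnectionEventProcessor (no Connection argument). */
    public interface EventProcessor {
        void onEvent(String remoteAddr);
    }

    /** Hypothetical stand-in for ConnectionEventListener: one processor list per event type. */
    public static class EventListener {
        private final Map<EventType, List<EventProcessor>> processors =
            new EnumMap<EventType, List<EventProcessor>>(EventType.class);

        // Step 2: register a custom processor for an event type.
        public void addProcessor(EventType type, EventProcessor processor) {
            List<EventProcessor> list = processors.get(type);
            if (list == null) {
                list = new ArrayList<EventProcessor>();
                processors.put(type, list);
            }
            list.add(processor);
        }

        // Step 3: look up the processors for the type and call each one.
        public void onEvent(EventType type, String remoteAddr) {
            List<EventProcessor> list = processors.get(type);
            if (list == null) {
                return; // nothing registered for this type
            }
            for (EventProcessor processor : list) {
                processor.onEvent(remoteAddr);
            }
        }
    }

    /** Registers two processors, fires two events and returns what the processors recorded. */
    public static List<String> demo() {
        final List<String> log = new ArrayList<String>();
        EventListener listener = new EventListener();
        // Step 1: the custom processors (here as lambdas).
        listener.addProcessor(EventType.CONNECT, addr -> log.add("hello, " + addr));
        listener.addProcessor(EventType.CLOSE, addr -> log.add("bye, " + addr));
        listener.onEvent(EventType.CONNECT, "127.0.0.1:8888");
        listener.onEvent(EventType.CLOSE, "127.0.0.1:8888");
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

In the real framework the listener is not called directly by user code; the handler in the Netty pipeline calls it, as the source analysis shows.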

    1. Usage

    Event processors

    =========================== Connect processor ===========================
    public class MyCONNECTEventProcessor implements ConnectionEventProcessor {
        @Override
        public void onEvent(String remoteAddr, Connection conn) {
            System.out.println("hello, " + remoteAddr);
        }
    }
    
    =========================== Close processor ===========================
    public class MyCLOSEEventProcessor implements ConnectionEventProcessor {
        @Override
        public void onEvent(String remoteAddr, Connection conn) {
            System.out.println("bye, " + remoteAddr);
        }
    }
    

    Server

    RpcServer server = new RpcServer(8888);
    server.registerUserProcessor(new MyServerUserProcessor());
    server.addConnectionEventProcessor(ConnectionEventType.CONNECT, new MyCONNECTEventProcessor());
    server.addConnectionEventProcessor(ConnectionEventType.CLOSE, new MyCLOSEEventProcessor());
    server.start();
    

    Client

    RpcClient client = new RpcClient();
    client.addConnectionEventProcessor(ConnectionEventType.CONNECT, new MyCONNECTEventProcessor());
    client.addConnectionEventProcessor(ConnectionEventType.CLOSE, new MyCLOSEEventProcessor());
    client.init();
    

    2. Source code analysis

    2.1 Server

    public class RpcServer extends AbstractRemotingServer implements RemotingServer {
        ...
        /** connection event handler */
        private ConnectionEventHandler connectionEventHandler;
        /** connection event listener */
        private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
        /** connection manager */
        private DefaultConnectionManager connectionManager;
    
        protected void doInit() {
            ...
            // The server-side connection management switch is on
            if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                // Create the ConnectionEventHandler
                this.connectionEventHandler = new RpcConnectionEventHandler(switches());
                // Create the connection manager
                this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
                // Set connectionManager on the ConnectionEventHandler
                this.connectionEventHandler.setConnectionManager(this.connectionManager);
                // Set connectionEventListener on the ConnectionEventHandler
                this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
            } else {
                // Create the ConnectionEventHandler
                this.connectionEventHandler = new ConnectionEventHandler(switches());
                // Set connectionEventListener on the ConnectionEventHandler
                this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
            }
            ...
            
            this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) {
                    ...
                    // Add connectionEventHandler to the Netty pipeline
                    pipeline.addLast("connectionEventHandler", connectionEventHandler);
                    ...
                    createConnection(channel);
                }
    
                private void createConnection(SocketChannel channel) {
                    Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
                    if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                        connectionManager.add(new Connection(channel, url), url.getUniqueKey());
                    } else {
                        new Connection(channel, url);
                    }
                    // Publish the ConnectionEventType.CONNECT event
                    channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
                }
            });
        }
    
        public void addConnectionEventProcessor(ConnectionEventType type, ConnectionEventProcessor processor) {
            this.connectionEventListener.addConnectionEventProcessor(type, processor);
        }
    }
    

    2.2 Client

    public class RpcClient extends AbstractConfigurableInstance {
        /** connection event handler */
        private ConnectionEventHandler connectionEventHandler = new RpcConnectionEventHandler(switches());
        /** reconnect manager */
        private ReconnectManager reconnectManager;
        /** connection event listener */
        private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
        /** connection manager */
        private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy, connectionFactory, connectionEventHandler, connectionEventListener, switches());
    
        public void init() {
            ...
            this.connectionManager.init();
            ...
            // Reconnect switch
            if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
                // Create the ReconnectManager
                reconnectManager = new ReconnectManager(connectionManager);
                // Set the ReconnectManager on connectionEventHandler so that it can reconnect on channelInactive
                connectionEventHandler.setReconnectManager(reconnectManager);
            }
        }
    
        public void addConnectionEventProcessor(ConnectionEventType type,
                                                ConnectionEventProcessor processor) {
            this.connectionEventListener.addConnectionEventProcessor(type, processor);
        }
    }
    
    ======================== DefaultConnectionManager ==========================
        public void init() {
            // Set this DefaultConnectionManager on connectionEventHandler, so that on channelInactive the Connection can be removed from the DefaultConnectionManager
            this.connectionEventHandler.setConnectionManager(this);
            // Set connectionEventListener on connectionEventHandler
            this.connectionEventHandler.setConnectionEventListener(connectionEventListener);
            this.connectionFactory.init(connectionEventHandler);
        }
    
    ======================== AbstractConnectionFactory ==========================
        public void init(final ConnectionEventHandler connectionEventHandler) {
            ...
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) {
                    ...
                    pipeline.addLast("connectionEventHandler", connectionEventHandler);
                    ...
                }
            });
        }
    

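    Whether on the server or on the client, the essence is the same: create a ConnectionEventHandler instance and add it to the Netty pipeline.
    After that, whenever a Connection event fires (whether it is a Netty-defined event or a SOFABolt-defined one), ConnectionEventHandler notifies ConnectionEventListener through an asynchronous executor, and ConnectionEventListener dispatches the event to the concrete ConnectionEventProcessor implementations. The relevant source is shown in section 2.3.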
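
Before turning to the source, the asynchronous hand-off can be modeled with plain JDK classes. This is a sketch under stated assumptions, not SOFABolt code: `Listener` and `Handler` are hypothetical stand-ins, the thread name `conn-event-sketch` is made up, and only the executor shape (one worker thread, a bounded queue of 10000) mirrors the source quoted in section 2.3.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AsyncHandOffSketch {

    /** Hypothetical stand-in for ConnectionEventListener#onEvent. */
    public interface Listener {
        void onEvent(String type, String remoteAddr);
    }

    /** Hypothetical stand-in for the handler: wraps each notification in a task. */
    public static class Handler {
        private final Listener listener;
        // One worker thread and a bounded queue, the same shape as the quoted executor.
        private final ExecutorService executor = new ThreadPoolExecutor(1, 1, 60L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000), r -> {
                Thread t = new Thread(r, "conn-event-sketch"); // assumed thread name
                t.setDaemon(true);
                return t;
            });

        public Handler(Listener listener) {
            this.listener = listener;
        }

        // The caller (an I/O thread in the real framework) only enqueues the task.
        public void fire(final String type, final String remoteAddr) {
            executor.execute(() -> listener.onEvent(type, remoteAddr));
        }

        // Lets queued tasks finish, then stops the worker.
        public void close() {
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Fires CONNECT then CLOSE and records which thread handled each event. */
    public static List<String> demo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        Handler handler = new Handler((type, addr) ->
            log.add(type + " " + addr + " on " + Thread.currentThread().getName()));
        handler.fire("CONNECT", "127.0.0.1:8888");
        handler.fire("CLOSE", "127.0.0.1:8888");
        handler.close();
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Because there is a single worker thread, events are handled in the order they were enqueued, and a slow processor delays every event queued behind it.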

    2.3 Core of the event handling mechanism

    ======================== ConnectionEventListener ==========================
    /**
     * Listen and dispatch connection events.
     */
    public class ConnectionEventListener {
        private ConcurrentHashMap<ConnectionEventType, List<ConnectionEventProcessor>> processors = new ConcurrentHashMap<ConnectionEventType, List<ConnectionEventProcessor>>(3);
    
        /**
         * Dispatch events.
         */
        public void onEvent(ConnectionEventType type, String remoteAddr, Connection conn) {
            List<ConnectionEventProcessor> processorList = this.processors.get(type);
            if (processorList != null) {
                for (ConnectionEventProcessor processor : processorList) {
                    processor.onEvent(remoteAddr, conn);
                }
            }
        }
    
        /**
         * Add event processor.
         */
        public void addConnectionEventProcessor(ConnectionEventType type,
                                                ConnectionEventProcessor processor) {
            List<ConnectionEventProcessor> processorList = this.processors.get(type);
            if (processorList == null) {
                this.processors.putIfAbsent(type, new ArrayList<ConnectionEventProcessor>(1));
                processorList = this.processors.get(type);
            }
            processorList.add(processor);
        }
    }
    
    ======================== ConnectionEventProcessor ==========================
    /**
     * Process connection events.
     */
    public interface ConnectionEventProcessor {
        /**
         * Process event.
         */
        public void onEvent(String remoteAddr, Connection conn);
    }
    
    ======================== ConnectionEventHandler ==========================
    /**
     * Log the channel status event.
     */
    @Sharable
    public class ConnectionEventHandler extends ChannelDuplexHandler {
        private ConnectionManager       connectionManager;
        private ConnectionEventListener eventListener;
        private ConnectionEventExecutor eventExecutor;
        private ReconnectManager        reconnectManager;
        private GlobalSwitch            globalSwitch;
    
        @Override
        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
                            SocketAddress localAddress, ChannelPromise promise) throws Exception {
            if (logger.isInfoEnabled()) {
                ...
            }
            super.connect(ctx, remoteAddress, localAddress, promise);
        }
        ...
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ...
            super.channelInactive(ctx);
            Attribute attr = ctx.channel().attr(Connection.CONNECTION);
            
            if (null != attr) {
                // Schedule a reconnect; this is why ConnectionEventHandler holds a reference to reconnectManager
                if (this.globalSwitch != null
                    && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
                    Connection conn = (Connection) attr.get();
                    if (reconnectManager != null) {
                        reconnectManager.addReconnectTask(conn.getUrl());
                    }
                }
                // Dispatch the ConnectionEventType.CLOSE event
                onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
            }
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
            if (event instanceof ConnectionEventType) {
                switch ((ConnectionEventType) event) {
                    case CONNECT:
                        Channel channel = ctx.channel();
                        if (null != channel) {
                            Connection connection = channel.attr(Connection.CONNECTION).get();
                            // Dispatch the ConnectionEventType.CONNECT event
                            this.onEvent(connection, connection.getUrl().getOriginUrl(), ConnectionEventType.CONNECT);
                        } 
                        break;
                    default:
                        return;
                }
            } else {
                super.userEventTriggered(ctx, event);
            }
        }
    
        private void onEvent(Connection conn, String remoteAddress, ConnectionEventType type) {
            if (this.eventListener != null) {
                // 1. Create a task that calls ConnectionEventListener's onEvent
                // 2. Run that task on the ConnectionEventExecutor
                this.eventExecutor.onEvent(new Runnable() {
                    @Override
                    public void run() {
                        ConnectionEventHandler.this.eventListener.onEvent(type, remoteAddress, conn);
                    }
                });
            }
        }
    
        public void setConnectionEventListener(ConnectionEventListener listener) {
            if (listener != null) {
                // Set the ConnectionEventListener
                this.eventListener = listener;
                // Create the ConnectionEventExecutor, the asynchronous executor for events
                if (this.eventExecutor == null) {
                    this.eventExecutor = new ConnectionEventExecutor();
                }
            }
        }
    
        public class ConnectionEventExecutor {
            ExecutorService executor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000), new NamedThreadFactory("Bolt-conn-event-executor", true));
    
            public void onEvent(Runnable event) {
                executor.execute(event);
            }
        }
    }
    
    ======================== RpcConnectionEventHandler ==========================
    public class RpcConnectionEventHandler extends ConnectionEventHandler {
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
            if (conn != null) {
                // This is why ConnectionEventHandler holds a reference to ConnectionManager
                this.getConnectionManager().remove(conn);
            }
            super.channelInactive(ctx);
        }
    }
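
One detail of the quoted ConnectionEventExecutor is worth a small experiment: it builds a ThreadPoolExecutor with one thread and a bounded queue, and passes no rejection handler. With the JDK defaults, such an executor throws RejectedExecutionException from execute once the worker is busy and the queue is full. The sketch below demonstrates that JDK behavior with a deliberately tiny queue; the class and method names are hypothetical, and it makes no claim about how often this happens in a real SOFABolt deployment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueSketch {

    /**
     * Submits `submitted` blocking tasks to a 1-thread executor with the given
     * queue capacity and returns how many submissions were rejected.
     */
    public static int countRejected(int queueCapacity, int submitted) {
        final CountDownLatch release = new CountDownLatch(1);
        // Same constructor shape as the quoted executor, without a custom rejection handler.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(queueCapacity));
        int rejected = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await(); // simulate a processor that never returns in time
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // worker busy and queue full
                }
            }
        } finally {
            release.countDown(); // unblock the tasks so the worker can drain the queue
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 1 task occupies the worker, 2 wait in the queue, the remaining 2 are rejected.
        System.out.println(countRejected(2, 5)); // prints 2
    }
}
```

The practical reading is that custom ConnectionEventProcessor implementations should return quickly, since they all share one worker thread and one queue.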
    

    Event handling flow

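    Event triggered -> RpcConnectionEventHandler -> [ ConnectionEventListener -> ConnectionEventProcessor ]
    The steps inside the square brackets are executed asynchronously by ConnectionEventExecutor.
    Events are triggered in two ways: Netty-defined events (for example channelInactive) are handled directly in the corresponding Netty callback method, while SOFABolt-defined events are triggered through the userEventTriggered method.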
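
The two trigger paths, together with the way RpcConnectionEventHandler cleans up before delegating to its parent, can be sketched without Netty. Everything here is a hypothetical stand-in: `BaseHandler` models ConnectionEventHandler, `ManagedHandler` models RpcConnectionEventHandler, a plain `Set<String>` plays the role of the connection manager, and `OTHER` is a placeholder for any event type the switch does not handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class TriggerPathsSketch {

    /** Hypothetical stand-in for ConnectionEventType; OTHER is a placeholder value. */
    public enum EventType { CONNECT, CLOSE, OTHER }

    /** Hypothetical stand-in for ConnectionEventHandler, without Netty. */
    public static class BaseHandler {
        protected final List<String> fired = new ArrayList<String>();

        // Path 1: a channel lifecycle callback fires CLOSE directly.
        public void channelInactive(String conn) {
            fired.add(EventType.CLOSE + " " + conn);
        }

        // Path 2: a user event is inspected and only CONNECT is dispatched.
        public void userEventTriggered(String conn, Object event) {
            if (event instanceof EventType) {
                if (event == EventType.CONNECT) {
                    fired.add(EventType.CONNECT + " " + conn);
                }
                // any other EventType is ignored, like the default branch of the quoted switch
            } else {
                // stands in for super.userEventTriggered(ctx, event)
                fired.add("passed on: " + event);
            }
        }

        public List<String> fired() {
            return fired;
        }
    }

    /** Stand-in for RpcConnectionEventHandler: clean up first, then delegate. */
    public static class ManagedHandler extends BaseHandler {
        private final Set<String> manager;

        public ManagedHandler(Set<String> manager) {
            this.manager = manager;
        }

        @Override
        public void channelInactive(String conn) {
            manager.remove(conn);        // drop the connection from the manager
            super.channelInactive(conn); // then let the parent fire CLOSE
        }
    }

    /** Walks one connection through both trigger paths and returns the fired events. */
    public static List<String> demo(Set<String> manager) {
        ManagedHandler handler = new ManagedHandler(manager);
        handler.userEventTriggered("conn-1", EventType.CONNECT);
        handler.userEventTriggered("conn-1", EventType.OTHER);
        handler.userEventTriggered("conn-1", "some-other-event");
        handler.channelInactive("conn-1");
        return handler.fired();
    }

    public static void main(String[] args) {
        Set<String> manager = new java.util.HashSet<String>();
        manager.add("conn-1");
        for (String line : demo(manager)) {
            System.out.println(line);
        }
        System.out.println("manager empty: " + manager.isEmpty());
    }
}
```

Calling the cleanup before `super.channelInactive` means that by the time CLOSE processors run, the closed connection is no longer in the manager.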

    When events are triggered

    • ConnectionEventType.CONNECT
        • AbstractConnectionFactory # createConnection (client)
        • RpcServer # doInit # childHandler # initChannel # createConnection (server)
    • ConnectionEventType.CLOSE
        • ConnectionEventHandler # channelInactive
    ======================== Client creates a connection ==========================
    @Override
    public Connection createConnection(Url url) throws Exception {
        Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
        Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()),
            url.getVersion(), url);
        // Publish the ConnectionEventType.CONNECT event
        channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
        return conn;
    }
    


    Article link: https://www.haomeiwen.com/subject/jcptzftx.html