美文网首页sofabolt
SOFABolt 源码分析15 - 双工通信机制的设计

SOFABolt 源码分析15 - 双工通信机制的设计

作者: 原水寒 | 来源:发表于2018-10-18 08:00 被阅读115次
    image.png

    SOFABolt 提供了双工通信能力,使得不仅客户端可以调用服务端,服务端也可以主动调用客户端(当然,客户端也就需要可以注册 UserProcessor 的功能)。

    SOFABolt 四种调用模式:oneway / sync / future / callback
    SOFABolt 三种调用链路:addr / url / connection,
    注意:在整个调用过程中,调用链路会发生如下转化:addr -> url -> connection

    如上图所示,整个调用辅助类包含:

    • BaseRemoting:提供了四种基本调用模式的方法(基于 Connection 的,值得注意的是,Connection 也是三种调用链路最底层的),这四种调用模式也提供了基本的调用模板;
    • RpcRemoting
    • 实现了 RpcProtocol 的初始化
    • 实现了基于 addr 链路的四种基本调用模式模板(内部实际调用基于 url 链路模式)
    • 提供了将请求对象 Object 封装为 RemotingCommand 的方法
    • RpcClientRemoting
    • 实现了基于 url 链路的四种基本调用模式模板(内部实际调用基于 connection 链路模式,根据 url 调用建连接口创建 connection)
    • 提供了建连操作
    • RpcServerRemoting

    实现了基于 url 链路的四种基本调用模式模板(内部实际调用基于 connection 链路模式,但是不会根据 url 创建 connection,只会从连接管理器根据 url 获取连接 - 所以要想使用基于 url 链路的功能,必须开启服务端连接管理功能,而基于 addr 链路的方式底层又是转化为 url 链路方式,所以基于 addr 链路的功能,也必须开启服务端连接管理功能

    注意:

    • 客户端调用服务端会主动建连,服务端调用客户端不会主动建连
    • 服务端想使用基于 url 链路或者基于 addr 链路的调用功能,必须开启服务端连接管理功能(实际上还需要保存 addr 链路地址,通常通过 UserProcessor.handleRequest 中的 BizContext 来获取 remoteAddr 并存储在 UserProcessor 中)
    • 服务端如果没有开启服务端连接管理功能,只能通过 connection 链路进行调用,此时要保存好连接建立好的时候创建的 connection 对象(通常使用 ConnectionEventType.CONNECT 的连接事件处理器做这件事)

    一、使用姿势

    1.1、基于 addr 链路模式

    服务端

    public class MyServer {
        public static void main(String[] args) throws RemotingException, InterruptedException {
            RpcServer server= new RpcServer(8888);
            MyServerUserProcessor serverUserProcessor = new MyServerUserProcessor();
            server.registerUserProcessor(serverUserProcessor);
            // 打开服务端连接管理功能
            server.switches().turnOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH);
    
            if (server.start()) {
                System.out.println("server start success!");
                // 模拟去其他事情
                Thread.sleep(10000);
                MyRequest request = new MyRequest();
                request.setReq("hi, bolt-client");
                // 向 serverUserProcessor 存储的 RemoteAddr 发起请求
                MyResponse resp = (MyResponse)server.invokeSync(serverUserProcessor.getRemoteAddr(), request, 10000);
                System.out.println(resp.getResp());
            } else {
                System.out.println("server start fail!");
            }
        }
    }
    
    =========================== 服务端业务逻辑处理器 ===========================
    public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
        // 存储 client 端地址,用于发起远程调用
        private String remoteAddr;
    
        @Override
        public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
            remoteAddr = bizCtx.getRemoteAddress(); // 此处也可以存储 Connection:bizCtx.getConnection();
            MyResponse response = new MyResponse();
            if (request != null) {
                System.out.println(request);
                response.setResp("from server -> " + request.getReq());
            }
    
            return response;
        }
    
        @Override
        public String interest() {
            return MyRequest.class.getName();
        }
    
        public String getRemoteAddr() {
            return remoteAddr;
        }
    }
    

    客户端

    public class MyClient {
        private static RpcClient client;
        private static CountDownLatch latch = new CountDownLatch(1);
    
        public static void start() {
            client = new RpcClient();
            // 注册业务逻辑处理器
            client.registerUserProcessor(new MyClientUserProcessor());
            client.init();
        }
    
        public static void main(String[] args) throws RemotingException, InterruptedException {
            MyClient.start();
            MyRequest request = new MyRequest();
            request.setReq("hello, bolt-server");
            MyResponse response = (MyResponse) client.invokeSync("127.0.0.1:8888", request, 300 * 1000);
            System.out.println(response);
            latch.await();
        }
    }
    
    =========================== 客户端业务逻辑处理器 ===========================
    public class MyClientUserProcessor extends SyncUserProcessor<MyRequest> {
        @Override
        public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
            MyResponse response = new MyResponse();
            if (request != null) {
                System.out.println(request);
                response.setResp("from client -> " + request.getReq());
            }
            return response;
        }
    
        @Override
        public String interest() {
            return MyRequest.class.getName();
        }
    }
    

    1.2、基于 connection 链路模式

    服务端

    public class MyServer {
        public static void main(String[] args) throws RemotingException, InterruptedException {
            RpcServer server= new RpcServer(8888);
            MyServerUserProcessor serverUserProcessor = new MyServerUserProcessor();
            server.registerUserProcessor(serverUserProcessor);
            // 创建并注册 ConnectionEventType.CONNECT 连接事件处理器
            MyCONNECTEventProcessor connectEventProcessor = new MyCONNECTEventProcessor();
            server.addConnectionEventProcessor(ConnectionEventType.CONNECT, connectEventProcessor);
            if (server.start()) {
                System.out.println("server start success!");
                // 模拟去其他事情
                Thread.sleep(10000);
                MyRequest request = new MyRequest();
                request.setReq("hi, bolt-client");
                // 向 connectEventProcessor 存储的 connection 发起请求
                MyResponse resp = (MyResponse)server.invokeSync(connectEventProcessor.getConnection(), request, 10000);
                System.out.println(resp.getResp());
            } else {
                System.out.println("server start fail!");
            }
        }
    }
    
    =========================== 连接事件处理器 ===========================
    public class MyCONNECTEventProcessor implements ConnectionEventProcessor {
        // 存储连接,用于服务端向客户端发起远程通信
        private Connection connection;
    
        @Override
        public void onEvent(String remoteAddr, Connection conn) {
            this.connection = conn;
            System.out.println("hello, " + remoteAddr);
        }
    
        public Connection getConnection() {
            return connection;
        }
    }
    

    客户端

    同 1.1

    二、源码解析

    建连

    ======================================== RpcServer ========================================
        protected void doInit() {
            // 开启服务端连接管理功能
            if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                this.connectionEventHandler = new RpcConnectionEventHandler(switches());
                // 与客户端一样,创建连接管理器,并设置到 connectionEventHandler 中
                this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
                this.connectionEventHandler.setConnectionManager(this.connectionManager);
                this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
            } else {
                this.connectionEventHandler = new ConnectionEventHandler(switches());
                this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
            }
            initRpcRemoting();
            ...
            this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) {
                    ...
                    createConnection(channel);
                }
    
                private void createConnection(SocketChannel channel) {
                    Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
                    // 如果开启了连接管理功能,则新建 Connection 并加入 连接管理器;
                    // 如果没有开启,则直接新建 Connection
                    if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                        connectionManager.add(new Connection(channel, url), url.getUniqueKey());
                    } else {
                        new Connection(channel, url);
                    }
                    // 发布建连事件,此时进行连接的保存
                    channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
                }
            });
        }
    
    ======================================== DefaultConnectionManager ========================================
        public void add(Connection connection, String poolKey) {
            ConnectionPool pool = null;
            // 如果有 ConnectionPool 直接获取,没有就创建一个新的
            // 与客户端不同的是,该 add 方法只创建一个空的 ConnectionPool,不会初始化连接;而客户端会初始化连接
            // 连接是否初始化,取决于 ConnectionPoolCall 构造器参数
            pool = this.getConnectionPoolAndCreateIfAbsent(poolKey, new ConnectionPoolCall());
            // 连接加入连接池
            pool.add(connection);
        }
    
        private class ConnectionPoolCall implements Callable<ConnectionPool> {
            // 是否初始化连接
            private boolean whetherInitConnection;
            private Url     url;
    
            // 默认构造器:只创建 pool,不建连(服务端用的这个)
            public ConnectionPoolCall() {
                this.whetherInitConnection = false;
            }
    
            // 创建 pool + 建连(客户端用的这个)
            public ConnectionPoolCall(Url url) {
                this.whetherInitConnection = true;
                this.url = url;
            }
    
            @Override
            public ConnectionPool call() throws Exception {
                // 创建 pool
                final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
                if (whetherInitConnection) {
                    // 建连
                    doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
                }
                return pool;
            }
        }
    

    调用

    ======================================== RpcRemoting ========================================
        public Object invokeSync(String addr, Object request, InvokeContext invokeContext, int timeoutMillis) {
            // addr => url
            Url url = this.addressParser.parse(addr);
            // 以 url 链路方式进行调用
            return this.invokeSync(url, request, invokeContext, timeoutMillis);
        }
    
    ======================================== RpcServerRemoting ========================================
        public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
            // 直接从 connectionManager 获取连接(仅仅是获取,不建连)
            Connection conn = this.connectionManager.get(url.getUniqueKey());
            // 检查连接
            this.connectionManager.check(conn);
            // 以 connection 链路方式进行调用
            return this.invokeSync(conn, request, invokeContext, timeoutMillis);
        }
    
        public Connection DefaultConnectionManager#get(String poolKey) {
            // 获取连接池,如果没有返回null,否则从连接池获取一个 connection。整个过程只是单纯的获取,不做建连操作
            ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
            return null == pool ? null : pool.get();
        }
    ======================================== RpcClientRemoting ========================================
        public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
            // 获取连接,如果没有就建连
            final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
            // 检查连接
            this.connectionManager.check(conn);
            // 以 connection 链路方式进行调用
            return this.invokeSync(conn, request, invokeContext, timeoutMillis);
        }
    
    ======================================== RpcRemoting ========================================
        public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
            // 构造请求统一体
            RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
            // 预处理 invokeContext
            preProcessInvokeContext(invokeContext, requestCommand, conn);
            // 发起调用,返回响应统一体
            ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
            // 设置 invokeContext 到响应统一体
            responseCommand.setInvokeContext(invokeContext);
            // 从响应统一体解析出真正的响应消息
            Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand, RemotingUtil.parseRemoteAddress(conn.getChannel()));
            return responseObject;
        }
    
    ======================================== BaseRemoting ========================================
        protected RemotingCommand invokeSync(Connection conn, RemotingCommand request, int timeoutMillis) {
            // 创建 InvokeFuture
            final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
            // 将 InvokeFuture 添加到 Connection 的 InvokeFutureMap 映射中
            conn.addInvokeFuture(future);
            // 发起 netty 请求
            conn.getChannel().writeAndFlush(request);
            // 阻塞等待响应结果
            RemotingCommand response = future.waitResponse(timeoutMillis);
            // 被唤醒后返回响应结果
            return response;
        }
    

    相关文章

      网友评论

        本文标题:SOFABolt 源码分析15 - 双工通信机制的设计

        本文链接:https://www.haomeiwen.com/subject/vqaxaftx.html