美文网首页
SOFABolt入门使用

SOFABolt入门使用

作者: jackcooper | 来源:发表于2019-08-08 15:27 被阅读0次

    github:https://github.com/sofastack/sofa-bolt


    1. SOFABolt到底是啥?

    SOFABolt 是蚂蚁金融服务集团开发的一套基于 Netty 实现的网络通信框架。

    • 为了让 Java 程序员能将更多的精力放在基于网络通信的业务逻辑实现上,而不是过多的纠结于网络底层 NIO 的实现以及处理难以调试的网络问题,Netty 应运而生。
    • 为了让中间件开发者能将更多的精力放在产品功能特性实现上,而不是重复地一遍遍制造通信框架的轮子,SOFABolt 应运而生。

    Bolt 名字取自迪士尼动画-闪电狗,是一个基于 Netty 最佳实践的轻量、易用、高性能、易扩展的通信框架。 目前该产品已经运用在了蚂蚁中间件的微服务 (SOFARPC)、消息中心、分布式事务、分布式开关、以及配置中心等众多产品上。

    2. 功能介绍

    image.png

    SOFABolt 的基础功能包括:

    • 基础通信功能 ( remoting-core )
      • 基于 Netty 高效的网络 IO 与线程模型运用
      • 连接管理 (无锁建连,定时断链,自动重连)
      • 基础通信模型 ( oneway,sync,future,callback )
      • 超时控制
      • 批量解包与批量提交处理器
      • 心跳与 IDLE 事件处理
    • 协议框架 ( protocol-skeleton )
      • 命令与命令处理器
      • 编解码处理器
      • 心跳触发器
    • 私有协议定制实现 - RPC 通信协议 ( protocol-implementation )
      • RPC 通信协议的设计
      • 灵活的反序列化时机控制
      • 请求处理超时 FailFast 机制
      • 用户请求处理器 ( UserProcessor )
      • 双工通信

    将 SOFABolt 用作一个远程通信框架,使用者可以不用关心如何实现一个私有协议的细节,直接使用我们内置的 RPC 通信协议。可以非常简单的启动客户端与服务端,同时注册一个用户请求处理器,即可完成远程调用。同时,像连接管理、心跳等基础功能特性都默认可以使用。 当前支持的调用类型如下图所示


    image.png

    使用手册

    <dependency>
      <groupId>com.alipay.sofa</groupId>
      <artifactId>bolt</artifactId>
      <version>${version}</version>
    </dependency>
    

    基础功能

    1.1实现用户请求处理器 (UserProcessor)

    我们提供了两种用户请求处理器,SyncUserProcessor 与 AsyncUserProcessor。 二者的区别在于,前者需要在当前处理线程以return返回值的形式返回处理结果;而后者,有一个 AsyncContext 存根,可以在当前线程,也可以在异步线程,调用 sendResponse 方法返回处理结果。示例可参考如下两个类:

    1.2 实现连接事件处理器 (ConnectionEventProcessor)

    我们提供了两种事件监听,建连事件(ConnectionEventType.CONNECT)与断连事件(ConnectionEventType.CLOSE),用户可以创建自己的事件处理器,并注册到客户端或者服务端。客户端与服务端,都可以监听到各自的建连与断连事件。

    1.3客户端与服务端初始化 (RpcClient,RpcServer)

    我们提供了一个 RpcClient 与 RpcServer,经过简单的必要功能初始化,或者功能开关,即可使用。一个最简单的例子如下:

    public class RpcClientDemoByMain {
        static Logger             logger                    = LoggerFactory
                                                                .getLogger(RpcClientDemoByMain.class);
    
        static RpcClient          client;
    
        static String             addr                      = "127.0.0.1:8999";
    
        SimpleClientUserProcessor clientUserProcessor       = new SimpleClientUserProcessor();
        CONNECTEventProcessor     clientConnectProcessor    = new CONNECTEventProcessor();
        DISCONNECTEventProcessor  clientDisConnectProcessor = new DISCONNECTEventProcessor();
    
        public RpcClientDemoByMain() {
            // 1. create a rpc client
            client = new RpcClient();
            // 2. add processor for connect and close event if you need
            client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
            client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
            // 3. do init
            client.init();
        }
    
        public static void main(String[] args) {
            new RpcClientDemoByMain();
            RequestBody req = new RequestBody(2, "hello world sync");
            try {
                String res = (String) client.invokeSync(addr, req, 3000);
                System.out.println("invoke sync result = [" + res + "]");
            } catch (RemotingException e) {
                String errMsg = "RemotingException caught in oneway!";
                logger.error(errMsg, e);
                Assert.fail(errMsg);
            } catch (InterruptedException e) {
                logger.error("interrupted!");
            }
            client.shutdown();
        }
    }
    
    public class RpcServerDemoByMain {
        static Logger             logger                    = LoggerFactory
                                                                .getLogger(RpcServerDemoByMain.class);
    
        BoltServer                server;
    
        int                       port                      = 8999;
    
        SimpleServerUserProcessor serverUserProcessor       = new SimpleServerUserProcessor();
        CONNECTEventProcessor     serverConnectProcessor    = new CONNECTEventProcessor();
        DISCONNECTEventProcessor  serverDisConnectProcessor = new DISCONNECTEventProcessor();
    
        public RpcServerDemoByMain() {
            // 1. create a Rpc server with port assigned
            server = new BoltServer(port);
            // 2. add processor for connect and close event if you need
            server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
            server.addConnectionEventProcessor(ConnectionEventType.CLOSE, serverDisConnectProcessor);
            // 3. register user processor for client request
            server.registerUserProcessor(serverUserProcessor);
            // 4. server start
            if (server.start()) {
                System.out.println("server start ok!");
            } else {
                System.out.println("server start failed!");
            }
            // server.getRpcServer().stop();
        }
    
        public static void main(String[] args) {
            new RpcServerDemoByMain();
        }
    }
    
    

    1.4 基础通信模型

    我们提供了四种通信模型:

    • Oneway 调用
      • 当前线程发起调用后,不关心调用结果,不做超时控制,只要请求已经发出,就完成本次调用。注意 Oneway 调用不保证成功,而且发起方无法知道调用结果。因此通常用于可以重试,或者定时通知类的场景,调用过程是有可能因为网络问题,机器故障等原因,导致请求失败。业务场景需要能接受这样的异常场景,才可以使用。
      • 示例
    • Sync 同步调用
      • 当前线程发起调用后,需要在指定的超时时间内,等到响应结果,才能完成本次调用。如果超时时间内没有得到结果,那么会抛出超时异常。这种调用模式最常用。注意要根据对端的处理能力,合理设置超时时间。
      • 示例
    • Future调用
      • 当前线程发起调用,得到一个 RpcResponseFuture 对象,当前线程可以继续执行下一次调用。可以在任意时刻,使用 RpcResponseFuture 对象的 get() 方法来获取结果,如果响应已经回来,此时就马上得到结果;如果响应没有回来,则会阻塞住当前线程,直到响应回来,或者超时时间到。
      • 示例
    • Callback异步调用
      • 当前线程发起调用,则本次调用马上结束,可以马上执行下一次调用。发起调用时需要注册一个回调,该回调需要分配一个异步线程池。待响应回来后,会在回调的异步线程池,来执行回调逻辑。
      • 示例
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.alipay.remoting.demo;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    import org.junit.After;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.alipay.remoting.Connection;
    import com.alipay.remoting.ConnectionEventType;
    import com.alipay.remoting.InvokeCallback;
    import com.alipay.remoting.exception.RemotingException;
    import com.alipay.remoting.rpc.RpcClient;
    import com.alipay.remoting.rpc.RpcResponseFuture;
    import com.alipay.remoting.rpc.common.BoltServer;
    import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
    import com.alipay.remoting.rpc.common.DISCONNECTEventProcessor;
    import com.alipay.remoting.rpc.common.PortScan;
    import com.alipay.remoting.rpc.common.RequestBody;
    import com.alipay.remoting.rpc.common.SimpleClientUserProcessor;
    import com.alipay.remoting.rpc.common.SimpleServerUserProcessor;
    import com.alipay.remoting.util.RemotingUtil;
    
    /**
     * basic usage demo
     *
     * basic usage of rpc client and rpc server
     *
     * @author xiaomin.cxm
     * @version $Id: BasicUsageDemo.java, v 0.1 Apr 6, 2016 8:58:36 PM xiaomin.cxm Exp $
     */
    public class BasicUsageDemoByJunit {
        static Logger             logger                    = LoggerFactory
                                                                .getLogger(BasicUsageDemoByJunit.class);
    
        BoltServer                server;
        RpcClient                 client;
    
        int                       port                      = PortScan.select();
        String                    ip                        = "127.0.0.1";
        String                    addr                      = "127.0.0.1:" + port;
    
        int                       invokeTimes               = 5;
    
        SimpleServerUserProcessor serverUserProcessor       = new SimpleServerUserProcessor();
        SimpleClientUserProcessor clientUserProcessor       = new SimpleClientUserProcessor();
        CONNECTEventProcessor     clientConnectProcessor    = new CONNECTEventProcessor();
        CONNECTEventProcessor     serverConnectProcessor    = new CONNECTEventProcessor();
        DISCONNECTEventProcessor  clientDisConnectProcessor = new DISCONNECTEventProcessor();
        DISCONNECTEventProcessor  serverDisConnectProcessor = new DISCONNECTEventProcessor();
    
        @Before
        public void init() {
            server = new BoltServer(port, true);
            server.start();
            server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
            server.addConnectionEventProcessor(ConnectionEventType.CLOSE, serverDisConnectProcessor);
            server.registerUserProcessor(serverUserProcessor);
    
            client = new RpcClient();
            client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
            client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
            client.registerUserProcessor(clientUserProcessor);
            client.init();
        }
    
        @After
        public void stop() {
            try {
                server.stop();
                Thread.sleep(100);
            } catch (InterruptedException e) {
                logger.error("Stop server failed!", e);
            }
        }
    
        /**
         * <pre>
         *     当前线程发起调用后,不关心调用结果,不做超时控制,只要请求已经发出,就完成本次调用。
         *     注意 Oneway 调用不保证成功,而且发起方无法知道调用结果。因此通常用于可以重试,或者
         *     定时通知类的场景,调用过程是有可能因为网络问题,机器故障等原因,导致请求失败。业务
         *     场景需要能接受这样的异常场景,才可以使用
         * </pre>
         * @throws InterruptedException
         */
        @Test
        public void testOneway() throws InterruptedException {
            RequestBody req = new RequestBody(2, "hello world oneway");
            for (int i = 0; i < invokeTimes; i++) {
                try {
                    client.oneway(addr, req);
                    Thread.sleep(100);
                } catch (RemotingException e) {
                    String errMsg = "RemotingException caught in oneway!";
                    logger.error(errMsg, e);
                    Assert.fail(errMsg);
                }
            }
    
            Assert.assertTrue(serverConnectProcessor.isConnected());
            Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
            Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
        }
    
        /**
         * 同步调用
         * <pre>
         *     当前线程发起调用后,需要在指定的超时时间内,等到响应结果,才能完成本次调用
         *     。如果超时时间内没有得到结果,那么会抛出超时异常。这种调用模式最常用。注意
         *     要根据对端的处理能力,合理设置超时时间。
         * </pre>
         * @throws InterruptedException
         */
        @Test
        public void testSync() throws InterruptedException {
            RequestBody req = new RequestBody(1, "hello world sync");
            for (int i = 0; i < invokeTimes; i++) {
                try {
                    String res = (String) client.invokeSync(addr, req, 3000);
                    logger.warn("Result received in sync: " + res);
                    Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, res);
                } catch (RemotingException e) {
                    String errMsg = "RemotingException caught in sync!";
                    logger.error(errMsg, e);
                    Assert.fail(errMsg);
                } catch (InterruptedException e) {
                    String errMsg = "InterruptedException caught in sync!";
                    logger.error(errMsg, e);
                    Assert.fail(errMsg);
                }
            }
    
            Assert.assertTrue(serverConnectProcessor.isConnected());
            Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
            Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
        }
    
        /**
         * Future调用
         * <pre>
         *     当前线程发起调用,得到一个 RpcResponseFuture 对象,当前线程可以继续执行下一
         *     次调用。可以在任意时刻,使用 RpcResponseFuture 对象的 get() 方法来获取结果,
         *     如果响应已经回来,此时就马上得到结果;如果响应没有回来,则会阻塞住当前线程,直
         *     到响应回来,或者超时时间到
         * </pre>
         * @throws InterruptedException
         */
        @Test
        public void testFuture() throws InterruptedException {
            RequestBody req = new RequestBody(2, "hello world future");
            for (int i = 0; i < invokeTimes; i++) {
                try {
                    RpcResponseFuture future = client.invokeWithFuture(addr, req, 3000);
                    String res = (String) future.get();
                    Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, res);
                } catch (RemotingException e) {
                    String errMsg = "RemotingException caught in future!";
                    logger.error(errMsg, e);
                    Assert.fail(errMsg);
                } catch (InterruptedException e) {
                    String errMsg = "InterruptedException caught in future!";
                    logger.error(errMsg, e);
                    Assert.fail(errMsg);
                }
            }
    
            Assert.assertTrue(serverConnectProcessor.isConnected());
            Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
            Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
        }
    
        /**
         * Callback异步调用
         * <pre>
         *     当前线程发起调用,则本次调用马上结束,可以马上执行下一次调用。发起调用时需要注册
         *     一个回调,该回调需要分配一个异步线程池。待响应回来后,会在回调的异步线程池,来执
         *     行回调逻辑
         * </pre>
         * @throws InterruptedException
         */
        @Test
        public void testCallback() throws InterruptedException {
            RequestBody req = new RequestBody(1, "hello world callback");
            final List<String> rets = new ArrayList<String>(1);
            for (int i = 0; i < invokeTimes; i++) {
                final CountDownLatch latch = new CountDownLatch(1);
                try {
                    client.invokeWithCallback(addr, req, new InvokeCallback() {
                        Executor executor = Executors.newCachedThreadPool();
    
                        @Override
                        public void onResponse(Object result) {
                            logger.warn("Result received in callback: " + result);
                            rets.add((String) result);
                            latch.countDown();
                        }
    
                        @Override
                        public void onException(Throwable e) {
                            logger.error("Process exception in callback.", e);
                            latch.countDown();
                        }
    
                        @Override
                        public Executor getExecutor() {
                            return executor;
                        }
    
                    }, 1000);
    
                } catch (RemotingException e) {
                    latch.countDown();
                    String errMsg = "RemotingException caught in callback!";
                    logger.error(errMsg, e);
                    Assert.fail(errMsg);
                }
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    String errMsg = "InterruptedException caught in callback!";
                    logger.error(errMsg, e);
                    Assert.fail(errMsg);
                }
                if (rets.size() == 0) {
                    Assert.fail("No result! Maybe exception caught!");
                }
                Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, rets.get(0));
                rets.clear();
            }
    
            Assert.assertTrue(serverConnectProcessor.isConnected());
            Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
            Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
        }
    
        @Test
        public void testServerSyncUsingConnection1() throws Exception {
            for (int i = 0; i < invokeTimes; i++) {
                RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
                String serverres = (String) client.invokeSync(addr, req1, 1000);
                Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);
    
                Assert.assertNotNull(serverConnectProcessor.getConnection());
                Connection serverConn = serverConnectProcessor.getConnection();
                RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
                String clientres = (String) server.getRpcServer().invokeSync(serverConn, req, 1000);
                Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
            }
    
            Assert.assertTrue(serverConnectProcessor.isConnected());
            Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
            Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
        }
    
        /**
         * 示例1:使用 Connection 对象的双工通信
         * <pre>
         *     注意使用 Connection 对象的双工通信,服务端需要通过事件监听处理器或者用户请求处理器,自己保存好 Connection 对象
         * </pre>
         * @throws Exception
         */
        @Test
        public void testServerSyncUsingConnection() throws Exception {
            Connection clientConn = client.createStandaloneConnection(ip, port, 1000);
    
            for (int i = 0; i < invokeTimes; i++) {
                RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
                String serverres = (String) client.invokeSync(clientConn, req1, 1000);
                Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);
    
                Assert.assertNotNull(serverConnectProcessor.getConnection());
                Connection serverConn = serverConnectProcessor.getConnection();
                RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
                String clientres = (String) server.getRpcServer().invokeSync(serverConn, req, 1000);
                Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
            }
    
            Assert.assertTrue(serverConnectProcessor.isConnected());
            Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
            Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
        }
    
        /**
         * 示例2:使用 Address 的双工通信
         * <pre>
         *     注意使用 Address 方式的双工通信,需要在初始化 RpcServer 时,打开 manageConnection 开关,表示服务端会根据客户端发起的建连,维护一份地址与连接的映射关系。默认不需要双工通信的时候,这个功能是关闭的
         * </pre>
         * @throws Exception
         */
        @Test
        public void testServerSyncUsingAddress() throws Exception {
            Connection clientConn = client.createStandaloneConnection(ip, port, 1000);
            String remote = RemotingUtil.parseRemoteAddress(clientConn.getChannel());
            String local = RemotingUtil.parseLocalAddress(clientConn.getChannel());
            logger.warn("Client say local:" + local);
            logger.warn("Client say remote:" + remote);
    
            for (int i = 0; i < invokeTimes; i++) {
                RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
                String serverres = (String) client.invokeSync(clientConn, req1, 1000);
                Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);
    
                Assert.assertNotNull(serverConnectProcessor.getConnection());
                // only when client invoked, the remote address can be get by UserProcessor
                // otherwise, please use ConnectionEventProcessor
                String remoteAddr = serverUserProcessor.getRemoteAddr();
                RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
                String clientres = (String) server.getRpcServer().invokeSync(remoteAddr, req, 1000);
                Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
            }
    
            Assert.assertTrue(serverConnectProcessor.isConnected());
            Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
            Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
        }
    }
    
    

    1.6双工通信

    除了服务端可以注册用户请求处理器,我们的客户端也可以注册用户请求处理器。此时,服务端就可以发起对客户端的调用,也可以使用 1.4 提到了任何一种通信模型。

    • 示例1:使用 Connection 对象的双工通信,注意使用 Connection 对象的双工通信,服务端需要通过事件监听处理器或者用户请求处理器,自己保存好 Connection 对象。
    • 示例2:使用 Address 的双工通信,注意使用 Address 方式的双工通信,需要在初始化 RpcServer 时,打开 manageConnection 开关,表示服务端会根据客户端发起的建连,维护一份地址与连接的映射关系。默认不需要双工通信的时候,这个功能是关闭的。

    相关文章

      网友评论

          本文标题:SOFABolt入门使用

          本文链接:https://www.haomeiwen.com/subject/wqyxjctx.html