github:https://github.com/sofastack/sofa-bolt
1. SOFABolt到底是啥?
SOFABolt 是蚂蚁金融服务集团开发的一套基于 Netty 实现的网络通信框架。
- 为了让 Java 程序员能将更多的精力放在基于网络通信的业务逻辑实现上,而不是过多的纠结于网络底层 NIO 的实现以及处理难以调试的网络问题,Netty 应运而生。
- 为了让中间件开发者能将更多的精力放在产品功能特性实现上,而不是重复地一遍遍制造通信框架的轮子,SOFABolt 应运而生。
Bolt 名字取自迪士尼动画-闪电狗,是一个基于 Netty 最佳实践的轻量、易用、高性能、易扩展的通信框架。 目前该产品已经运用在了蚂蚁中间件的微服务 (SOFARPC)、消息中心、分布式事务、分布式开关、以及配置中心等众多产品上。
2. 功能介绍
image.pngSOFABolt 的基础功能包括:
- 基础通信功能 ( remoting-core )
- 基于 Netty 高效的网络 IO 与线程模型运用
- 连接管理 (无锁建连,定时断链,自动重连)
- 基础通信模型 ( oneway,sync,future,callback )
- 超时控制
- 批量解包与批量提交处理器
- 心跳与 IDLE 事件处理
- 协议框架 ( protocol-skeleton )
- 命令与命令处理器
- 编解码处理器
- 心跳触发器
- 私有协议定制实现 - RPC 通信协议 ( protocol-implementation )
- RPC 通信协议的设计
- 灵活的反序列化时机控制
- 请求处理超时 FailFast 机制
- 用户请求处理器 ( UserProcessor )
- 双工通信
将 SOFABolt 用作一个远程通信框架,使用者可以不用关心如何实现一个私有协议的细节,直接使用我们内置的 RPC 通信协议。可以非常简单的启动客户端与服务端,同时注册一个用户请求处理器,即可完成远程调用。同时,像连接管理、心跳等基础功能特性都默认可以使用。 当前支持的调用类型如下图所示
image.png
使用手册
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>${version}</version>
</dependency>
基础功能
1.1实现用户请求处理器 (UserProcessor)
我们提供了两种用户请求处理器,SyncUserProcessor 与 AsyncUserProcessor。 二者的区别在于,前者需要在当前处理线程以return返回值的形式返回处理结果;而后者,有一个 AsyncContext 存根,可以在当前线程,也可以在异步线程,调用 sendResponse 方法返回处理结果。示例可参考如下两个类:
1.2 实现连接事件处理器 (ConnectionEventProcessor)
我们提供了两种事件监听,建连事件(ConnectionEventType.CONNECT)与断连事件(ConnectionEventType.CLOSE),用户可以创建自己的事件处理器,并注册到客户端或者服务端。客户端与服务端,都可以监听到各自的建连与断连事件。
1.3客户端与服务端初始化 (RpcClient,RpcServer)
我们提供了一个 RpcClient 与 RpcServer,经过简单的必要功能初始化,或者功能开关,即可使用。一个最简单的例子如下:
public class RpcClientDemoByMain {
static Logger logger = LoggerFactory
.getLogger(RpcClientDemoByMain.class);
static RpcClient client;
static String addr = "127.0.0.1:8999";
SimpleClientUserProcessor clientUserProcessor = new SimpleClientUserProcessor();
CONNECTEventProcessor clientConnectProcessor = new CONNECTEventProcessor();
DISCONNECTEventProcessor clientDisConnectProcessor = new DISCONNECTEventProcessor();
public RpcClientDemoByMain() {
// 1. create a rpc client
client = new RpcClient();
// 2. add processor for connect and close event if you need
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
// 3. do init
client.init();
}
public static void main(String[] args) {
new RpcClientDemoByMain();
RequestBody req = new RequestBody(2, "hello world sync");
try {
String res = (String) client.invokeSync(addr, req, 3000);
System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
String errMsg = "RemotingException caught in oneway!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
logger.error("interrupted!");
}
client.shutdown();
}
}
public class RpcServerDemoByMain {
static Logger logger = LoggerFactory
.getLogger(RpcServerDemoByMain.class);
BoltServer server;
int port = 8999;
SimpleServerUserProcessor serverUserProcessor = new SimpleServerUserProcessor();
CONNECTEventProcessor serverConnectProcessor = new CONNECTEventProcessor();
DISCONNECTEventProcessor serverDisConnectProcessor = new DISCONNECTEventProcessor();
public RpcServerDemoByMain() {
// 1. create a Rpc server with port assigned
server = new BoltServer(port);
// 2. add processor for connect and close event if you need
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
server.addConnectionEventProcessor(ConnectionEventType.CLOSE, serverDisConnectProcessor);
// 3. register user processor for client request
server.registerUserProcessor(serverUserProcessor);
// 4. server start
if (server.start()) {
System.out.println("server start ok!");
} else {
System.out.println("server start failed!");
}
// server.getRpcServer().stop();
}
public static void main(String[] args) {
new RpcServerDemoByMain();
}
}
1.4 基础通信模型
我们提供了四种通信模型:
- Oneway 调用
- 当前线程发起调用后,不关心调用结果,不做超时控制,只要请求已经发出,就完成本次调用。注意 Oneway 调用不保证成功,而且发起方无法知道调用结果。因此通常用于可以重试,或者定时通知类的场景,调用过程是有可能因为网络问题,机器故障等原因,导致请求失败。业务场景需要能接受这样的异常场景,才可以使用。
- 示例
- Sync 同步调用
- 当前线程发起调用后,需要在指定的超时时间内,等到响应结果,才能完成本次调用。如果超时时间内没有得到结果,那么会抛出超时异常。这种调用模式最常用。注意要根据对端的处理能力,合理设置超时时间。
- 示例
- Future调用
- 当前线程发起调用,得到一个 RpcResponseFuture 对象,当前线程可以继续执行下一次调用。可以在任意时刻,使用 RpcResponseFuture 对象的 get() 方法来获取结果,如果响应已经回来,此时就马上得到结果;如果响应没有回来,则会阻塞住当前线程,直到响应回来,或者超时时间到。
- 示例
- Callback异步调用
- 当前线程发起调用,则本次调用马上结束,可以马上执行下一次调用。发起调用时需要注册一个回调,该回调需要分配一个异步线程池。待响应回来后,会在回调的异步线程池,来执行回调逻辑。
- 示例
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.remoting.demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.InvokeCallback;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.remoting.rpc.RpcResponseFuture;
import com.alipay.remoting.rpc.common.BoltServer;
import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
import com.alipay.remoting.rpc.common.DISCONNECTEventProcessor;
import com.alipay.remoting.rpc.common.PortScan;
import com.alipay.remoting.rpc.common.RequestBody;
import com.alipay.remoting.rpc.common.SimpleClientUserProcessor;
import com.alipay.remoting.rpc.common.SimpleServerUserProcessor;
import com.alipay.remoting.util.RemotingUtil;
/**
* basic usage demo
*
* basic usage of rpc client and rpc server
*
* @author xiaomin.cxm
* @version $Id: BasicUsageDemo.java, v 0.1 Apr 6, 2016 8:58:36 PM xiaomin.cxm Exp $
*/
public class BasicUsageDemoByJunit {
static Logger logger = LoggerFactory
.getLogger(BasicUsageDemoByJunit.class);
BoltServer server;
RpcClient client;
int port = PortScan.select();
String ip = "127.0.0.1";
String addr = "127.0.0.1:" + port;
int invokeTimes = 5;
SimpleServerUserProcessor serverUserProcessor = new SimpleServerUserProcessor();
SimpleClientUserProcessor clientUserProcessor = new SimpleClientUserProcessor();
CONNECTEventProcessor clientConnectProcessor = new CONNECTEventProcessor();
CONNECTEventProcessor serverConnectProcessor = new CONNECTEventProcessor();
DISCONNECTEventProcessor clientDisConnectProcessor = new DISCONNECTEventProcessor();
DISCONNECTEventProcessor serverDisConnectProcessor = new DISCONNECTEventProcessor();
@Before
public void init() {
server = new BoltServer(port, true);
server.start();
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
server.addConnectionEventProcessor(ConnectionEventType.CLOSE, serverDisConnectProcessor);
server.registerUserProcessor(serverUserProcessor);
client = new RpcClient();
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
client.registerUserProcessor(clientUserProcessor);
client.init();
}
@After
public void stop() {
try {
server.stop();
Thread.sleep(100);
} catch (InterruptedException e) {
logger.error("Stop server failed!", e);
}
}
/**
* <pre>
* 当前线程发起调用后,不关心调用结果,不做超时控制,只要请求已经发出,就完成本次调用。
* 注意 Oneway 调用不保证成功,而且发起方无法知道调用结果。因此通常用于可以重试,或者
* 定时通知类的场景,调用过程是有可能因为网络问题,机器故障等原因,导致请求失败。业务
* 场景需要能接受这样的异常场景,才可以使用
* </pre>
* @throws InterruptedException
*/
@Test
public void testOneway() throws InterruptedException {
RequestBody req = new RequestBody(2, "hello world oneway");
for (int i = 0; i < invokeTimes; i++) {
try {
client.oneway(addr, req);
Thread.sleep(100);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in oneway!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
/**
* 同步调用
* <pre>
* 当前线程发起调用后,需要在指定的超时时间内,等到响应结果,才能完成本次调用
* 。如果超时时间内没有得到结果,那么会抛出超时异常。这种调用模式最常用。注意
* 要根据对端的处理能力,合理设置超时时间。
* </pre>
* @throws InterruptedException
*/
@Test
public void testSync() throws InterruptedException {
RequestBody req = new RequestBody(1, "hello world sync");
for (int i = 0; i < invokeTimes; i++) {
try {
String res = (String) client.invokeSync(addr, req, 3000);
logger.warn("Result received in sync: " + res);
Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, res);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in sync!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
String errMsg = "InterruptedException caught in sync!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
/**
* Future调用
* <pre>
* 当前线程发起调用,得到一个 RpcResponseFuture 对象,当前线程可以继续执行下一
* 次调用。可以在任意时刻,使用 RpcResponseFuture 对象的 get() 方法来获取结果,
* 如果响应已经回来,此时就马上得到结果;如果响应没有回来,则会阻塞住当前线程,直
* 到响应回来,或者超时时间到
* </pre>
* @throws InterruptedException
*/
@Test
public void testFuture() throws InterruptedException {
RequestBody req = new RequestBody(2, "hello world future");
for (int i = 0; i < invokeTimes; i++) {
try {
RpcResponseFuture future = client.invokeWithFuture(addr, req, 3000);
String res = (String) future.get();
Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, res);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in future!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
String errMsg = "InterruptedException caught in future!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
/**
* Callback异步调用
* <pre>
* 当前线程发起调用,则本次调用马上结束,可以马上执行下一次调用。发起调用时需要注册
* 一个回调,该回调需要分配一个异步线程池。待响应回来后,会在回调的异步线程池,来执
* 行回调逻辑
* </pre>
* @throws InterruptedException
*/
@Test
public void testCallback() throws InterruptedException {
RequestBody req = new RequestBody(1, "hello world callback");
final List<String> rets = new ArrayList<String>(1);
for (int i = 0; i < invokeTimes; i++) {
final CountDownLatch latch = new CountDownLatch(1);
try {
client.invokeWithCallback(addr, req, new InvokeCallback() {
Executor executor = Executors.newCachedThreadPool();
@Override
public void onResponse(Object result) {
logger.warn("Result received in callback: " + result);
rets.add((String) result);
latch.countDown();
}
@Override
public void onException(Throwable e) {
logger.error("Process exception in callback.", e);
latch.countDown();
}
@Override
public Executor getExecutor() {
return executor;
}
}, 1000);
} catch (RemotingException e) {
latch.countDown();
String errMsg = "RemotingException caught in callback!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
try {
latch.await();
} catch (InterruptedException e) {
String errMsg = "InterruptedException caught in callback!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
if (rets.size() == 0) {
Assert.fail("No result! Maybe exception caught!");
}
Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, rets.get(0));
rets.clear();
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
@Test
public void testServerSyncUsingConnection1() throws Exception {
for (int i = 0; i < invokeTimes; i++) {
RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
String serverres = (String) client.invokeSync(addr, req1, 1000);
Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);
Assert.assertNotNull(serverConnectProcessor.getConnection());
Connection serverConn = serverConnectProcessor.getConnection();
RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
String clientres = (String) server.getRpcServer().invokeSync(serverConn, req, 1000);
Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
/**
* 示例1:使用 Connection 对象的双工通信
* <pre>
* 注意使用 Connection 对象的双工通信,服务端需要通过事件监听处理器或者用户请求处理器,自己保存好 Connection 对象
* </pre>
* @throws Exception
*/
@Test
public void testServerSyncUsingConnection() throws Exception {
Connection clientConn = client.createStandaloneConnection(ip, port, 1000);
for (int i = 0; i < invokeTimes; i++) {
RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
String serverres = (String) client.invokeSync(clientConn, req1, 1000);
Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);
Assert.assertNotNull(serverConnectProcessor.getConnection());
Connection serverConn = serverConnectProcessor.getConnection();
RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
String clientres = (String) server.getRpcServer().invokeSync(serverConn, req, 1000);
Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
/**
* 示例2:使用 Address 的双工通信
* <pre>
* 注意使用 Address 方式的双工通信,需要在初始化 RpcServer 时,打开 manageConnection 开关,表示服务端会根据客户端发起的建连,维护一份地址与连接的映射关系。默认不需要双工通信的时候,这个功能是关闭的
* </pre>
* @throws Exception
*/
@Test
public void testServerSyncUsingAddress() throws Exception {
Connection clientConn = client.createStandaloneConnection(ip, port, 1000);
String remote = RemotingUtil.parseRemoteAddress(clientConn.getChannel());
String local = RemotingUtil.parseLocalAddress(clientConn.getChannel());
logger.warn("Client say local:" + local);
logger.warn("Client say remote:" + remote);
for (int i = 0; i < invokeTimes; i++) {
RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
String serverres = (String) client.invokeSync(clientConn, req1, 1000);
Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);
Assert.assertNotNull(serverConnectProcessor.getConnection());
// only when client invoked, the remote address can be get by UserProcessor
// otherwise, please use ConnectionEventProcessor
String remoteAddr = serverUserProcessor.getRemoteAddr();
RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
String clientres = (String) server.getRpcServer().invokeSync(remoteAddr, req, 1000);
Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
}
1.6双工通信
除了服务端可以注册用户请求处理器,我们的客户端也可以注册用户请求处理器。此时,服务端就可以发起对客户端的调用,也可以使用 1.4 提到了任何一种通信模型。
- 示例1:使用 Connection 对象的双工通信,注意使用 Connection 对象的双工通信,服务端需要通过事件监听处理器或者用户请求处理器,自己保存好 Connection 对象。
- 示例2:使用 Address 的双工通信,注意使用 Address 方式的双工通信,需要在初始化 RpcServer 时,打开 manageConnection 开关,表示服务端会根据客户端发起的建连,维护一份地址与连接的映射关系。默认不需要双工通信的时候,这个功能是关闭的。
网友评论