前言:接上篇,看完了注册中心,该看看RPC框架了——《分布式服务框架XXL-RPC》
老样子,想看看它自己怎么吹的
1.1 概述>
XXL-RPC 是一个分布式服务框架,提供稳定高性能的RPC远程服务调用功能。拥有"高性能、分布式、注册中心、负载均衡、服务治理"等特性。现已开放源代码,开箱即用。>
1.2 特性>
- 1、快速接入:接入步骤非常简洁,两分钟即可上手;
- 2、服务透明:系统完整的封装了底层通信细节,开发时调用远程服务就像调用本地服务,在提供远程调用能力时不损失本地调用的语义简洁性;
- 3、多调用方案:支持 SYNC、ONEWAY、FUTURE、CALLBACK 等方案;
- 4、多通讯方案:支持 TCP 和 HTTP 两种通讯方式进行服务调用;其中 TCP 提供可选方案 NETTY 或 MINA ,HTTP 提供可选方案 NETTY_HTTP 或 Jetty;
- 5、多序列化方案:支持 HESSIAN、HESSIAN1、PROTOSTUFF、KRYO、JACKSON 等方案;
- 6、负载均衡/软负载:提供丰富的负载均衡策略,包括:轮询、随机、LRU、LFU、一致性HASH等;
- 7、注册中心:可选组件,支持服务注册并动态发现;可选择不启用,直接指定服务提供方机器地址通讯;选择启用时,内置可选方案:“XXL-REGISTRY 轻量级注册中心”(推荐)、“ZK注册中心”、“Local注册中心”等;
- 8、服务治理:提供服务治理中心,可在线管理注册的服务信息,如服务锁定、禁用等;
- 9、服务监控:可在线监控服务调用统计信息以及服务健康状况等(计划中);
- 10、容错:服务提供方集群注册时,某个服务节点不可用时将会自动摘除,同时消费方将会移除失效节点将流量分发到其余节点,提高系统容错能力。
- 11、解决1+1问题:传统分布式通讯一般通过nginx或f5做集群服务的流量负载均衡,每次请求在到达目标服务机器之前都需要经过负载均衡机器,即1+1,这将会把流量放大一倍。而XXL-RPC将会从消费方直达服务提供方,每次请求直达目标机器,从而可以避免上述问题;
- 12、高兼容性:得益于优良的兼容性与模块化设计,不限制外部框架;除 spring/springboot 环境之外,理论上支持运行在任何Java代码中,甚至main方法直接启动运行;
- 13、泛化调用:服务调用方不依赖服务方提供的API;
还是老套路,直接代码下下来,跑个demo看看(ps:注册中心它自己推荐的XXL-REGISTRY)
先看看provider的代码。provider提供了一个简单的sayHi方法,代码如下。
@XxlRpcService
@Service
public class DemoServiceImpl implements DemoService {
private static Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
@Override
public UserDTO sayHi(String name) {
String word = MessageFormat.format("Hi {0}, from {1} as {2}",
name, DemoServiceImpl.class.getName(), String.valueOf(System.currentTimeMillis()));
if ("error".equalsIgnoreCase(name)) throw new RuntimeException("test exception.");
UserDTO userDTO = new UserDTO(name, word);
logger.info(userDTO.toString());
return userDTO;
}
}
结着按照说明稍微配置一下注册中心,provider走你!
成功启动后可以在注册中心看到刚刚启动的provider
注册中心
点开编辑看看
编辑
很好,看起来非常美好,注册key就是接口,注册信息就是provider的url,看起来非常ok。
那么我们再把consumer跑起来看看,配置同一个注册中心,走你!
控制台表示成功运行起来了。那么我们看看怎么测试一下远程调用。
翻一翻consumer代码,看到一个controller,然后在controller里面会调用远程方法。
@Controller
public class IndexController {
@XxlRpcReference
private DemoService demoService;
@RequestMapping("")
@ResponseBody
public UserDTO http(String name) {
try {
return demoService.sayHi(name);
} catch (Exception e) {
e.printStackTrace();
return new UserDTO(null, e.getMessage());
}
}
}
ok,让我们打开浏览器试一试。和预期的一样,得到了provider的response
浏览器直接调用
这样一个最简单的远程调用就完成了。
那么接下来,就该看看这些功能是怎么实现的了。先帖个框架自己对rpc的描述
rpc原理4.4 RPC工作原理剖析
概念
1、serialization:序列化,通讯数据需要经过序列化,从而支持在网络中传输;
2、deserialization:反序列化,服务接受到序列化的请求数据,需要序列化为底层原始数据;
3、stub:体现在XXL-RPC为服务的api接口;
4、skeleton:体现在XXL-RPC为服务的实现api接口的具体服务;
5、proxy:根据远程服务的stub生成的代理服务,对开发人员透明;
6、provider:远程服务的提供方;
7、consumer:远程服务的消费方;
RPC通讯,可大致划分为四个步骤,可参考上图进行理解:(XXL-RPC提供了多种调用方案,此处以 “SYNC” 方案为例讲解;)>
1、consumer发起请求:consumer会根据远程服务的stub实例化远程服务的代理服务,在发起请求时,代理服务会封装本次请求相关底层数据,如服务iface、methos、params等等,然后将数据经过serialization之后发送给provider;
2、provider接收请求:provider接收到请求数据,首先会deserialization获取原始请求数据,然后根据stub匹配目标服务并调用;
3、provider响应请求:provider在调用目标服务后,封装服务返回数据并进行serialization,然后把数据传输给consumer;
4、consumer接收响应:consumer接受到相应数据后,首先会deserialization获取原始数据,然后根据stub生成调用返回结果,返回给请求调用处。结束。
其实已经讲得比较清楚,在官方提供的demo里,
- consumer调用sayHi方法
- 通过注册中心找到provider
- 代理类封装请求并序列化后发送给provider
- provider反序列化数据,发现调用的是sayHi方法
- 把调用结果序列化返回给consumer
- consumer反序列化返回结果
接下来回到代码本身,看看这一系列过程是怎么实现的。
先从provider的demo入手吧。
先看看配置,只配置了一个XxlRpcSpringProviderFactory
的 bean
从配置代码来看,配置了provider的端口,注册中心的类型(xxl-registry or zookeeper or local,这里是xxl-registry) ,已经注册中心的一些参数(这里是对应注册中心xxl-registry需要的配置:注册中心地址,环境,token)
@Configuration
public class XxlRpcProviderConfig {
private Logger logger = LoggerFactory.getLogger(XxlRpcProviderConfig.class);
@Value("${xxl-rpc.remoting.port}")
private int port;
@Value("${xxl-rpc.registry.xxlregistry.address}")
private String address;
@Value("${xxl-rpc.registry.xxlregistry.env}")
private String env;
@Value("${xxl-rpc.registry.xxlregistry.token}")
private String token;
@Bean
public XxlRpcSpringProviderFactory xxlRpcSpringProviderFactory() {
XxlRpcSpringProviderFactory providerFactory = new XxlRpcSpringProviderFactory();
providerFactory.setPort(port);
providerFactory.setServiceRegistryClass(XxlRegistryServiceRegistry.class);
providerFactory.setServiceRegistryParam(new HashMap<String, String>(){{
put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, address);
put(XxlRegistryServiceRegistry.ENV, env);
put(XxlRegistryServiceRegistry.ACCESS_TOKEN,token);
}});
logger.info(">>>>>>>>>>> xxl-rpc provider config init finish.");
return providerFactory;
}
}
ok,那我们在看看XxlRpcSpringProviderFactory
有什么花头。
实现了3个接口
implements ApplicationContextAware, InitializingBean,DisposableBean
,并继承自XxlRpcProviderFactory
ps:这几个接口都是一些spring bean的一些扩展,详细的可以自行搜索, 下面给出一些简单的描述
InitialingBean
是一个接口,提供了一个唯一的方法afterPropertiesSet()
。
DisposableBean
也是一个接口,提供了一个唯一的方法destory()
。
前者顾名思义在Bean属性都设置完毕后调用afterPropertiesSet()
方法做一些初始化的工作,后者在Bean生命周期结束前调用destory()
方法做一些收尾工作
实现
ApplicationContextAware
接口的Bean,在Bean加载的过程中可以获取到Spring的ApplicationContext
,这个尤其重要,ApplicationContext
是Spring应用上下文,从ApplicationContext
中可以获取包括任意的Bean在内的大量Spring容器内容和信息
public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {
这里比较好理解,因为XxlRpcSpringProviderFactory
是针对spring的客户端,所需需要额外实现几个接口,主要的逻辑是在它的父类XxlRpcProviderFactory
里面。
先看看第一段
// ---------------------- config ----------------------
里面的内容
netType
定义了网络通信协议(NETTY,NETTY_HTTP,MINA,JETTY)
Serializer
定义了序列化方式
ip,port,accessToken
还不知道干嘛,留着
serviceRegistryClass
和serviceRegistryParam
定义注册中心类和参数
private NetEnum netType;
private Serializer serializer;
private String ip; // for registry
private int port; // default port
private String accessToken;
private Class<? extends ServiceRegistry> serviceRegistryClass;
private Map<String, String> serviceRegistryParam;
再往下看,这些属性的设置全是在initConfig
方法中设值的。
通过这段代码,就可以知道,ip其实是给consumer使用的本地ip(会注册到注册中心的ip),port其实就是用来和consumer通信的端口号(比如刚刚demo里面的7080)
public void initConfig(NetEnum netType,
Serializer serializer,
String ip,
int port,
String accessToken,
Class<? extends ServiceRegistry> serviceRegistryClass,
Map<String, String> serviceRegistryParam) {
// init
this.netType = netType;
this.serializer = serializer;
this.ip = ip;
this.port = port;
this.accessToken = accessToken;
this.serviceRegistryClass = serviceRegistryClass;
this.serviceRegistryParam = serviceRegistryParam;
// valid
if (this.netType==null) {
throw new XxlRpcException("xxl-rpc provider netType missing.");
}
if (this.serializer==null) {
throw new XxlRpcException("xxl-rpc provider serializer missing.");
}
if (this.ip == null) {
this.ip = IpUtil.getIp();
}
if (this.port <= 0) {
this.port = 7080;
}
if (NetUtil.isPortUsed(this.port)) {
throw new XxlRpcException("xxl-rpc provider port["+ this.port +"] is used.");
}
if (this.serviceRegistryClass != null) {
if (this.serviceRegistryParam == null) {
throw new XxlRpcException("xxl-rpc provider serviceRegistryParam is missing.");
}
}
}
这段代码其实就是一些基础参数的config。比如用什么通信协议,开房什么端口,用什么注册中心等等。
再接着往下看
// ---------------------- start / stop ----------------------
看看start和stop代码有什么
private Server server;
private ServiceRegistry serviceRegistry;
private String serviceAddress;
先往下看,定义了start和stop方法,仔细一看是针对server的start
和stop
,server是netType
的一个instance。
那么就比较好理解了,server就是负责通信的实例(demo里是netty)。
首先拿到server的实例,然后设置了setStartedCallback
和setStopedCallback
,并调用了start方法。
public void start() throws Exception {
// start server
serviceAddress = IpUtil.getIpPort(this.ip, port);
server = netType.serverClass.newInstance();
server.setStartedCallback(new BaseCallback() { // serviceRegistry started
@Override
public void run() throws Exception {
// start registry
if (serviceRegistryClass != null) {
serviceRegistry = serviceRegistryClass.newInstance();
serviceRegistry.start(serviceRegistryParam);
if (serviceData.size() > 0) {
serviceRegistry.registry(serviceData.keySet(), serviceAddress);
}
}
}
});
server.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
@Override
public void run() {
// stop registry
if (serviceRegistry != null) {
if (serviceData.size() > 0) {
serviceRegistry.remove(serviceData.keySet(), serviceAddress);
}
serviceRegistry.stop();
serviceRegistry = null;
}
}
});
server.start(this);
}
public void stop() throws Exception {
// stop server
server.stop();
}
那我们再深入看看server这些callback
和start
方法都干了什么吧。
server是一个抽象类(nettyServer
会继承这个类),定义了BaseCallback
类型的startedCallback
和stopedCallback
,根据名字猜测是通信server调用start
后,会调用startedCallback.run
方法,server调用stop
之后会调用stopedCallback.run
方法。好像比较抽象,毕竟是抽象类。
public abstract class Server {
protected static final Logger logger = LoggerFactory.getLogger(Server.class);
private BaseCallback startedCallback;
private BaseCallback stopedCallback;
public void setStartedCallback(BaseCallback startedCallback) {
this.startedCallback = startedCallback;
}
public void setStopedCallback(BaseCallback stopedCallback) {
this.stopedCallback = stopedCallback;
}
/**
* start server
*
* @param xxlRpcProviderFactory
* @throws Exception
*/
public abstract void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception;
/**
* callback when started
*/
public void onStarted() {
if (startedCallback != null) {
try {
startedCallback.run();
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-rpc, server startedCallback error.", e);
}
}
}
/**
* stop server
*
* @throws Exception
*/
public abstract void stop() throws Exception;
/**
* callback when stoped
*/
public void onStoped() {
if (stopedCallback != null) {
try {
stopedCallback.run();
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-rpc, server stopedCallback error.", e);
}
}
}
}
那我们直接进入继承他的NettyServer
看看,这样应该就比较清晰了。
start
方法里面直接开了一个守护线程,线程做的事情非常简单:配置并开启netty服务,并调用onStarted
方法
stop方法更简单,直接interrupt
,并调用onStoped
方法。
public class NettyServer extends Server {
private Thread thread;
@Override
public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(NettyServer.class.getSimpleName());
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0,0,10, TimeUnit.MINUTES))
.addLast(new NettyDecoder(XxlRpcRequest.class, xxlRpcProviderFactory.getSerializer()))
.addLast(new NettyEncoder(XxlRpcResponse.class, xxlRpcProviderFactory.getSerializer()))
.addLast(new NettyServerHandler(xxlRpcProviderFactory, serverHandlerPool));
}
})
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();
logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyServer.class.getName(), xxlRpcProviderFactory.getPort());
onStarted();
// wait util stop
future.channel().closeFuture().sync();
} catch (Exception e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-rpc remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", e);
}
} finally {
// stop
try {
serverHandlerPool.shutdown(); // shutdownNow
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
thread.setDaemon(true);
thread.start();
}
@Override
public void stop() throws Exception {
// destroy server thread
if (thread != null && thread.isAlive()) {
thread.interrupt();
}
// on stop
onStoped();
logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
}
}
让我们再回到刚才的代码,startedCallback
方法就是在netty服务启动完成之后,把provider的信息注册到注册中心
ServiceRegistry是个注册中心的抽象,demo里面用的是XxlRegistryServiceRegistry,其实就是对注册中心操作的一些封装,代码非常简单,不懂可以参考上一篇源码阅读:分布式服务注册中心XXL-REGISTRY(基于1.0.2)
server.setStartedCallback(new BaseCallback() { // serviceRegistry started
@Override
public void run() throws Exception {
// start registry
if (serviceRegistryClass != null) {
serviceRegistry = serviceRegistryClass.newInstance();
serviceRegistry.start(serviceRegistryParam);
if (serviceData.size() > 0) {
serviceRegistry.registry(serviceData.keySet(), serviceAddress);
}
}
}
});
stopedCallback
也比较好理解了,就是在netty服务关闭之后,从注册中心移除自己。
server.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
@Override
public void run() {
// stop registry
if (serviceRegistry != null) {
if (serviceData.size() > 0) {
serviceRegistry.remove(serviceData.keySet(), serviceAddress);
}
serviceRegistry.stop();
serviceRegistry = null;
}
}
});
最后再看看server invoke
里面有什么吧
看起来像是用来记rpc service的,先不管他,接着往下看
// ---------------------- server invoke ----------------------
/**
* init local rpc service map
*/
private Map<String, Object> serviceData = new HashMap<String, Object>();
public Map<String, Object> getServiceData() {
return serviceData;
}
好像就是字符串的拼接,不知道干嘛的,先往下看
/**
* make service key
*
* @param iface
* @param version
* @return
*/
public static String makeServiceKey(String iface, String version){
String serviceKey = iface;
if (version!=null && version.trim().length()>0) {
serviceKey += "#".concat(version);
}
return serviceKey;
}
addService
用到了makeServiceKey
,看来这个是用来做唯一主键的。
根据名字推测,应该是往前面的serviceData
里面把serviceBean
放进去。
/**
* add service
*
* @param iface
* @param version
* @param serviceBean
*/
public void addService(String iface, String version, Object serviceBean){
String serviceKey = makeServiceKey(iface, version);
serviceData.put(serviceKey, serviceBean);
logger.info(">>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = {}, serviceBean = {}", serviceKey, serviceBean.getClass());
}
通过IDE看看哪里用了addService
方法
发现在刚才的XxlRpcSpringProviderFactory
就有用到!
看下代码,其实很简单:
- 从spring上下文找到加了
XxlRpcService
注解的bean - 接口名+版本号作为唯一主键把bean放入
serviceData
里面
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// valid
if (serviceBean.getClass().getInterfaces().length ==0) {
throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
}
// add service
XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
String iface = serviceBean.getClass().getInterfaces()[0].getName();
String version = xxlRpcService.version();
super.addService(iface, version, serviceBean);
}
}
// TODO,addServices by api + prop
}
最后一个方法,从方法名就能看出来,调用service,接受一个xxlRpcRequest
参数
从serviceData
里面取出request里面要调用的bean
通过反射调用方法并返回response
/**
* invoke service
*
* @param xxlRpcRequest
* @return
*/
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
// make response
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
// match service bean
String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
Object serviceBean = serviceData.get(serviceKey);
// valid
if (serviceBean == null) {
xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
return xxlRpcResponse;
}
if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
return xxlRpcResponse;
}
if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
return xxlRpcResponse;
}
try {
// invoke
Class<?> serviceClass = serviceBean.getClass();
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(serviceBean, parameters);
/*FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);*/
xxlRpcResponse.setResult(result);
} catch (Throwable t) {
// catch error
logger.error("xxl-rpc provider invokeService error.", t);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
}
return xxlRpcResponse;
}
ok,到这里为止,XxlRpcProviderFactory
的代码看完了,来总结一下它究竟能干什么事情
- 配置通信协议,序列化方式,注册中心
- 开启通信server
- 把
serviceData
里所有的provider服务注册到注册中心 - 通过反射机制,提供调用服务的(
invokeService
)方法
看完了XxlRpcProviderFactory
,我们再回到XxlRpcSpringProviderFactory
与父类不同的是,提供了netType
默认使用netty
,序列化默认使用hessian
// ---------------------- config ----------------------
private String netType = NetEnum.NETTY.name();
private String serialize = Serializer.SerializeEnum.HESSIAN.name();
再看看必须实现的几个接口
这个方法前面已经看到过,这里是把所有带有XxlRpcService
注解的bean放到serverData
这个map里面
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// valid
if (serviceBean.getClass().getInterfaces().length ==0) {
throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
}
// add service
XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
String iface = serviceBean.getClass().getInterfaces()[0].getName();
String version = xxlRpcService.version();
super.addService(iface, version, serviceBean);
}
}
// TODO,addServices by api + prop
}
最后两个方法,很简单,就是配置一下基本参数,以及调用父类的方法。
@Override
public void afterPropertiesSet() throws Exception {
this.prepareConfig();
super.start();
}
@Override
public void destroy() throws Exception {
super.stop();
}
这样一来,XxlRpcSpringProviderFactory
就全部阅读完了。我们重新梳理一遍流程XxlRpcInvokerConfig
干的事情
- 配置通信框架(
netty
),序列化框架(hessian
),注册中心(xxl-registry) - 把使用了
XxlRpcService
注解的bean全部put到Map<String,Object> serviceData
里面,key为bean继承的接口名+版本号,Object为service的bean本身 - 启动通信框架(
netty
),启动成功后把serviceData
里面的bean注册到注册中心
这样,provider就已经完全启动完成了,一切准备就绪,就等客户端调用了!
那么,我们再看看客户端的代码!
老套路,从配置开始看起,和provider的配置差不多,就不在赘述
@Configuration
public class XxlRpcInvokerConfig {
private Logger logger = LoggerFactory.getLogger(XxlRpcInvokerConfig.class);
@Value("${xxl-rpc.registry.xxlregistry.address}")
private String address;
@Value("${xxl-rpc.registry.xxlregistry.env}")
private String env;
@Value("${xxl-rpc.registry.xxlregistry.token}")
private String token;
@Bean
public XxlRpcSpringInvokerFactory xxlJobExecutor() {
XxlRpcSpringInvokerFactory invokerFactory = new XxlRpcSpringInvokerFactory();
invokerFactory.setServiceRegistryClass(XxlRegistryServiceRegistry.class);
invokerFactory.setServiceRegistryParam(new HashMap<String, String>(){{
put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, address);
put(XxlRegistryServiceRegistry.ENV, env);
put(XxlRegistryServiceRegistry.ACCESS_TOKEN,token);
}});
logger.info(">>>>>>>>>>> xxl-rpc invoker config init finish.");
return invokerFactory;
}
}
直接去XxlRpcSpringInvokerFactory
里面看看吧,InitializingBean
,DisposableBean
不再赘述
实现
BeanFactoryAware
接口的Bean,在Bean加载的过程中可以获取到加载该Bean的BeanFactory
InstantiationAwareBeanPostProcessor
作用的是Bean实例化前后,即:
1、Bean构造出来之前调用postProcessBeforeInstantiation()
方法
2、Bean构造出来之后调用postProcessAfterInstantiation()
方法
public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean,DisposableBean, BeanFactoryAware {
ok,看看具体类里面的代码,先看第一段config相关
这段代码似曾相识,在ProviderFactory里面也有:注册中心配置
// ---------------------- config ----------------------
private Class<? extends ServiceRegistry> serviceRegistryClass; // class.forname
private Map<String, String> serviceRegistryParam;
public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
this.serviceRegistryClass = serviceRegistryClass;
}
public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
this.serviceRegistryParam = serviceRegistryParam;
}
接着往下看,定义了一个 XxlRpcInvokerFactory
// ---------------------- util ----------------------
private XxlRpcInvokerFactory xxlRpcInvokerFactory;
进到XxlRpcInvokerFactory
里面看看吧
首先很明显,这是一个单例模式
然后也配置了注册中心
public class XxlRpcInvokerFactory {
private static Logger logger = LoggerFactory.getLogger(XxlRpcInvokerFactory.class);
// ---------------------- default instance ----------------------
private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory(LocalServiceRegistry.class, null);
public static XxlRpcInvokerFactory getInstance() {
return instance;
}
// ---------------------- config ----------------------
private Class<? extends ServiceRegistry> serviceRegistryClass; // class.forname
private Map<String, String> serviceRegistryParam;
public XxlRpcInvokerFactory() {
}
public XxlRpcInvokerFactory(Class<? extends ServiceRegistry> serviceRegistryClass, Map<String, String> serviceRegistryParam) {
this.serviceRegistryClass = serviceRegistryClass;
this.serviceRegistryParam = serviceRegistryParam;
}
// 略
}
再往下看,似曾相识的代码
start
方法是开始想注册中心注册
stop
方法先从注册中心移除,然后在吧stopCallbackList
里面还没执行的方法执行了
那什么时候调用addStopCallBack
把stopCallBack
加进list呢?用IDE搜一搜,
发现在JettyClient
和ConnectClient
的时候用到了,好像暂时和我们demo的代码没什么关系,先放一放
最后把responseCallbackThreadPool
线程池shutDown
了。
// ---------------------- start / stop ----------------------
public void start() throws Exception {
// start registry
if (serviceRegistryClass != null) {
serviceRegistry = serviceRegistryClass.newInstance();
serviceRegistry.start(serviceRegistryParam);
}
}
public void stop() throws Exception {
// stop registry
if (serviceRegistry != null) {
serviceRegistry.stop();
}
// stop callback
if (stopCallbackList.size() > 0) {
for (BaseCallback callback: stopCallbackList) {
try {
callback.run();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
// stop CallbackThreadPool
stopCallbackThreadPool();
}
// ---------------------- service registry ----------------------
private ServiceRegistry serviceRegistry;
public ServiceRegistry getServiceRegistry() {
return serviceRegistry;
}
// ---------------------- service registry ----------------------
private List<BaseCallback> stopCallbackList = new ArrayList<BaseCallback>();
public void addStopCallBack(BaseCallback callback){
stopCallbackList.add(callback);
}
那
responseCallbackThreadPool
线程池是用来干什么的?用IDE搜一搜,就在stopCallbackThreadPool
上面executeResponseCallback
接受一个Runnable
对象,并初始化线程池,并放入线程池那么
executeResponseCallback
什么时候会被用到?
// ---------------------- response callback ThreadPool ----------------------
private ThreadPoolExecutor responseCallbackThreadPool = null;
public void executeResponseCallback(Runnable runnable){
if (responseCallbackThreadPool == null) {
synchronized (this) {
if (responseCallbackThreadPool == null) {
responseCallbackThreadPool = new ThreadPoolExecutor(
10,
100,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!");
}
}); // default maxThreads 300, minThreads 60
}
}
}
responseCallbackThreadPool.execute(runnable);
}
public void stopCallbackThreadPool() {
if (responseCallbackThreadPool != null) {
responseCallbackThreadPool.shutdown();
}
}
其实就在上面
定义了一个concurrentMap
,参数是一个string和一个XxlRpcFutureResponse
根据future这个名字,可以猜一下应该是个future(多线程)操作相关的
set和remove方法看起来就是对map进行一些关于request的操作
notifyInvokerFuture
方法,从futureResponsePool
根据requestId
取出一个XxlRpcFutureResponse
对象
然后判断Response的状态,并做一些设置,然后从futureResponsePool
移出这个requestId
到这里,还比较蒙逼,大概只能看出,利用了future,对response做一些处理。
那么问题来了,response是什么时候生产的?为什么会有一堆callback
方法?这样做的目的是什么?
因为没有看到调用远程服务的代码,看不懂很正常!
// ---------------------- future-response pool ----------------------
// XxlRpcFutureResponseFactory
private ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();
public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){
futureResponsePool.put(requestId, futureResponse);
}
public void removeInvokerFuture(String requestId){
futureResponsePool.remove(requestId);
}
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
// get
final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) {
return;
}
// notify
if (futureResponse.getInvokeCallback()!=null) {
// callback type
try {
executeResponseCallback(new Runnable() {
@Override
public void run() {
if (xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
// other nomal type
futureResponse.setResponse(xxlRpcResponse);
}
// do remove
futureResponsePool.remove(requestId);
}
让我们回到XxlRpcSpringInvokerFactory
,还剩最后一个方法postProcessAfterInstantiation
看看都干了些什么吧
取出加了XxlRpcReference
注解的字段(field)
组装成XxlRpcReferenceBean
,并根据名字猜测,通过这个bean得到一个service的代理对象!
@Override
public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {
// collection
final Set<String> serviceKeyList = new HashSet<>();
// parse XxlRpcReferenceBean
ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
if (field.isAnnotationPresent(XxlRpcReference.class)) {
// valid
Class iface = field.getType();
if (!iface.isInterface()) {
throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface.");
}
XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);
// init reference bean
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(
rpcReference.netType(),
rpcReference.serializer().getSerializer(),
rpcReference.callType(),
rpcReference.loadBalance(),
iface,
rpcReference.version(),
rpcReference.timeout(),
rpcReference.address(),
rpcReference.accessToken(),
null,
xxlRpcInvokerFactory
);
Object serviceProxy = referenceBean.getObject();
// set bean
field.setAccessible(true);
field.set(bean, serviceProxy);
logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}",
XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName());
// collection
String serviceKey = XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version());
serviceKeyList.add(serviceKey);
}
}
});
// mult discovery
if (xxlRpcInvokerFactory.getServiceRegistry() != null) {
try {
xxlRpcInvokerFactory.getServiceRegistry().discovery(serviceKeyList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return super.postProcessAfterInstantiation(bean, beanName);
}
看看getObject
做了什么吧
- 配置一个动态代理(猜测就是调用远程服务用的)
- 根据
XxlRpcInvokerFactory
配置的注册中心,查找provider地址 - 通过通信框架,把数据发送到provider机器
-
NettyClientHandler
获得响应的时候, 会调用futureResponse.setResponse(xxlRpcResponse);
把拿到的response放进futureResponse里面 - 再通过
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
拿到response(同步调用)
// ---------------------- util ----------------------
public Object getObject() {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// method param
String className = method.getDeclaringClass().getName(); // iface.getName()
String varsion_ = version;
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Object[] parameters = args;
// filter for generic
if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {
Class<?>[] paramTypes = null;
if (args[3]!=null) {
String[] paramTypes_str = (String[]) args[3];
if (paramTypes_str.length > 0) {
paramTypes = new Class[paramTypes_str.length];
for (int i = 0; i < paramTypes_str.length; i++) {
paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);
}
}
}
className = (String) args[0];
varsion_ = (String) args[1];
methodName = (String) args[2];
parameterTypes = paramTypes;
parameters = (Object[]) args[4];
}
// filter method like "Object.toString()"
if (className.equals(Object.class.getName())) {
logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);
throw new XxlRpcException("xxl-rpc proxy class-method not support");
}
// address
String finalAddress = address;
if (finalAddress==null || finalAddress.trim().length()==0) {
if (invokerFactory!=null && invokerFactory.getServiceRegistry()!=null) {
// discovery
String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(serviceKey);
// load balance
if (addressSet==null || addressSet.size()==0) {
// pass
} else if (addressSet.size()==1) {
finalAddress = addressSet.first();
} else {
finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
}
}
}
if (finalAddress==null || finalAddress.trim().length()==0) {
throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
}
// request
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(methodName);
xxlRpcRequest.setParameterTypes(parameterTypes);
xxlRpcRequest.setParameters(parameters);
// send
if (CallType.SYNC == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
} finally{
// future-response remove
futureResponse.removeInvokerFuture();
}
} else if (CallType.FUTURE == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// invoke future set
XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
return null;
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
}
} else if (CallType.CALLBACK == callType) {
// get callback
XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
}
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
try {
client.asyncSend(finalAddress, xxlRpcRequest);
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
}
return null;
} else if (CallType.ONEWAY == callType) {
client.asyncSend(finalAddress, xxlRpcRequest);
return null;
} else {
throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
}
}
});
}
回到前面的代码
getObject
的代理之后,为field设置了代理,并把serviceKey
(接口名+版本号)放在里一个set里面。
现先总结一下XxlRpcInvokerConfig
干的事情吧
- 初始化一个
XxlRpcSpringInvokerFactory
的bean - 配置注册中心,通信框架,序列化框架
- 向注册中心注册
- 为
@XxlRpcReference
注解的field配置代理
是不是似曾相识,没错,和provider的config其实几乎一样。
那我们从consumer角度看看,一次调用是怎么完成的吧!看完了,前面的疑问应该都能解决了,吧
看看调用代码,用@XxlRpcReference
注解了provider提供的接口
然后直接通过接口调用方法。
ok,那么所有猫腻都在@XxlRpcReference
这个注解里面了
@Controller
public class IndexController {
@XxlRpcReference
private DemoService demoService;
@RequestMapping("")
@ResponseBody
public UserDTO http(String name) {
try {
return demoService.sayHi(name);
} catch (Exception e) {
e.printStackTrace();
return new UserDTO(null, e.getMessage());
}
}
}
先看看这个注解本身吧
给了几个默认值:默认使用netty,使用HESSIAN,使用同步调用,负责均衡方式是轮询
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlRpcReference {
NetEnum netType() default NetEnum.NETTY;
Serializer.SerializeEnum serializer() default Serializer.SerializeEnum.HESSIAN;
CallType callType() default CallType.SYNC;
LoadBalance loadBalance() default LoadBalance.ROUND;
//Class<?> iface;
String version() default "";
long timeout() default 1000;
String address() default "";
String accessToken() default "";
//XxlRpcInvokeCallback invokeCallback() ;
}
搜一搜那里对这个注解做了处理吧
似曾相识,就是前面提到过的XxlRpcSpringInvokerFactory
的postProcessAfterInstantiation
方法!
所以整个调用过程应该就是调用代理方法的过程。
这样整个客户端调用过程就比较清晰了
- 初始化的时候,配置客户端的通信框架,序列化框架,注册中心
- 通过扫描
@XxlRpcReference
注解,初始化provider提供的接口的代理 - 进行远程调用的时候,实际是代理调用
- 通过代理通信协议客户端和注册中心,向provider请求数据
发一次请求,下个断点看看
果然进到这个代理调用里面了
看看服务端是怎么接收请求的吧,有了前面的服务端代码阅读和客户端调用代码阅读,其实服务端的就比较简单了
长话短说:
在装载XxlRpcSpringProviderFactory
的时候会扫描@XxlRpcService
注解的类,并把它作为service放进(put
)服务map里面
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// valid
if (serviceBean.getClass().getInterfaces().length ==0) {
throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
}
// add service
XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
String iface = serviceBean.getClass().getInterfaces()[0].getName();
String version = xxlRpcService.version();
super.addService(iface, version, serviceBean);
}
}
// TODO,addServices by api + prop
}
当发生远程调用的时候,会调用invokeService
方法,主要的代码就是这段通过反射来获取真正需要调用的方法
try {
// invoke
Class<?> serviceClass = serviceBean.getClass();
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(serviceBean, parameters);
/*FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);*/
xxlRpcResponse.setResult(result);
} catch (Throwable t) {
// catch error
logger.error("xxl-rpc provider invokeService error.", t);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
}
ok,到这里整个同步调用的流程就比较清楚了。我们在回顾一下它吹嘘的功能。
- 1、快速接入:接入步骤非常简洁,两分钟即可上手;
确实还行
- 2、服务透明:系统完整的封装了底层通信细节,开发时调用远程服务就像调用本地服务,在提供远程调用能力时不损失本地调用的语义简洁性;
通过扫描注解和反射的方式,做到了本地调用的效果
- 3、多调用方案:支持 SYNC、ONEWAY、FUTURE、CALLBACK 等方案;
现在我们只试过同步调用(SYNC),接下来看看其他调用方式吧
ONEWAY
用ONEWAY模式拿到的返回值是null。
provider的代码如下,其实就是发起了一个不需要结果的调用
else if (CallType.ONEWAY == callType) {
client.asyncSend(finalAddress, xxlRpcRequest);
return null;
}
FUTURE
直接返回null,拿到结果要从future里面get
else if (CallType.FUTURE == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// invoke future set
XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
return null;
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
}
}
@RequestMapping("")
@ResponseBody
public UserDTO http(String name) {
try {
// dto is null
UserDTO dto = demoService.sayHi(name);
Future<UserDTO> future = XxlRpcInvokeFuture.getFuture(UserDTO.class);
return future.get();
} catch (Exception e) {
e.printStackTrace();
return new UserDTO(null, e.getMessage());
}
}
callback
调用完成后会调用onSuccess
或onFailure
方法
else if (CallType.CALLBACK == callType) {
// get callback
XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType=" + CallType.CALLBACK.name() + ") cannot be null.");
}
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
try {
client.asyncSend(finalAddress, xxlRpcRequest);
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
}
return null;
}
XxlRpcInvokeCallback.setCallback(new XxlRpcInvokeCallback<UserDTO>() {
@Override
public void onSuccess(UserDTO result) {
System.out.println(result);
}
@Override
public void onFailure(Throwable exception) {
exception.printStackTrace();
}
});
demoService.sayHi("[CALLBACK]jack");
ps:这部分有很多值得深度阅读的地方,暂时
todo
- 4、多通讯方案:支持 TCP 和 HTTP 两种通讯方式进行服务调用;其中 TCP 提供可选方案 NETTY 或
MINA ,HTTP 提供可选方案 NETTY_HTTP 或 Jetty;
通过继承同一接口来完成调用方的细节隐藏
- 5、多序列化方案:支持 HESSIAN、HESSIAN1、PROTOSTUFF、KRYO、JACKSON 等方案;
和通信的方式差不多
- 6、负载均衡/软负载:提供丰富的负载均衡策略,包括:轮询、随机、LRU、LFU、一致性HASH等;
这部分比较简单,不在赘述
- 7、注册中心:可选组件,支持服务注册并动态发现;可选择不启用,直接指定服务提供方机器地址通讯;选择启用时,内置可选方案:“XXL-REGISTRY 轻量级注册中心”(推荐)、“ZK注册中心”、“Local注册中心”等;
前面已经提到
- 8、服务治理:提供服务治理中心,可在线管理注册的服务信息,如服务锁定、禁用等;
通过注册中心实现
- 9、服务监控:可在线监控服务调用统计信息以及服务健康状况等(计划中);
pass
- 10、容错:服务提供方集群注册时,某个服务节点不可用时将会自动摘除,同时消费方将会移除失效节点将流量分发到其余节点,提高系统容错能力。
通过注册中心实现
- 11、解决1+1问题:传统分布式通讯一般通过nginx或f5做集群服务的流量负载均衡,每次请求在到达目标服务机器之前都需要经过负载均衡机器,即1+1,这将会把流量放大一倍。而XXL-RPC将会从消费方直达
服务提供方,每次请求直达目标机器,从而可以避免上述问题;
通过注册中心实现
- 12、高兼容性:得益于优良的兼容性与模块化设计,不限制外部框架;除 spring/springboot 环境之外,理论上支持运行在任何Java代码中,甚至main方法直接启动运行;
demo里面有,比较简单,就是不通过反射,手动创建对象。
- 13、泛化调用:服务调用方不依赖服务方提供的API;
同上
ok,就剩下一个todo了,那就是各种调用方案的具体实现
先todo了...
网友评论