RPC是远程过程调用(Remote Procedure Call)的缩写形式。在本地调用一个服务时封装了底层的代理、数据传输、序列化等细节,如同调用本地服务一样。本文将介绍如何使用netty和zookeeper从零实现一个具有服务注册、远程调用的rpc框架。
技术选型
首先实现一个简单的rpc需要什么?
1. 网络通讯
要实现调用远程服务和调用本地服务一样,肯定涉及到网络的通讯。可以使用最简单的Socket连接实现,但是Socket是基于同步阻塞的IO的抽象,网络传输的效率不高。这里采用高性能非阻塞的网络框架netty。
2.序列化
要知道网络中传输的是字节,如何将我们的对象在网络中传输这里就涉及到需要将对象序列化。序列化的实现方式也有很多比如java自带的序列化ObjectOutputStream/ObjectInputStream,优点就是java自带的序列化,实现简单,缺点在于占用空间比较高、序列化和反序列化时间比较长,而且不能跨语言只能在java构造的服务端之间使用。将对象序列化JSON也是一个选择,可以跨系统,但是对于rpc网络传输来说占用空间比较大,不是最优选择。这里选择google的protobuf,跨系统高性能而且占用空间很少。
3.动态代理
实现一个rpc的接口要涉及到网络通讯和序列化这些底层细节,在我们业务方其实是不关心这些细节的,所以需要一个代理对象去帮我们做这些底层调用的细节,所以就涉及到了动态代理。java的动态代理实现的方式有很多,比如jdk动态代理、cglib、javaagent,虽然cglib与javaagent在性能上比jdk动态代理高很多,但是这里为了实现简单选择了jdk动态代理。
4.注册中心
现在网络通讯有了、序列化与动态代理,基本可以实现服务之间的调用。但是还是存在一个问题,就是服务调用方怎么知道服务提供方的IP和端口,虽然可以通过在代码中写死ip和端口或者在配置文件中配置服务的ip和端口,这样也能达到服务之间的调用的目的,但是不能动态的检测服务提供方的存活情况,所以需要一个注册中心来帮我们实现一个服务的发现与通知。本文采用的是zookeeper做为注册中心。
RPC服务架构
image.png以下描述来自dubbo官网
上面是dubbo的架构图,有几个关键角色
角色 | 功能 |
---|---|
Proivder | 暴露服务的服务提供方 |
Consumer | 调用远程服务的服务消费方 |
Registry | 服务注册发现 |
Monitor | 服务监控 |
Container | 服务运行的容器 |
dubbo服务启动流程:
- 服务容器负责启动,加载,运行服务提供者。
- 服务提供者在启动时,向注册中心注册自己提供的服务。
- 服务消费者在启动时,向注册中心订阅自己所需的服务。
- 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
- 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
- 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。
本文的rpc实现也基本是这个流程,在服务启动的时候开始本地服务端口,并将服务提供者注册到注册中心,服务消费者从注册中心拉去服务到本地,并订阅需要注册的服务,当服务提供方发生变化的时候,通过注册中心推送到消费者。
IOC容器
第一步我们将实现一个简易的ioc容器,将自己需要暴露出去的服务和需要依赖的远程服务注册到自己的ioc容器中。
接口定义:
public interface BeanFactory {
/**
* 从容器中获取bean
* @param requiredType
* @param <T>
* @return
*/
<T> T getBean(Class<T> requiredType);
/**
* 从容器中获取bean
* @param className
* @param <T>
* @return
*/
<T> T getBeanByName(String className);
/**
* 将bean注入到容器中
* @param type
* @param bean
* @return
*/
void registerBean(Class type,Object bean);
}
1. 自定义注解
//服务注册
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RpcService {
}
//服务引用
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface RpcReference {
}
序列化
1.protobuf
这里不对protobuf做过多的介绍,它是一种数据存储的结构类似于xml、json只不过在效率上要高上很多。下面是一段他的官方介绍
protocol buffers 是一种语言无关、平台无关、可扩展的序列化结构数据的方法,它可用于(数据)通信协议、数据存储等。
Protocol Buffers 是一种灵活,高效,自动化机制的结构数据序列化方法-可类比 XML,但是比 XML 更小(3 ~ 10倍)、更快(20 ~ 100倍)、更为简单。
你可以定义数据的结构,然后使用特殊生成的源代码轻松的在各种数据流中使用各种语言进行编写和读取结构数据。你甚至可以更新数据结构,而不破坏由旧数据结构编译的已部署程序。
2.传输结构定义
怎么定义传输的数据,首先我们要知道反射调用一个方法需要哪些参数。有下面的代码可知我们需要类的全限名、方法名、方法的参数列表和方法参数。
package com.simple.rpc.test;
import java.io.Serializable;
import java.lang.reflect.Method;
public class Person implements Serializable {
private Integer age;
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public static void main(String[] args) throws Exception{
String className = "com.simple.rpc.test.Person";
Class clazz = Class.forName(className);
Person person = (Person)clazz.newInstance();
Method setAge = clazz.getMethod("setAge", Integer.class);
Method getAge = clazz.getMethod("getAge", null);
setAge.invoke(person,5);
System.out.println(getAge.invoke(person));
}
}
protobuf文件定义
syntax = "proto2";
package com.simple.rpc.protocol;
option java_outer_classname = "Message";
message RpcMessage {
enum Datatype {
Request = 1;
Response = 2;
}
required Datatype date_type = 1;
oneof dataBody{
Request request = 2;
Response response = 3;
}
}
message Request {
//请求id
required string id = 1;
//请求的class全限名
required string class_name = 2;
//方法名
required string method_name = 3;
//方法参数类型
required bytes parameter_types = 4;
//方法参数
required bytes parameters = 5;
}
网络通讯
网络通讯是基于netty实现客户端和服务端这里都不贴具体的代码,需要注意的有两个地方。
- TCP_NODELAY
TCP是可靠传输,每个数据包都会携带TCP协议头20字节和IP头20字节,当数据包很小和时候比如只有1个字节,这对网络的效率和带宽都是很大的浪费。所以提出了Nagle`s算法,核心是如果是连续的小数据包如果没有达到一个 MSS(Maximum SegmentSize,最大分段大小),并且还没收到之前的ACK消息,就会将这些数据包暂存起来直到到达一个MSS,或者收到一个ACK才会发送出去。
这样会减少不必要的网络传输,提高网络的效率。但是rpc需要强网络交互,所以需要将Nagle关闭。
服务端:.childOption(ChannelOption.TCP_NODELAY, true)
客户端:.option(ChannelOption.TCP_NODELAY, true)
- 长连接还是短连接
- 短连接:每次客户端请求服务器的时候都建立一个TCP连接,好处是保证每次调用的TCP连接都是有效的,不用去管理TCP连接。缺点就是每次调用都会经过TCP三次握手效率比较低,而且在调用非常频繁的情况下会出现端口不够的情况。
- 长连接: 客户端与服务器通讯都通过一个TCP连接,好处就是不用每次客户端调用都建立连接提高通讯效率,但是需要维护这个连接的有效性。比如通过发送心跳的方式来检查服务端与客户端彼此的存活情况,netty有自带心跳检测的IdleStateHandler,通过这个实现心跳检测会很方便。
- 选择:这里我选择的是通过短连接实现客户端与服务器的通讯,纯粹是考虑实现简单。rpc框架都是追求高性能的,生产级别的rpc框架应该选择长连接实现。
动态代理
这里基于jdk的动态代理实现,比较简单实现InvocationHandler的invoke将调用的类名、方法、参数封装之后交给网络传输。
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invoker invoker = new Invoker(method,args);
return invoker.doInvoke().get();
}
注册中心--zookeeper
这一步需要实现功能是服务的注册、监听。
1. 服务在zookeeper的目录结构
每一个class就相当于一个文件目录,下面有providers存放注册该服务的主机ip和端口,consumers下面存放的是该服务有多少订阅者。其中providers和consumers下面的节点都是临时节点,就选服务突然断开链接也能同步到其他服务器上面去,不会调用已经失效的服务。而且基于这个目录文件也可以实现对服务监控,dubbo也是这种目录结构。
2. 接口定义
public interface RemoteRegistry {
/**
* 将服务注册到远程中心
* @param service
*/
void registry(Class<?> service);
/**
* 订阅某个服务
* @param service
* @param consumer
*/
void subscriberService(Class<?> service,Class<?> consumer);
/**
* 从远处拉取服务到本地缓存
* @param service
*/
void pullProvider(Class<?> service);
}
具体的实现就不贴了,可以根据zk工具实现代码,本项目是用的curator。
3.监听器
我们需要监听的就是某个服务下面的providers的叶子节点的变动,并同步到本地缓存中。
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
ChildData data = event.getData();
String path = data.getPath();
String[] split = path.split("/");
String url = split[split.length - 1];
switch (event.getType()) {
case CHILD_ADDED:
ProviderCache.add(providerName,path);
logger.info("service : {} registry remote from : {}",providerName,url);
break;
case CHILD_REMOVED:
ProviderCache.remove(providerName,path);
logger.info("service : {} closed from : {}",providerName,url);
break;
default:
break;
}
}
测试
1. 克隆代码、打包到本地
git clone https://github.com/haoyann/simple-rpc.git
mvn clean package -DskipTests=true
mvn install -DskipTests=true
上传
2. 构建项目
添加maven依赖
<dependency>
<groupId>com.simple-rpc</groupId>
<artifactId>simple-rpc</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
项目结构
image.png
Service
@RpcService
public class DemoServiceImpl implements DemoService {
@Override
public String hello(String name) {
return "hello:" +name;
}
}
public class ProviderApp {
public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig.Builder()
.host("127.0.0.1")
.port(2181)
.applicationPort(8899)
.build();
ApplicationStart applicationStart = new ApplicationStart(serverConfig);
applicationStart.run(new String[]{"com.rpc.test.service"});
}
}
Comsumer
public class DemoServiceRemote {
@RpcReference
private DemoService demoService;
public String hello(String name){
return demoService.hello(name);
}
}
public class ConsumerApp {
public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig.Builder()
.host("127.0.0.1")
.port(2181)
.applicationPort(9001)
.build();
ApplicationStart applicationStart = new ApplicationStart(serverConfig);
applicationStart.run(new String[]{"com.rpc.test.remote"});
for (int i = 0; i < 100; i++){
DemoServiceRemote bean = DefaultBeanFactory.getInstance()
.getBean(DemoServiceRemote.class);
long start = System.currentTimeMillis();
System.out.println(bean.hello("我是:"+i));
System.out.println("耗时:"+String.valueOf(System.currentTimeMillis()-start));
}
}
}
结果
运行结果
总结
至此整个rpc的创建和调用流程都已经完成了,最后再理一次服务启动和一次服务调用的流程。
服务启动
- 扫描指定包、放入ioc容器
- 连接zk注册和订阅服务,设置属性为代理对象
- 启动netty服务端
服务调用
- 动态代理隐藏调用细节
- 封装请求参数
- 创建netty客户端发送请求、返回feature抽象执行结果
- 接受到服务端返回结果,唤醒阻塞线程设置返回接口
- 反序列化返回结果
篇幅有限完整代码请参考:https://github.com/haoyann/simple-rpc
网友评论