本人微信公众号(jwfy)欢迎关注
上一期完成了手写一个RPC框架,看看100个线程同时调用效果如何,但还是遗留了很多问题以及可以优化的点,这次就完全重写之前的代码,演进到v2版本,使得代码逻辑更加规范的同时,引入ZooKeeper辅助完成服务治理。
在代码展示之前还是先介绍一些基本的概念以及设计思路,ZooKeeper是什么,服务治理又是什么等,最后贴了部分关键代码以说明和v1版本的区别,有哪些点的改进措施。
最后还提了个问题:线程池打满了怎么办?,你有什么好的解决方案呢?
ZooKeeper
ZooKeeper(直译为动物管理员,简称zk)是一个分布式、开源的应用协调服务,利用和Paxos类似的ZAB选举算法实现分布式一致性服务。有类似于Unix文件目录的节点信息,同时可以针对节点的变更添加watcher监听以能够即使感知到节点信息变更。可提供的功能例如域名服务、配置维护、同步以及组服务等(此功能介绍来自官网描述:It exposes common services - such as naming, configuration management, synchronization, and group services - in a simple interface)。如下图就是DUBBO存储在ZooKeeper的节点数据情况。
image在本地启动服务后通过zk客户端连接后也可通过命令查看节点信息,如下图所示。
imageZooKeeper包含了4种不同含义的功能节点,在每次创建节点之前都需要明确声明节点类型。
类型 | 定义 | 描述 |
---|---|---|
PERSISTENT | 持久化目录节点 | 客户端与zookeeper断开连接后,该节点依旧存在 |
PERSISTENT_SEQUENTIAL | 持久化顺序编号目录节点 | 客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号 |
EPHEMERAL | 临时目录节点 | 客户端与zookeeper断开连接后,该节点被删除 |
EPHEMERAL_SEQUENTIAL | 临时顺序编号目录节点 | 客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号 |
ZooKeeper使用之前需要先进行安装,后开启服务端的服务,我们的服务作为客户端
连接ZooKeeper以便于后续的操作。具体可参考官网文档Zookeeper3.5.5 官方文档,在实际的java项目开发中也是可以通过maven引入ZkClient或者Curator开源的客户端,在本文学习笔记中是使用的Curator,因为其已经封装了原始的节点注册、数据获取、添加watcher等功能。具体maven引入的版本如下,
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
服务治理
服务治理也就是针对服务进行管理的措施,例如服务发现
、服务暴露
、负载均衡
、快速上下线
等都是服务治理的具体体现。
- 服务发现:从服务管理中心获取到需要的服务相关信息,例如我们可以从zk中获取相关服务的机器信息,然后我们就可以和具体机器直连完成相关功能
- 服务暴露:服务提供方可以提供什么样子的功能,经过服务暴露暴露出去,其他使用方就可以通过服务发现发现具体的服务提供方信息
- 负载均衡:一般针对的是服务提供方,避免大量请求同时打到一台机器上,采用随机、轮询等措施让请求均分到各个机器上,提供服务效率,
限流
,灰度
等也都是类似的操作,通过动态路由、软负载的形式处理分发请求。 - 快速上线下:以往需要上下线可能需要杀掉机器上的进程,现在只需要让该服务停止暴露即可,实现服务的灵活上下线。
数据处理流程
服务端:服务的提供方,接受网络传输的请求数据、通过网络把应答数据发送给客户端
客户端:服务的调用方,使用本地代理,通过网络把请求数据发送出去,接受服务端返回的应答数据
所有的数据传输都是按照上面图片说的流程来的,如果需要添加自定义的序列化工具,则需要在把数据提交到socket的输出流缓冲区之前按照序列化工具完成序列化操作,反序列化则进行反向操作即可。
RPC 实践 V2版本
文件夹目录如下图所示,其中:
- balance文件夹:负载均衡有关
- config文件夹:网络套接字传输的数据模型以及服务暴露、服务发现的数据模型
- core文件夹:核心文件夹,包含了服务端和客户端的请求处理、代理生成等
- demo文件夹:测试试用
- io.protocol文件夹:目前是只有具体的请求对象和网络io的封装
- register:服务注册使用,实现了使用zk进行服务注册和服务发现的操作
由于代码太长,只贴部分重要的代码操作。
服务暴露 & 服务发现
public interface ServiceRegister {
/**
* 服务注册
* @param config
*/
void register(BasicConfig config);
/**
* 服务发现,从注册中心获取可用的服务提供方信息
* @param request
* @return
*/
InetSocketAddress discovery(RpcRequest request, ServiceType nodeType);
}
默认使用了CuratorFramework客户端完成zk数据的操作
public class ZkServiceRegister implements ServiceRegister {
private CuratorFramework client;
private static final String ROOT_PATH = "jwfy/simple-rpc";
private LoadBalance loadBalance = new DefaultLoadBalance();
public ZkServiceRegister() {
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
this.client = CuratorFrameworkFactory
.builder()
.connectString("127.0.0.1:2182")
.sessionTimeoutMs(50000)
.retryPolicy(policy)
.namespace(ROOT_PATH)
.build();
// 业务的根路径是 /jwfy/simple-rpc ,其他的都会默认挂载在这里
this.client.start();
System.out.println("zk启动正常");
}
@Override
public void register(BasicConfig config) {
String interfacePath = "/" + config.getInterfaceName();
try {
if (this.client.checkExists().forPath(interfacePath) == null) {
// 创建 服务的永久节点
this.client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(interfacePath);
}
config.getMethods().forEach(method -> {
try {
String methodPath = null;
ServiceType serviceType = config.getType();
if (serviceType == ServiceType.PROVIDER) {
// 服务提供方,需要暴露自身的ip、port信息,而消费端则不需要
String address = getServiceAddress(config);
methodPath = String.format("%s/%s/%s/%s", interfacePath, serviceType.getType(), method.getMethodName(), address);
} else {
methodPath = String.format("%s/%s/%s", interfacePath, serviceType.getType(), method.getMethodName());
}
System.out.println("zk path: [" + methodPath + "]");
this.client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(methodPath, "0".getBytes());
// 创建临时节点,节点包含了服务提供段的信息
} catch (Exception e) {
e.getMessage();
}
});
} catch (Exception e) {
e.getMessage();
}
}
@Override
public InetSocketAddress discovery(RpcRequest request, ServiceType nodeType) {
String path = String.format("/%s/%s/%s", request.getClassName(), nodeType.getType(), request.getMethodName());
try {
List<String> addressList = this.client.getChildren().forPath(path);
// 采用负载均衡的方式获取服务提供方信息,不过并没有添加watcher监听模式
String address = loadBalance.balance(addressList);
if (address == null) {
return null;
}
return parseAddress(address);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private String getServiceAddress(BasicConfig config) {
String hostInfo = new StringBuilder()
.append(config.getHost())
.append(":")
.append(config.getPort())
.toString();
return hostInfo;
}
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.valueOf(result[1]));
}
public void setLoadBalance(LoadBalance loadBalance) {
// 可以重新设置负载均衡的策略
this.loadBalance = loadBalance;
}
}
image
服务启动后利用zkclient查询到在zk中包含的节点信息,其中默认的命名空间是jwfy/simple-rpc
负载均衡
public interface LoadBalance {
String balance(List<String> addressList);
}
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public String balance(List<String> addressList) {
if (addressList == null || addressList.isEmpty()) {
return null;
}
if (addressList.size() == 1) {
return addressList.get(0);
}
return doLoad(addressList);
}
abstract String doLoad(List<String> addressList);
}
public class DefaultLoadBalance extends AbstractLoadBalance {
@Override
String doLoad(List<String> addressList) {
Random random = new Random();
// 利用随机函数选择一个,其中random.nextIn生成的数据是在[0, size) 之间
return addressList.get(random.nextInt(addressList.size()));
}
}
上面的负载均衡代码其实很简单,就是从一个机器列表addressList中选择一个,如果列表为空或者不存在则直接返回null,如果机器只有1台则直接获取返回即可,当列表记录超过1条后利用随机函数生成一个列表偏移量以获取对应数据。也可以按照类似完成更多负载均衡的策略,然后调用setLoadBalance方法就可以了。
IO 处理
public interface MessageProtocol {
/**
* 服务端解析从网络传输的数据,转变成request
* @param inputStream
* @return
*/
void serviceToRequest(RpcRequest request, InputStream inputStream);
/**
* 服务端把计算机的结果包装好,通过outputStream 返回给客户端
* @param response
* @param outputStream
* @param <T>
*/
<T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream);
/**
* 客户端把请求拼接好,通过outputStream发送到服务端
* @param request
* @param outputStream
*/
void clientToRequest(RpcRequest request, OutputStream outputStream);
/**
* 客户端接收到服务端响应的结果,转变成response
* @param response
* @param inputStream
*/
void clientGetResponse(RpcResponse response, InputStream inputStream);
}
实现类DefaultMessageProtocol
public class DefaultMessageProtocol implements MessageProtocol {
@Override
public void serviceToRequest(RpcRequest request, InputStream inputStream) {
try {
ObjectInputStream input = new ObjectInputStream(inputStream);
request.setClassName(input.readUTF());
request.setMethodName(input.readUTF());
request.setParameterTypes((Class<?>[])input.readObject());
request.setArguments((Object[])input.readObject());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream) {
try {
ObjectOutputStream output = new ObjectOutputStream(outputStream);
output.writeBoolean(response.getError());
output.writeObject(response.getResult());
output.writeObject(response.getErrorMessage());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void clientToRequest(RpcRequest request, OutputStream outputStream) {
try {
ObjectOutputStream ouput = new ObjectOutputStream(outputStream);
ouput.writeUTF(request.getClassName());
ouput.writeUTF(request.getMethodName());
ouput.writeObject(request.getParameterTypes());
ouput.writeObject(request.getArguments());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void clientGetResponse(RpcResponse response, InputStream inputStream) {
try {
ObjectInputStream input = new ObjectInputStream(inputStream);
response.setError(input.readBoolean());
response.setResult(input.readObject());
response.setErrorMessage((String) input.readObject());
} catch (Exception e) {
e.printStackTrace();
}
}
}
服务端请求处理
public class ServiceHandler {
private ThreadPoolExecutor executor = null;
private RpcService rpcService;
private MessageProtocol messageProtocol;
public ServiceHandler(RpcService rpcService) {
this.rpcService = rpcService;
ThreadFactory commonThreadName = new ThreadFactoryBuilder()
.setNameFormat("Parse-Task-%d")
.build();
this.executor = new ThreadPoolExecutor(
10,
10,
2,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
commonThreadName, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
SocketTask socketTask = (SocketTask) r;
Socket socket = socketTask.getSocket();
if (socket != null) {
try {
socket.close();
System.out.println("reject socket:" + socketTask + ", and closed");
// 无法及时处理和响应的就快速拒绝掉
} catch (IOException e) {
}
}
}
});
}
public RpcService getRpcService() {
return rpcService;
}
public void setRpcService(RpcService rpcService) {
this.rpcService = rpcService;
}
public MessageProtocol getMessageProtocol() {
return messageProtocol;
}
public void setMessageProtocol(MessageProtocol messageProtocol) {
this.messageProtocol = messageProtocol;
}
public void handler(Socket socket) {
// 接收到新的套接字,包装成为一个runnable提交给线程去执行
this.executor.execute(new SocketTask(socket));
}
class SocketTask implements Runnable {
private Socket socket;
public SocketTask(Socket socket) {
this.socket = socket;
}
public Socket getSocket() {
return socket;
}
@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
RpcRequest request = new RpcRequest();
messageProtocol.serviceToRequest(request, inputStream);
// 获取客户端请求数据,统一包装成RpcRequest
RpcResponse response = rpcService.invoke(request);
// 反射调用,得到具体的返回值
System.out.println("request:[" + request + "], response:[" + response + "]");
messageProtocol.serviceGetResponse(response, outputStream);
// 再返回给客户端
} catch (Exception e) {
// error
} finally {
if (socket != null) {
// SOCKET 关闭一定要加上,要不然会出各种事情
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
客户端 代理对象
public class ProxyInstance implements InvocationHandler {
private RpcClient rpcClient;
private Class clazz;
public ProxyInstance(RpcClient client, Class clazz) {
this.rpcClient = client;
this.clazz = clazz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
request.setClassName(clazz.getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setArguments(args);
// 获取服务提供方信息,这里既是服务发现的入口,找到一个合适的可用的服务提供方信息
InetSocketAddress address = rpcClient.discovery(request);
System.out.println("[" + Thread.currentThread().getName() + "]discover service:" + address);
// 发起网络请求,得到请求数据
RpcResponse response = rpcClient.invoke(request, address);
return response.getResult();
}
}
上面的InetSocketAddress address = rpcClient.discovery(request)
是相比v1多了一个最重要的东西,每次获取请求后都实时从zk中获取对应的服务提供方信息,这就是服务发现。
实践
public class Client {
public static void main(String[] args) {
RpcClient rpcClient = new RpcClient();
rpcClient.subscribe(Calculate.class);
rpcClient.start();
Calculate<Integer> calculateProxy = rpcClient.getInstance(Calculate.class);
for(int i=0; i< 200; i++) {
new Thread(() -> {
long start = System.currentTimeMillis();
int s1 = new Random().nextInt(100);
int s2 = new Random().nextInt(100);
int s3 = calculateProxy.add(s1, s2);
System.out.println("[" + Thread.currentThread().getName() + "]a: " + s1 + ", b:" + s2 + ", c=" + s3 + ", 耗时:" + (System.currentTimeMillis() - start));
}).start();
}
}
}
客户端开启200个线程后,执行结果是顺利执行的,在服务端开启的接受请求被添加到线程池中,而代码中线程池的任务队列长度是200,可以完全的存储200个线程,但是如果我们把客户端请求量从200个改成500个呢,又会出现什么情况?
服务端
客户端
如上述的图片显示,当请求量打满线程池之后,线程池的拒绝策略就开始生效了,在本代码中是直接调用了close操作,而客户端感知到关闭后也会出现io错误,而正常的请求则顺利执行。其中还有输出discover服务发现了服务提供方的机器信息,这也是符合起初的想法的。
这里一定要加上一些策略以及时关闭无法处理的socket,否则就会出现服务提供方无任何可执行,但是服务调用方却还在等待中,因为socket并没有关闭,从而出现资源被占用了,还不执行相关任务。
提个问题:线程池打满了怎么办?
在本demo中采取了非常粗暴的策略,直接丢弃了无法处理的任务,在实际的线上业务中,可以先加机器以能再最短的时间内恢复线上情况,后期结合业务特点提出针对性的解决方案。如果业务接受一定的延迟,可以考虑接入kafka类似的消息队列(削峰是mq的一大特点);如果对时间要求很高,要么加机器,要么压榨机器性能,可能之前设置的线程池的数量太小,那就需要调节线程池的各个核心数据,修改线程池的任务队列类型也是可以考虑的;此外也有可能是业务耗时太多,无法及时处理完全造成请求堆积导致的,那么就需要考虑业务的同步改异步化。最后线上告警也需要及时完善。
没有绝对的解决方案,只有最合适当下场景的方案,没有银弹,任何不具体结合业务的方案都是扯淡。
总结思考
v2版本相比v1版本修改了整个代码结构,使得结构能够更加明确,引入zookeeper作为服务治理功能,大致介绍了zookeeper的特点以及功能,给服务注册、服务发现、序列化协议等均留下了口子,以便实现自定义的协议,v1的io模型是BIO,v2并没有变化,只是由单线程改造成多线程。
整体而言符合一个简单的rpc框架,依旧还是有很多点可以完善、优化的点,如:
- io模型还是没有替换,后面考虑直接整体接入netty;
- 也不应该每次实时从zk获取节点信息,应该先设置一个本地缓存,再利用zookeeper的watcher功能,开启一个异步线程去监听更新本地缓存,降低和zk交互带来的性能损耗;
- 也没有快速失败、重试的功能,客观情况下存在网络抖动的问题,重试就可以了
- 整体的各种协议约定并没有明确规范,比较混乱
本人微信公众号(搜索jwfy)欢迎关注
微信公众号
网友评论