- 实现一个 rpc 框架:
可参考(https://github.com/Snailclimb/guide-rpc-framework)
- 目标 定义 HelloService, client 调用 helloService.sayHello()的时候,可以完全屏蔽 远端服务的调用的细节。
实现服务的远程动态代理:
helloService 有一个代理实现,这个代理可以是 http,tcp, mq 怎么样都可以,这些细节对服务调研都是透明的。
代理的实现与springboot结合,可以自己定义一套 annotation,然后在BeanPostProcessor 的方法里面,对 bean做一个动态增强。
比如,如果一个bean里面引用RpcReference,就可以给他做一个动态代理,然后再把属性set回去。
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = bean.getClass();
Field[] declaredFields = targetClass.getDeclaredFields();
for (Field declaredField : declaredFields) {
RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
if (rpcReference != null) {
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group(rpcReference.group()).version(rpcReference.version()).build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceProperties);
Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
declaredField.setAccessible(true);
try {
declaredField.set(bean, clientProxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
对于helloService 他的动态代理,就是RpcClientProxy, 当调用hello方法的时候,实际调用的是proxy的invoke方法
public Object invoke(Object proxy, Method method, Object[] args) {
log.info("invoked method: [{}]", method.getName());
RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
.parameters(args)
.interfaceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.requestId(UUID.randomUUID().toString())
.group(rpcServiceProperties.getGroup())
.version(rpcServiceProperties.getVersion())
.build();
RpcResponse<Object> rpcResponse = null;
if (clientTransport instanceof NettyClientTransport) {
CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) clientTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();
}
if (clientTransport instanceof SocketRpcClient) {
rpcResponse = (RpcResponse<Object>) clientTransport.sendRpcRequest(rpcRequest);
}
RpcMessageChecker.check(rpcResponse, rpcRequest);
return rpcResponse.getData();
}
真正的实现细节是在:clientTransport.sendRpcRequest
clientTransport是一个接口,可以用不同的协议来实现,这里以netty作为说明
@Override
public CompletableFuture<RpcResponse<Object>> sendRpcRequest(RpcRequest rpcRequest) {
// build return value
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
// build rpc service name by rpcRequest
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
// get server address
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
// get server address related channel
Channel channel = channelProvider.get(inetSocketAddress);
if (channel != null && channel.isActive()) {
// put unprocessed request
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("client send message: [{}]", rpcRequest);
} else {
future.channel().close();
resultFuture.completeExceptionally(future.cause());
log.error("Send failed:", future.cause());
}
});
} else {
throw new IllegalStateException();
}
return resultFuture;
}
clientTransport 包含了 channelprovider,serviceProvider,并且是一个异步的调用,返回Future。
serviceDiscovery 负责重zk中查询,相应的服务,然后重可用服务列表里面选择一个可用,选择策略可以使自定义的。
channelProvider.get(inetSocketAddress) 则是从 nettyClient里面获取一个连接.也是使用回调的异步通知方式
public Channel doConnect(InetSocketAddress inetSocketAddress) {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
completableFuture.complete(future.channel());
} else {
throw new IllegalStateException();
}
});
return completableFuture.get();
}
主要的client 配置如下:
// RpcResponse -> ByteBuf
ch.pipeline().addLast(new DefaultDecoder(RpcResponse.class));
// ByteBuf -> RpcRequest
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new NettyClientHandler());
一个是 KryoSeriablizer,一个是NettyClientHandler。
这两个比较简单,一个是java独有的序列化,一个是读取 服务端返回的 resp。
到这里基于java动态代理的 client sendRpcRequest 模式就处理完毕。
再看下Server端,Server端,其实就相对来说比较简单,主要的工作就是每启动一个服务,就注册起来,以便服务发现可以获得对应的服务。
public void registerService(Object service, RpcServiceProperties rpcServiceProperties) {
serviceProvider.publishService(service, rpcServiceProperties);
}
@SneakyThrows
public void start() {
CustomShutdownHook.getCustomShutdownHook().clearAll();
String host = InetAddress.getLocalHost().getHostAddress();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
// 当客户端第一次进行请求的时候才会进行初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 30 秒之内没有收到客户端请求的话就关闭连接
ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new DefaultDecoder(RpcRequest.class));
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
ch.pipeline().addLast(new NettyServerHandler());
}
});
主要的工作就是定义RequestHandler,去处理不同客户端的请求。
public Object handle(RpcRequest rpcRequest) {
Object service = serviceProvider.getService(rpcRequest.toRpcProperties());
return invokeTargetMethod(rpcRequest, service);
}
/**
* get method execution results
*
* @param rpcRequest client request
* @param service service object
* @return the result of the target method execution
*/
private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) {
Object result;
try {
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
result = method.invoke(service, rpcRequest.getParameters());
log.info("service:[{}] successful invoke method:[{}]", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
} catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
throw new RpcException(e.getMessage(), e);
}
return result;
}
request handler 就是根据request 从服务列表里面获得服务的对象,然后反射调用对应的方法。
高并发和性能
进一步处理高并发的问题,比如 有1000 qps 调用 hellowService,那么就要做scala的扩容。
服务端可以启动新的实例动态扩容,加入到zookeeper列表里面。 client端,需要订阅服务上线下线的事件,这样可以动态更新服务列表
而不用每次请求的时候,都去查询zookeeper。
其他优化层面,就是在Netty 上做优化,比如配置 boss work 的数目等。 序列化kyro,protobuf 等的选择。
fallback,熔断降级
熔断,比如网络问题,可能一次要等待一个timeout,如果大量的timeout可能会有问题,框架要识别这种情况,使得快速失败。
降级,可以认为的定义一些场景,比如 物理资源,网络资源,异常的时候优先给哪些服务提供资源,而另外一些服务则调用一些mock的服务,
这个可能要看一个完整的rpc框架,比如dubbo。
本文先理清楚整个 rpc 框架要处理的细节。
网友评论