本人微信公众号(jwfy)欢迎关注
手写RPC框架
1、手写一个RPC框架,看看100个线程同时调用效果如何
2、手写RPC框架(2)-引入zookeeper做服务治理
3、手写RPC框架(3)-引入Hessian序列化工具
原本打算写Netty替换BIO模型的,但是在写Netty中关于Channel长链接部分时,再回顾服务治理这一块的代码实在不太好(当然不意味现在的就很好),决定重写服务治理这一块的逻辑,最后验证1000个线程的执行效果,如下是涉及到改动的点:
- 添加了logback日志模块,便于日志输出
- 拆分了之前的服务治理,形成了服务发现和服务注册两个模块
- 之前服务发现是细化到方法层面,现在修改成接口层面
- 新增了zk节点监听模式,实现异步动态修改节点信息
- netty的channel长链接模式
- 负载均衡由之前的服务提供方ip的,改成netty的channel
在笔记中也会尽可能的阐述自己的设计思路,思路&实现都可能不那么完美,但这也是手写RPC的目的所在。当这个问题抛给你时,你是如何思考的,落实到具体代码实现,又有多少需要去妥协的
看各种框架源码也是如此,不能为了看源码而看源码,看源码一方面能帮助我们解决实际的框架使用问题,另一方面是学习大佬的设计思路&好的架构经验,以能为我们所学习和使用
增加logback模块
使用SLF4J标准规范,再关联绑定使用了logback,因为在com.101tec#zkclient#0.11
版本的zk中包含了log4j,为了避免冲突,所以要么移除,要么采用maven的最近原则,屏蔽zk中的日志模块,本文中则是采用了最近原则的做法,把logback放在pom文件的最顶部,如下图就是启动后的日志输出了(别忘记了往.gitignore中添加logs文件夹)
服务注册
服务注册之前是会注册服务提供方和服务调用方两者,以provider和consumer区分,但是这个新的改进中去掉了该部分逻辑,服务提供方只进行服务注册,服务调用方只进行服务发现,所以这也算一个妥协的点吧。
image如上图是改写之后的zk节点信息,最下面的节点路径就是存在的ip数据,是临时节点,接下来看看服务注册这一块是如何实现的。
public interface ServiceRegister {
/**
* 服务注册
* @param config
*/
void register(BasicConfig config);
/**
* 优雅关闭
*/
void close();
}
保留了之前的注册接口,以便于能做到协议扩展,同时加上了close关闭操作。
public class ZkServiceRegister implements ServiceRegister {
private static final Logger logger = LoggerFactory.getLogger(ZkServiceRegister.class);
private CuratorFramework client;
private RegisterConfig registerConfig;
public ZkServiceRegister(RegisterConfig registerConfig) {
this.registerConfig = registerConfig;
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
this.client = CuratorFrameworkFactory
.builder()
.connectString(registerConfig.getZkHost())
.sessionTimeoutMs(registerConfig.getSessionTimeOut())
.retryPolicy(policy)
.namespace(registerConfig.getZkNameSpace())
.build();
// 业务的根路径是 /jwfy/simple-rpc ,其他的都会默认挂载在这里
this.client.start();
logger.info("zk启动正常");
}
@Override
public void register(BasicConfig config) {
String interfacePath = "/" + config.getInterfaceName();
try {
if (this.client.checkExists().forPath(interfacePath) == null) {
// 创建 服务的永久节点
this.client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(interfacePath);
}
String address = getServiceAddress(config);
String path = String.format("%s/%s/%s", interfacePath, ServiceType.PROVIDER.getType(), address);
// 这里强制采用了ServiceType.PROVIDER.getType(),也就是provider
logger.info("注册 zk path: [" + this.registerConfig.getZkNameSpace() + path + "]");
this.client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "0".getBytes());
// 创建临时节点,节点内的数据是0
} catch (Exception e) {
// 如果节点session未过期,还未被清除,再次创建则会提示节点已存在
logger.error("注册zk失败, [{}]:{}", interfacePath, e.getMessage());
}
}
@Override
public void close() {
this.client.close();
logger.warn("zkClient关闭");
}
private String getServiceAddress(BasicConfig config) {
return new StringBuilder().append(config.getHost()).append(":").append(config.getPort()).toString();
}
}
相比上一个版本,代码精简了不少,只是针对服务提供方的接口进行了一个zk节点的注册,此外添加了close操作,以便于在服务停止时,主动关闭zk节点的链接。
服务发现
public interface ServiceDiscovery {
/**
* 获取服务的ip信息并添加zk监听
* @param interfaceName
* @return
*/
List<String> get(String interfaceName);
}
原本这部分功能是在服务注册中的,现在移出来了,单独弄成一个接口ServiceDiscovery,同样是为了协议拓展的功能。
public class ZkServiceDiscovery implements ServiceDiscovery {
private static final Logger logger = LoggerFactory.getLogger(ZkServiceDiscovery.class);
private CuratorFramework client;
private Map<String, List<String>> servicePathMap = new ConcurrentHashMap<>();
// 存储的是interface->ip信息的映射关系
public ZkServiceDiscovery(RegisterConfig registerConfig) {
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
this.client = CuratorFrameworkFactory
.builder()
.connectString(registerConfig.getZkHost())
.sessionTimeoutMs(registerConfig.getSessionTimeOut())
.retryPolicy(policy)
.namespace(registerConfig.getZkNameSpace())
.build();
this.client.start();
logger.info("zk启动正常");
}
@Override
public List<String> get(String interfaceName){
String path = String.format("/%s/%s", interfaceName, ServiceType.PROVIDER.getType());
List<String> ips = null;
try {
ips = this.client.getChildren().forPath(path);
// 先获取子节点的所有信息
this.addWatcher(interfaceName, path);
// 添加监听模式
servicePathMap.put(path, ips);
} catch (Exception e) {
}
return ips;
}
private void addWatcher(String interfaceName, String path) throws Exception {
PathChildrenCache cache = new PathChildrenCache(this.client, path, true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (CHILD_ADDED == event.getType() || CHILD_REMOVED == event.getType()) {
// 节点添加和节点移除操作
String[] childPath = event.getData().getPath().split("/");
String ip = childPath[childPath.length-1];
PathChildrenCacheEvent.Type type = event.getType();
logger.info("path:[{}],ip:[{}], type:{}", path, ip, type.toString());
List<String> stringList = servicePathMap.get(path);
if (stringList == null) {
stringList = new ArrayList<>();
servicePathMap.put(path, stringList);
}
if (CHILD_ADDED == type && !stringList.contains(ip)) {
stringList.add(ip);
ClientConnection.getInstance().connection(interfaceName, ip);
// 这里有使用ClientConnection(服务连接器)去完成通知操作
} else if (CHILD_REMOVED == type) {
stringList.remove(ip);
ClientConnection.getInstance().remove(interfaceName, ip);
}
}
}
});
}
}
通过zk获取对应服务的服务提供方信息的同时,添加了watcher模式以能实时感知zk节点的变化,然后把其变化信息告诉给服务调用方连接器
服务连接器
管理zk节点和netty可用channel的连接器,负载均衡也是在此处发挥作用,客户端可通过此连接器完成有效的Channel获取
public class ClientConnection {
private static final Logger logger = LoggerFactory.getLogger(ClientConnection.class);
private SerializeProtocol serializeProtocol = new HessianSerialize();
private Map<String, CopyOnWriteArrayList<IpClientHandler>> clientHandlerMap = new ConcurrentHashMap<>();
private CopyOnWriteArrayList<ClientHandler> clientHandlerList = new CopyOnWriteArrayList<>();
private volatile boolean flag = true;
private ReentrantLock reentrantLock = new ReentrantLock();
/**
* 一个开关以控制对cliendhandler对获取
*/
private Condition condition = reentrantLock.newCondition();
private static class Single {
private static final ClientConnection INSTANCE = new ClientConnection();
}
public static ClientConnection getInstance() {
// 内部类的单例模式,线程安全
return Single.INSTANCE;
}
public void connection(String interfaceName, String ip) {
InetSocketAddress address = CommonUtils.parseIp(ip);
EventLoopGroup work = new NioEventLoopGroup();
// netty的客户端连接操作
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(work).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new LengthFieldPrepender(2,0,false))
.addLast(new RpcEncoder(RpcRequest.class, serializeProtocol))
.addLast(new RpcDecoder(RpcResponse.class, serializeProtocol))
.addLast(new ClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(address);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
ClientHandler handler = channelFuture.channel().pipeline().get(ClientHandler.class);
InetSocketAddress remoteAddress = (InetSocketAddress) handler.getChannel().remoteAddress();
logger.info("链接到远程服务器:{}, address:{}", handler, remoteAddress);
CopyOnWriteArrayList<IpClientHandler> ips = clientHandlerMap.get(interfaceName);
if (ips == null) {
ips = new CopyOnWriteArrayList<IpClientHandler>();
clientHandlerMap.put(interfaceName, ips);
}
ips.add(new IpClientHandler(ip, handler));
clientHandlerList.add(handler);
// 通知其他可能被阻塞的线程
notifyHandler();
}
}
});
}
public void remove(String interfaceName, String ip) {
// 移除掉已经废弃的不可用的channel信息
CopyOnWriteArrayList<IpClientHandler> ips = clientHandlerMap.get(interfaceName);
if (ips != null && !ips.isEmpty()) {
ips.stream().filter(x -> {
return x.getIp().equals(ip);
}).findFirst().ifPresent(x -> {
ips.remove(x);
clientHandlerList.remove(x.getHandler());
});
}
}
public ClientHandler getHandler(String interfaceName) {
CopyOnWriteArrayList<IpClientHandler> ips = clientHandlerMap.get(interfaceName);
int size = ips.size();
while (flag && size <=0) {
// 线程等待
try {
awaitHandler();
} catch (InterruptedException e) {
throw new RuntimeException("无法获取有效的handler");
}
ips = clientHandlerMap.get(interfaceName);
size = ips.size();
}
if (!flag) {
return null;
}
if (size == 1) {
return ips.get(0).getHandler();
}
Random random = new Random();
return ips.get(random.nextInt(size)).getHandler();
}
private void awaitHandler() throws InterruptedException {
this.reentrantLock.lock();
try {
this.condition.await(2000, TimeUnit.MILLISECONDS);
// 等待有有效的handler,不过也是有时间的,避免长时间等待
} finally {
this.reentrantLock.unlock();
}
}
private void notifyHandler() {
this.reentrantLock.lock();
try {
this.condition.signalAll();
} finally {
this.reentrantLock.unlock();
}
}
public void close() {
// 关闭,退出可能的条件判断、存储的channel进行主动关闭操作
this.flag = true;
this.clientHandlerList.forEach(x -> {
x.getChannel().close();
});
this.clientHandlerList.clear();
this.clientHandlerMap.clear();
logger.warn("Netty服务端关闭了");
}
}
这短代码稍微比较长,而且也包含了netty客户端连接的逻辑(暂时可以不用关心netty连接的逻辑)。
那么为什么会在getHandler中添加条件判断呢?是因为netty连接有效的channel处理器是异步完成的,所以是在operationComplete异步回调方法中去通知的,确保可以获取到有效channel,而同时为了避免长时间的阻塞等待,故使用了await 2s的时间。
负载均衡也是比较简单的,如果确实没有数据则直接返回null,当有效的数据只有1个时,也没必要再做负载均衡,只有当数据超过1条时,需要进行选择操作。
实践
设置了两个服务提供方,一个服务调用方,按照先启动一个服务提供方、再启动服务调用方、最后启动另一个服务调用方的执行顺序
-
服务提供方1启动
image
-
服务调用方启动
image
-
服务提供方2启动
- 服务提供方1关闭
日志显示的也很明显,watcher监听到了节点的移除事件,然后进行关闭channel长连接的操作
- 客户端退出
- 负载均衡开启1000个线程
负载均衡
服务调用方
服务提供方
负载均衡在两个channel中随机轮询,开启的1000个线程调用没什么问题,不过发现了个关于netty方面的问题,这期就不再介绍,后面介绍netty的时候再聊。
总结
本期针对服务治理的重写改善了之前每次获取远程ip都需要通过zookeeper获取的方式,降低对zookeeper带来压力,采用zookeeper的watcher模式感知到节点的变化,同时本地缓存的也不是ip节点数据,而是长连接channel数据,避免了每次获取ip数据后都需要进行连接和关闭操作,可以进一步的提高效率。最后新加了优雅关闭的操作。
之前BIO模式采用线程池的方式,1000个线程很快就会把线程池打满,导致后续的任务全部被拒绝,如果在拒绝任务没有处理好还会导致服务假死的情况,而采用netty,并没有出现线程池被打满的情况,采用react模式确实能够很好的处理网络连接的处理逻辑,并且封装了NIO部分的操作。限于篇幅的缘故,netty就还是留到后面再介绍。
如有想需要demo代码的可关注本人微信公众号,给我发私信,代码存在的问题也欢迎提出~
本人微信公众号(搜索jwfy)欢迎关注
微信公众号
网友评论