整体思路
服务端:将key:服务名称,value:ip和端口号注册给zookeeper
客户端:根据服务名称,从zk中获取ip和端口号集合,并做负载均衡
具体代码
服务端
public class ZkConfig {
public static String CONNECTION_STR="192.168.13.102:2181,192.168.13.103:2181,192.168.13.104:2181";
}
public interface IRegistryCenter {
/**
* 服务注册名称和服务注册地址实现服务的管理
* @param serviceName
* @param serviceAddress
*/
void registry(String serviceName,String serviceAddress);
}
public class RegistryCenterWithZk implements IRegistryCenter{
CuratorFramework curatorFramework =null;
{
//初始化zookeeper的连接, 会话超时时间是5s,衰减重试
curatorFramework = CuratorFrameworkFactory.builder().
connectString(ZkConfig.CONNECTION_STR).sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry(1000, 3)).
namespace("registry")
.build();
curatorFramework.start();
}
@Override
public void registry(String serviceName, String serviceAddress) {
String servicePath="/"+serviceName;
try {
//判断节点是否存在
if(curatorFramework.checkExists().forPath(servicePath)==null){
curatorFramework.create().creatingParentsIfNeeded().
withMode(CreateMode.PERSISTENT).forPath(servicePath);
}
//serviceAddress: ip:port
String addressPath=servicePath+"/"+serviceAddress;
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(addressPath);
System.out.println("服务注册成功");
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端
public class ZkConfig {
public static String CONNECTION_STR="192.168.13.102:2181,192.168.13.103:2181,192.168.13.104:2181";
}
public interface IServiceDiscovery {
//根据服务名称返回服务地址
String discovery(String serviceName);
}
public class ServiceDiscoveryWithZk implements IServiceDiscovery{
CuratorFramework curatorFramework =null;
List<String> serviceRepos=new ArrayList<>(); //服务地址的本地缓存
{
//初始化zookeeper的连接, 会话超时时间是5s,衰减重试
curatorFramework = CuratorFrameworkFactory.builder().
connectString(ZkConfig.CONNECTION_STR).sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry(1000, 3)).
namespace("registry")
.build();
curatorFramework.start();
}
/**
* 服务的查找
* 设置监听
*
* @param serviceName
* @return
*/
@Override
public String discovery(String serviceName) {
//完成了服务地址的查找(服务地址被删除)
String path="/"+serviceName; //registry/com.gupaoedu.demo.HelloService
if(serviceRepos.isEmpty()) {
try {
serviceRepos = curatorFramework.getChildren().forPath(path);
registryWatch(path);
} catch (Exception e) {
e.printStackTrace();
}
}
//针对已有的地址做负载均衡
LoadBalanceStrategy loadBalanceStrategy=new RandomLoadBalance();
return loadBalanceStrategy.selectHost(serviceRepos);
}
private void registryWatch(final String path) throws Exception {
PathChildrenCache nodeCache=new PathChildrenCache(curatorFramework,path,true);
PathChildrenCacheListener nodeCacheListener= (curatorFramework1, pathChildrenCacheEvent) -> {
System.out.println("客户端收到节点变更的事件");
serviceRepos=curatorFramework1.getChildren().forPath(path);// 再次更新本地的缓存地址
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
}
}
public interface LoadBalanceStrategy {
String selectHost(List<String> repos);
}
public abstract class AbstractLoadBalance implements LoadBalanceStrategy{
@Override
public String selectHost(List<String> repos) {
//repos可能为空, 可能只有一个。
if(repos==null||repos.size()==0){
return null;
}
if(repos.size()==1){
return repos.get(0);
}
return doSelect(repos);
}
protected abstract String doSelect(List<String> repos);
}
public class RandomLoadBalance extends AbstractLoadBalance{
@Override
protected String doSelect(List<String> repos) {
int length=repos.size();
Random random=new Random(); //从repos的集合内容随机获得一个地址
return repos.get(random.nextInt(length));
}
}
网友评论