data:image/s3,"s3://crabby-images/a0ded/a0ded9d12ff8a449eb731016f3d6c6b5c793f4e9" alt=""
BroadCaster类是kylin的节点之间进行通信的基础类,用于在所有的kylin服务器之间广播元数据的更新。本文通过源码解读+方法解析的方式介绍了BroadCaster类。
主要功能
- 通过一个map<String,List<Listener>>维护一个监听器集合,key为事件的实体类型,比如 “cube”、“segment”等,value为一个集合,集合内容为一系列的注册了的监听器实体。并通过addListener方法注册监听器给不同的实体,然后 notifyListener 或者 notifyClearAll 来调用,先拿到监听器集合中的监听器列表,并循环调用监听方法达到通知的目的。
- 通过一个阻塞队列broadcastEvents存放事件 。
- 通过 CacheController的announceWipeCache 方法来宣布一个事件并调用queue方法插入这个broadcastEvents(主要用于给前端清除所有节点缓存用)。
- 创建只有一个单独的线程的线程池来执行任务(如果这个线程挂掉,则会重新启动一个线程)。这个线程会一直循环拉取消息队列里的时间,如果没有消息,将会阻塞等待。如果收到消息,会通过循环所有的节点调用RestClient来发送 wipecache 请求,这里就会调用 CacheController的 wipeCache方法,而这个方法,最终会调用上面的 notifyListener方法来达到通知监听的目的。
主要的功能模块图如下所示
data:image/s3,"s3://crabby-images/06803/06803f1e2ade8a859737fc337c8b71c875a7cbfb" alt=""
代码解析
- getInstance: 静态方法 入口,获取示例 一个config对应一个示例
public static Broadcaster getInstance(KylinConfig config) {
synchronized (CACHE) {
// key为config实例
Broadcaster r = CACHE.get(config);
if (r != null) {
return r;
}
r = new Broadcaster(config);
CACHE.put(config, r);
if (CACHE.size() > 1) {
logger.warn("More than one singleton exist");
}
return r;
}
}
- 私有构造函数Broadcaster: 单独线程的线程池 , while(true)无限循环从broadcastEvents队列中取出队列首的广播事件,循环restServers配置,并且调用restClient的wipeCache方法进行缓存清除操作,如果失败,再次放入队列,直至失败次数到达规定的最大次数,通过 kylin.metadata.sync-retries 设置,默认为3.
private Broadcaster(final KylinConfig config) {
this.config = config;
final int retryLimitTimes = config.getCacheSyncRetrys();
final String[] nodes = config.getRestServers();
if (nodes == null || nodes.length < 1) {
logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
}
logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
//创建一个单独的线程的线程池来执行任务,如果这个线程挂掉,则会重新启动一个线程
Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
@Override
public void run() {
final Map<String, RestClient> restClientMap = Maps.newHashMap();
//创建一个线程池,起立缓存线程池
final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
while (true) {
try {
//循环执行
final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
//判断重试次数,如果失败再次放入
broadcastEvent.setRetryTime(broadcastEvent.getRetryTime() + 1);
if (broadcastEvent.getRetryTime() > retryLimitTimes) {
logger.info("broadcastEvent retry up to limit times, broadcastEvent:{}", broadcastEvent);
continue;
}
//根据 rest servers 配置来构建请求客户端
String[] restServers = config.getRestServers();
logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
for (final String node : restServers) {
if (restClientMap.containsKey(node) == false) {
restClientMap.put(node, new RestClient(node));
}
}
logger.debug("Announcing new broadcast event: " + broadcastEvent);
for (final String node : restServers) {
wipingCachePool.execute(new Runnable() {
@Override
public void run() {
try {
//发送请求,清楚缓存
restClientMap.get(node).wipeCache(broadcastEvent.getEntity(),
broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
} catch (IOException e) {
logger.warn("Thread failed during wipe cache at {}, error msg: {}",
broadcastEvent, e);
// when sync failed, put back to queue
try {
//然后在加入到 队列中去
broadcastEvents.putLast(broadcastEvent);
} catch (InterruptedException ex) {
logger.warn(
"error reentry failed broadcastEvent to queue, broacastEvent:{}, error: {} ",
broadcastEvent, ex);
}
}
}
});
}
} catch (Exception e) {
logger.error("error running wiping", e);
}
}
}
});
}
- registerListener: registerStaticListener供其他代码调用,注册不同的监听器 为不同的实体,比如 Broadcaster.getInstance(config).registerListener(new CubeDescSyncListener(), "cube_desc");
/**
* 注册监听器
* @param lmap 一个lmap 一个以实体entity为键,Listener list为value的map
* @param listener 一个listener
* @param entities 实体
*/
private static void doRegisterListener(Map<String, List<Listener>> lmap, Listener listener, String... entities) {
synchronized (lmap) {
// ignore re-registration
List<Listener> all = lmap.get(SYNC_ALL);
if (all != null && all.contains(listener)) {
return;
}
for (String entity : entities) {
if (!StringUtils.isBlank(entity))
// 为传入的 所有entitiy 注册 监听
addListener(lmap, entity, listener);
}
//监听固定的几个类型
addListener(lmap, SYNC_ALL, listener);
addListener(lmap, SYNC_PRJ_SCHEMA, listener);
addListener(lmap, SYNC_PRJ_DATA, listener);
addListener(lmap, SYNC_PRJ_ACL, listener);
}
}
- notifyListener:最底层的通知参数
//正式通知方法
private void notifyListener(String entity, Event event, String cacheKey, boolean includeStatic) throws IOException {
// prevents concurrent modification exception
List<Listener> list = Lists.newArrayList();
List<Listener> l1 = listenerMap.get(entity); // normal listeners first
if (l1 != null)
list.addAll(l1);
//是否包括静态的监听, 如果包含,则一起加进来
if (includeStatic) {
List<Listener> l2 = staticListenerMap.get(entity); // static listeners second
if (l2 != null)
list.addAll(l2);
}
if (list.isEmpty())
return;
logger.debug("Broadcasting" + event + ", " + entity + ", " + cacheKey);
switch (entity) {
case SYNC_ALL:
for (Listener l : list) {
l.onClearAll(this);
}
clearCache(); // clear broadcaster too in the end
break;
...//省略一些代码
default:
for (Listener l : list) {
l.onEntityChange(this, entity, event, cacheKey);
}
break;
}
logger.debug("Done broadcasting" + event + ", " + entity + ", " + cacheKey);
}
- Listener: 内部抽象类,主要方法如下 :
- onClearAll 清除所有缓存
- onProjectSchemaChange onProjectDataChange onProjectQueryACLChange 项目相关的,会传入 project作为 cache key,然后清除某一个项目下的缓存。
- onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) 自定义的时间类型 entity 为不同的类型,event为 CREATE("create"), UPDATE("update"), DROP("drop"); 三种操作类型,cachekey为具体的CacheKey 要操作的具体的值 。
//监听器
abstract public static class Listener {
// 清空所有
public void onClearAll(Broadcaster broadcaster) throws IOException {
}
//项目元数据修改事件
public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
}
//项目数据修改事件
public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException {
}
//项目查询权限修改事件
public void onProjectQueryACLChange(Broadcaster broadcaster, String project) throws IOException {
}
//具体的实体修改事件
public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey)
throws IOException {
}
}
总结
总的来说,这个BroadCaster使用的是一种经典的生产消费者模型,一头往队列里插入通知消息,另一头通过线程拉取队列并通知Listener。
本文作者: 彭双宝
原文链接: http://blog.lovedata.net/97fa7954.html
网友评论