美文网首页大数据
kylin架构分析-广播变量BroadCaster分析

kylin架构分析-广播变量BroadCaster分析

作者: b00d1f0f0afd | 来源:发表于2018-04-17 18:43 被阅读43次
麒麟出没,必有祥瑞

  BroadCaster类是kylin的节点之间进行通信的基础类,用于在所有的kylin服务器之间广播元数据的更新。本文通过源码解读+方法解析的方式介绍了BroadCaster类。

主要功能

  1. 通过一个map<String,List<Listener>>维护一个监听器集合,key为事件的实体类型,比如 “cube”、“segment”等,value为一个集合,集合内容为一系列的注册了的监听器实体。并通过addListener方法注册监听器给不同的实体,然后 notifyListener 或者 notifyClearAll 来调用,先拿到监听器集合中的监听器列表,并循环调用监听方法达到通知的目的。
  2. 通过一个阻塞队列broadcastEvents存放事件 。
  3. 通过 CacheController的announceWipeCache 方法来宣布一个事件并调用queue方法插入这个broadcastEvents(主要用于给前端清除所有节点缓存用)。
  4. 创建只有一个单独的线程的线程池来执行任务(如果这个线程挂掉,则会重新启动一个线程)。这个线程会一直循环拉取消息队列里的时间,如果没有消息,将会阻塞等待。如果收到消息,会通过循环所有的节点调用RestClient来发送 wipecache 请求,这里就会调用 CacheController的 wipeCache方法,而这个方法,最终会调用上面的 notifyListener方法来达到通知监听的目的。

主要的功能模块图如下所示


BroadCaster介绍

代码解析

  1. getInstance: 静态方法 入口,获取示例 一个config对应一个示例
 public static Broadcaster getInstance(KylinConfig config) {

        synchronized (CACHE) {
           // key为config实例
            Broadcaster r = CACHE.get(config);
            if (r != null) {
                return r;
            }

            r = new Broadcaster(config);
            CACHE.put(config, r);
            if (CACHE.size() > 1) {
                logger.warn("More than one singleton exist");
            }
            return r;
        }
    }
  1. 私有构造函数Broadcaster: 单独线程的线程池 , while(true)无限循环从broadcastEvents队列中取出队列首的广播事件,循环restServers配置,并且调用restClient的wipeCache方法进行缓存清除操作,如果失败,再次放入队列,直至失败次数到达规定的最大次数,通过 kylin.metadata.sync-retries 设置,默认为3.
private Broadcaster(final KylinConfig config) {
        this.config = config;
        final int retryLimitTimes = config.getCacheSyncRetrys();

        final String[] nodes = config.getRestServers();
        if (nodes == null || nodes.length < 1) {
            logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
        }
        logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));

        //创建一个单独的线程的线程池来执行任务,如果这个线程挂掉,则会重新启动一个线程
        Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
            @Override
            public void run() {
                final Map<String, RestClient> restClientMap = Maps.newHashMap();
                //创建一个线程池,起立缓存线程池
                final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());

                while (true) {
                    try {
                        //循环执行
                        final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
                        //判断重试次数,如果失败再次放入
                        broadcastEvent.setRetryTime(broadcastEvent.getRetryTime() + 1);
                        if (broadcastEvent.getRetryTime() > retryLimitTimes) {
                            logger.info("broadcastEvent retry up to limit times, broadcastEvent:{}", broadcastEvent);
                            continue;
                        }

                        //根据 rest servers 配置来构建请求客户端
                        String[] restServers = config.getRestServers();
                        logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
                        for (final String node : restServers) {
                            if (restClientMap.containsKey(node) == false) {
                                restClientMap.put(node, new RestClient(node));
                            }
                        }

                        logger.debug("Announcing new broadcast event: " + broadcastEvent);
                        for (final String node : restServers) {
                            wipingCachePool.execute(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        //发送请求,清楚缓存
                                        restClientMap.get(node).wipeCache(broadcastEvent.getEntity(),
                                                broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
                                    } catch (IOException e) {
                                        logger.warn("Thread failed during wipe cache at {}, error msg: {}",
                                                broadcastEvent, e);
                                        // when sync failed, put back to queue
                                        try {
                                            //然后在加入到 队列中去
                                            broadcastEvents.putLast(broadcastEvent);
                                        } catch (InterruptedException ex) {
                                            logger.warn(
                                                    "error reentry failed broadcastEvent to queue, broacastEvent:{}, error: {} ",
                                                    broadcastEvent, ex);
                                        }
                                    }
                                }
                            });
                        }
                    } catch (Exception e) {
                        logger.error("error running wiping", e);
                    }
                }
            }
        });
    }
  1. registerListener: registerStaticListener供其他代码调用,注册不同的监听器 为不同的实体,比如 Broadcaster.getInstance(config).registerListener(new CubeDescSyncListener(), "cube_desc");
/**
     * 注册监听器
     * @param lmap 一个lmap  一个以实体entity为键,Listener list为value的map
     * @param listener  一个listener
     * @param entities  实体
     */
    private static void doRegisterListener(Map<String, List<Listener>> lmap, Listener listener, String... entities) {
        synchronized (lmap) {
            // ignore re-registration
            List<Listener> all = lmap.get(SYNC_ALL);
            if (all != null && all.contains(listener)) {
                return;
            }

            for (String entity : entities) {
                if (!StringUtils.isBlank(entity))
                    // 为传入的 所有entitiy 注册 监听
                    addListener(lmap, entity, listener);
            }
            //监听固定的几个类型
            addListener(lmap, SYNC_ALL, listener);
            addListener(lmap, SYNC_PRJ_SCHEMA, listener);
            addListener(lmap, SYNC_PRJ_DATA, listener);
            addListener(lmap, SYNC_PRJ_ACL, listener);
        }
    }
  1. notifyListener:最底层的通知参数
//正式通知方法
    private void notifyListener(String entity, Event event, String cacheKey, boolean includeStatic) throws IOException {
        // prevents concurrent modification exception
        List<Listener> list = Lists.newArrayList();
        List<Listener> l1 = listenerMap.get(entity); // normal listeners first
        if (l1 != null)
            list.addAll(l1);

        //是否包括静态的监听, 如果包含,则一起加进来
        if (includeStatic) {
            List<Listener> l2 = staticListenerMap.get(entity); // static listeners second
            if (l2 != null)
                list.addAll(l2);
        }

        if (list.isEmpty())
            return;

        logger.debug("Broadcasting" + event + ", " + entity + ", " + cacheKey);

        switch (entity) {
        case SYNC_ALL:
            for (Listener l : list) {
                l.onClearAll(this);
            }
            clearCache(); // clear broadcaster too in the end
            break;
        ...//省略一些代码
        default:
            for (Listener l : list) {
                l.onEntityChange(this, entity, event, cacheKey);
            }
            break;
        }

        logger.debug("Done broadcasting" + event + ", " + entity + ", " + cacheKey);
    }
  1. Listener: 内部抽象类,主要方法如下 :
    • onClearAll 清除所有缓存
    • onProjectSchemaChange onProjectDataChange onProjectQueryACLChange 项目相关的,会传入 project作为 cache key,然后清除某一个项目下的缓存。
    • onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) 自定义的时间类型 entity 为不同的类型,event为 CREATE("create"), UPDATE("update"), DROP("drop"); 三种操作类型,cachekey为具体的CacheKey 要操作的具体的值 。
//监听器
    abstract public static class Listener {
        // 清空所有
        public void onClearAll(Broadcaster broadcaster) throws IOException {
        }

        //项目元数据修改事件
        public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
        }

        //项目数据修改事件
        public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException {
        }

        //项目查询权限修改事件
        public void onProjectQueryACLChange(Broadcaster broadcaster, String project) throws IOException {
        }

        //具体的实体修改事件
        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey)
                throws IOException {
        }
    }

总结

  总的来说,这个BroadCaster使用的是一种经典的生产消费者模型,一头往队列里插入通知消息,另一头通过线程拉取队列并通知Listener。

本文作者: 彭双宝
原文链接: http://blog.lovedata.net/97fa7954.html

相关文章

网友评论

    本文标题:kylin架构分析-广播变量BroadCaster分析

    本文链接:https://www.haomeiwen.com/subject/cuypkftx.html