美文网首页
深入理解kylin集成kafka

深入理解kylin集成kafka

作者: 起个什么呢称呢 | 来源:发表于2019-01-31 16:13 被阅读63次

    在写正文之前先讲下什么是kafka

    kafka

    kafka是一个消息系统 再具体点是发布-订阅制的消息系统,它依赖于zookeeper。也可以把他看作一个数据池。
    kafka有这么几个概念

    • 生产者
      顾名思义是指数据的生产者
    • 消费者
      同理,消费者是指数据的消费者
    • 代理
      代理是负责维护发布数据的简单系统。每个代理中的每个主题可以具有零个或多个分区。
    • 主题
      主题属于特定类别的消息流称为主题。 数据存储在主题中。主题被拆分成分区。 对于每个主题,Kafka保存一个分区的数据。 每个这样的分区包含不可变有序序列的消息。 分区被实现为具有相等大小的一组分段文件
    • 分区
      主题可能有许多分区,因此它可以处理任意数量的数据。
      举个例子,把kafka看作一个大型物流中转站,所有卖家的商品(生产者生产的数据)都会输入到这里。那么这个中转站(kafka)就会对物品进行分类处理(主题处理),分为,医药,家电,办公,建材之类的。又因为距离,效率等问题,不便于及时服务好客户,于是便设立代理,紧接着同类物品分发到各个代理,所以就会有分区概念。最后物品到达消费者手里。
      中间还会设计到leader和follower,leader以看作快递员中的Boss,它负责给指定分区读取写入,那么剩下的快递员都是follower,当leader挂掉,剩下的follower会重新选取一个新的leader负责之前的工作,这样就不会出现因为服务挂掉,导致数据丢失问题。


      image.png

    kylin集成kafka

    先看下开源kylin的源码目录结构


    image.png

    config目录下的都是写配置类,包括id,host,port,cluster等目的是告诉程序去哪里取数据,然后把数据存储到哪里。
    hadoop 目录下类 如下:


    image.png
    到这里发现kylin的UI界面提供的一些配置参数,其实都是配置文件的参数,只不过把做了前后端隔离,填写的数据被调用,组成一个job被挂起
    如果一个job一致挂起,然后cube会有很多的segment,所以需要合并,合并就用到了MergeOffsetStep类,合并目的也是为了减少存储空间。达到优化cube的目的
    package org.apache.kylin.source.kafka.job;
    
    import java.io.IOException;
    import java.util.Collections;
    
    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.cube.CubeUpdate;
    import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
    import org.apache.kylin.job.exception.ExecuteException;
    import org.apache.kylin.job.execution.AbstractExecutable;
    import org.apache.kylin.job.execution.ExecutableContext;
    import org.apache.kylin.job.execution.ExecuteResult;
    import org.apache.kylin.metadata.model.SegmentRange;
    import org.apache.kylin.metadata.model.SegmentRange.TSRange;
    import org.apache.kylin.metadata.model.Segments;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.google.common.base.Preconditions;
    
    /**
     */
    public class MergeOffsetStep extends AbstractExecutable {
    
        private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
    
        public MergeOffsetStep() {
            super();
        }
    
        @Override
        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
            final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
            final CubeInstance cubeCopy = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
            final String segmentId = CubingExecutableUtil.getSegmentId(this.getParams());
            final CubeSegment segCopy = cubeCopy.getSegmentById(segmentId);
    
            Preconditions.checkNotNull(segCopy, "Cube segment '" + segmentId + "' not found.");
            Segments<CubeSegment> mergingSegs = cubeCopy.getMergingSegments(segCopy);
    
            Preconditions.checkArgument(mergingSegs.size() > 0, "Merging segment not exist.");
    
            Collections.sort(mergingSegs);
            final CubeSegment first = mergingSegs.get(0);
            final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
    
            segCopy.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
            segCopy.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
            segCopy.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
    
            segCopy.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd()));
    
            CubeUpdate update = new CubeUpdate(cubeCopy);
            update.setToUpdateSegs(segCopy);
            try {
                cubeManager.updateCube(update);
                return ExecuteResult.createSucceed();
            } catch (IOException e) {
                logger.error("fail to update cube segment offset", e);
                return ExecuteResult.createError(e);
            }
        }
    
    }
    
    

    总的来说集成的过程大概是这样的。
    1.按照一定的格式生成数据
    2.按照一定格式解析读取数据
    3.将数据分发到Hadoop上
    4.事实表load新增数据
    5.建立cube
    6.cube 合并更新操作

    相关文章

      网友评论

          本文标题:深入理解kylin集成kafka

          本文链接:https://www.haomeiwen.com/subject/jhiksqtx.html