美文网首页
数据大盘-搜索词统计(KeyedProcessFunction)

数据大盘-搜索词统计(KeyedProcessFunction)

作者: LZhan | 来源:发表于2019-11-12 15:48 被阅读0次
    0.需求

    实时输出当天的搜索词排名,即实时呈现热点搜索词

    1.数据源读取

    对接Kafka数据源,将消息转成实体类(实体类属性主要是关键词和搜索时间)

    public class KafkaUtil {
    
        public static FlinkKafkaConsumerBase<String> text(String topic) throws IOException {
            return text(topic, "kafka.properties");
        }
    
        /**
         * @param topic 主题名称
         * @param configPath Kafka属性配置文件路径
         */
        public static FlinkKafkaConsumerBase<String> text(String topic,String configPath) throws IOException {
    
            //1.加载Kafka属性
            Properties prop=new Properties();
            //Class.getClassLoader.getResourceAsStream 默认是从ClassPath根下获取,path不能以“/"开头
            InputStream in=KafkaUtil.class.getClassLoader().getResourceAsStream(configPath);
            prop.load(in);
            //2.构造FlinkKafkaConsumer
            FlinkKafkaConsumerBase<String> consumer=new FlinkKafkaConsumer011<>(topic,new SimpleStringSchema(),prop);
            //todo 可以进行消费者的相关配置
            // 本地debug不提交offset consumer.setCommitOffsetsOnCheckpoints(false);
            return consumer;
        }
    }
    

    env.addSource(KafkaUtil.text("topic"));

    2.搜索词统计

    在进行窗口内统计时,首先需要根据yyyy-MM-dd的维度对搜索词消息进行keyby操作,形成KeyedStream,进一步调用实现了抽象类KeyedProcessFunction的方法。

    2.1 KeyedProcessFunction介绍

    顾名思义,KeyedProcessFunction,是针对具有相同key的stream进行元素处理的方法。
    public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
    三个泛型分别是key的类型,流输入类型,流输出类型。

    <1> 方法public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    处理流中的每一个元素,即每有一个元素进来,就会执行一次processElement方法。
    该方法可以通过参数Collector<O> out来产生0个或者多个元素;
    也可以通过Context ctx来更新state或者设置定时器timers。
    ctx通过调用timerService()可以注册定时器。

    Context的属性方法:

    public abstract class Context {
    
            /**
             * Timestamp of the element currently being processed or timestamp of a firing timer.
             *
             * <p>This might be {@code null}, for example if the time characteristic of your program
             * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
             */
            public abstract Long timestamp();
    
            /**
             * A {@link TimerService} for querying time and registering timers.
             */
            public abstract TimerService timerService();
    
            /**
             * Emits a record to the side output identified by the {@link OutputTag}.
             *
             * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
             * @param value The record to emit.
             */
            public abstract <X> void output(OutputTag<X> outputTag, X value);
    
            /**
             * Get key of the element being processed.
             */
            public abstract K getCurrentKey();
        }
    

    对于timestamp()方法,需要注意的是注释上说明可能会返回为null,如果设置的时间类型是ProcessingTime。

    <2>方法public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    当通过某个定时器timer通过TimerService设置后,会调用onTimer方法。

    2.2 具体实现

    因为需要实时统计当天的搜索词热度,所以在KeyedProcessFunction实现方法中,需要使用到状态。
    private MapState<String, DashboardKeyword> keywordState;
    Map的键就是搜索词,值就是输出实体类(两个属性分别是关键词和搜索次数)

    @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
    
            StateTtlConfig retainOneDay = StateTtlUtil.retainOneDay();
            MapStateDescriptor<String, DashboardKeyword> keywordStateDescriptor = new MapStateDescriptor<>(
                    "keyword",
                    Types.STRING,
                    Types.POJO(DashboardKeyword.class)
            );
            keywordStateDescriptor.enableTimeToLive(retainOneDay);
    
            this.keywordState = getRuntimeContext().getMapState(keywordStateDescriptor);
        }
    

    KeyedProcessFunction继承了AbstractRichFunction,因此可以使用到RuntimeContext。也进一步可以使用到状态,在open方法里面定义State并且设置State的存活时间为1天。


    @Override
        public void processElement(Search search, Context context, Collector<Tuple2<Integer, DashboardKeyword>> collector) throws Exception {
            Result result = DicAnalysis.parse(search.getKeyword());
            for (Term term: result.getTerms()) {
                String key = term.getName();
                if (key.trim().length() < 2) continue;
    
                DashboardKeyword value;
                if (this.keywordState.get(key) != null) {
                    value = this.keywordState.get(key);
                } else {
                    value = new DashboardKeyword();
                    value.setKeyword(key);
                    value.setFrequency(0L);
                }
                value.setFrequency(value.getFrequency() + 1L);
                this.keywordState.put(key, value);
            }
    
            long coalescedTime = ((System.currentTimeMillis() + 5000) / 5000) * 5000;
            context.timerService().registerProcessingTimeTimer(coalescedTime);
        }
    

    processElement里面的代码就是很常见的套路,如果MapState中已经存在当前搜索词了,即获取对应的value,并将其频次增加;如果MapState中并没有当前的搜索词,则将该关键词及对应value添加到Map当中。
    这里是给定时器添加5s的时长,这里的写法((System.currentTimeMillis() + 5000) / 5000) * 5000是计时器合并的目的,Flink对于每个键和时间戳都只会维护一个计时器(计时器太多会影响性能),需要通过降低计时器的精度来合并计时器,从而减少计时器的数量。
    假设现在是15s,那么定时器为20s,利用上面的写法,16s,17s,18s,19s的定时器都是20s。


     @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Integer, DashboardKeyword>> collector) throws Exception {
            java.util.stream.Collector<DashboardKeyword, ?, List<DashboardKeyword>> top50Collector = Comparators.greatest(
                    50,
                    Comparator.comparingLong(DashboardKeyword::getFrequency)
            );
            List<DashboardKeyword> top50 = Streams.stream(this.keywordState.values()).collect(top50Collector);
    
            for (int rank = 0; rank < top50.size(); rank++) {
                collector.collect(Tuple2.of(rank, top50.get(rank)));
            }
        }
    

    统计前50搜索词。

    3.数据存储

    存储到redis中,
    注意flink1.7版本之后,官方没有redis sink,可以去http://bahir.apache.org/(flink以及spark扩展库),
    粘贴源码实现redis sink。

    相关文章

      网友评论

          本文标题:数据大盘-搜索词统计(KeyedProcessFunction)

          本文链接:https://www.haomeiwen.com/subject/cnhfictx.html