美文网首页
解决DStream的mapWithState无效key清除的问题

解决DStream的mapWithState无效key清除的问题

作者: 机灵鬼鬼 | 来源:发表于2019-09-28 16:11 被阅读0次

    该实例使用spark streaming从kafka中实时分析当天加减购的次数排名top5的用户。
    实现思路就是通过reduceByKey和mapWithState结合完成数据的统计。在统计过程中遇到的问题可能大家都会遇到:
    问题1:当跨天的时候,mapWithState中的缓存会存储以前计算过的过期的key,比如我们今天是10月26日,按照我们的思路只需要对10月26日发生的加购行为的用户做累加统计,10月25日及其以前key的就不要了。但是实际情况并不是这样,他依然会被mapWithState存储在缓存中,除非你重启你的程序。那么问题来了,我们怎么把已经计算过的10月25日及其以前的key从mapWithState缓存中去除呢?
    解决方案:
    第一步、这时候我们就用了State的超时机制,为每个key都固定设置24小时的存活时间,过期后会自动清除。这样保证sparkStreaming内在机制可以自动清理掉24小时以外的过期key。但这种情况依然会存在两天的数据key的情况,比如10月25日15点产生的key,必须等到10月26日15点才能被销毁,所以就需要第二步屏蔽操作。
    第二步、从分析程序用日期屏蔽除了10月26日以外的key。
    问题2:相同的key为什么会出现多次,好像并没有合并同类项?
    解决方案:这是因为mapWithState是并没有合并的机制的,这一点不同于updateBykey可以自动reduceBykey。所以我们在使用mapWithState来进行词频统计的时候,要先进行reduceBykey才能避免相同的key重复出现的情况。
    查看Spark Streaming消费消息的情况的工具
    ${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server hanode5:9092 --describe --group jis


    image.png
    package com.lppz.busi;
    
    import com.lppz.common.DBUtils;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.State;
    import org.apache.spark.streaming.StateSpec;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import scala.None;
    import scala.Tuple2;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    public class DayUpdateCartCntTopN {
        static String flag1="更新购物车接口,uid:";
        static String flag2="返回给前端的购物车信息cartInfoMap:";
        static String addSql="insert into bigdata_update_cart_cnt(uid, up_date, cnt) values(?,?,?)";
        static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");
        static int setp=300;
        public static void main(String[] args) {
            ResourceBundle kafkaData = ResourceBundle.getBundle("META-INF/kafka");
    
            SparkConf sc=new SparkConf().setMaster("local[2]").setAppName("DayUpdateCartCntTopN");
            //控制sparkstreaming启动时,积压问题并设置背压机制,自适应批次的record变化,来控制任务的堆积
            //(1)确保在kill任务时,能够处理完最后一批数据,再关闭程序,不会发生强制kill导致数据处理中断,没处理完的数据丢失
            sc.set("spark.streaming.stopGracefullyOnShutdown", "true");
            //(2)开启后spark自动根据系统负载选择最优消费速率
            sc.set("spark.streaming.backpressure.enabled", "true");
            //(3)开启的情况下,限制第一次批处理应该消费的数据,因为程序冷启动 队列里面有大量积压,防止第一次全部读取,造成系统阻塞
            sc.set("spark.streaming.backpressure.initialRate", "1000");
            //(4)限制每秒每个消费线程读取每个kafka分区最大的数据量
            sc.set("spark.streaming.kafka.maxRatePerPartition", "1000");
            /**
             * 注意:
             只有(4)激活的时候,每次消费的最大数据量,就是设置的数据量,如果不足这个数,就有多少读多少,如果超过这个数字,就读取这个数字的设置的值
             只有(2)+(4)激活的时候,每次消费读取的数量最大会等于(4)设置的值,最小是spark根据系统负载自动推断的值,消费的数据量会在这两个范围之内变化根据系统情况,但第一次启动会有多少读多少数据。此后按(2)+(4)设置规则运行
             (2)+(3)+(4)同时激活的时候,跟上一个消费情况基本一样,但第一次消费会得到限制,因为我们设置第一次消费的频率了
             */
    
            JavaStreamingContext jssc=new JavaStreamingContext(sc, Durations.seconds(180));//180秒一次
            jssc.checkpoint(".");//设置上一个批次的值存在的目录,在生产环境中,放在hdfs某个文件下,相对安全些
            // 首先要创建一份kafka参数map
            Map<String, Object> kafkaParams = new HashMap<String, Object>();
            // 这里是不需要zookeeper节点,所以这里放broker.list
            String brokerslist=kafkaData.getString("BROKERS_LIST");
            String topics = kafkaData.getString("TOPICS");
            String groupId=kafkaData.getString("GROUP_ID");
            //Kafka服务监听端口
            kafkaParams.put("bootstrap.servers",brokerslist);
            //指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            //指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            //消费者ID,随意指定
            kafkaParams.put("group.id", groupId);
            //earliest
            //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
            //latest
            //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
            kafkaParams.put("auto.offset.reset", "earliest");
            //如果true,consumer定期地往zookeeper写入每个分区的offset
            kafkaParams.put("enable.auto.commit", true);
            //这里不用担心时间久了kafka的数据会很多,因为kafka有自动清理机制,
            // 默认把消息数据保存168小时,超过的都会自动清理,配置在server.properties文件中的log.retention.hours
    
            //我们该用另外一种方式来获取多分区中的topic和offset
            Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
            //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
            try {
                JavaInputDStream<ConsumerRecord<String,String>> kafkaStream = KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
                );
                //使用map先对InputDStream进行取值操作
                //先过滤无用的数据
                JavaDStream<String> lines=kafkaStream.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
                    @Override
                    public Boolean call(ConsumerRecord<String, String> orgRecord) throws Exception {
                        String today=sdf.format(new Date());
                        //System.out.println("时间过滤:"+today);
                        String orgVal=orgRecord.value();
                        return orgVal.contains(flag1)&&orgVal.contains(flag2)&&orgVal.startsWith(today);//满足条件的才会被留下,不满足条件都会被过滤掉
                    }
                }).map(new Function<ConsumerRecord<String, String>, String>() {
                            @Override
                            public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                                String line=consumerRecord.value();
                                int f1=line.indexOf(flag1)+flag1.length();
                                String temp=line.substring(f1);
                                int f2=temp.indexOf(",",0);
                                String lastupTime= line.substring(11,23);
                                String uid=temp.substring(0,f2);
                                String day=line.substring(0,10);
                                return uid+"_"+day;
                            }
                });
                lines.print();
                JavaPairDStream<String, Integer> pairs  = lines.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey((v1,v2)->v1+v2);
                // 只更新数值变更的数据
                Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                        (word, one, state) -> {
                            Date newDate=new Date();
                            String today=sdf.format(newDate);
                            System.out.println("时间过滤3:"+today+",word:"+word);
                            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
                            if(word.contains(today)&&!state.isTimingOut()) {
                                state.update(sum);
                            }else{
                                
                          System.out.println("today:"+today+",word:"+word+",timeout:"+state.isTimingOut());
                         if(!state.isTimingOut()) {
                                state.remove();
                          }
                            }
                            return output;
                };
    
                // DStream made of get cumulative counts that get updated in every batch
                JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
                        pairs.mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(1440)));//24小时后缓存的key过期
                JavaPairDStream<String,Integer> fullStateDstream=stateDstream.stateSnapshots();//获取所有的未过期的key
                if (fullStateDstream != null) {
                    fullStateDstream.print();
                    //遍历DStream,并转换成RDD
                    fullStateDstream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
                        @Override
                        public void call(JavaPairRDD<String, Integer> tuple2JavaRDD) throws Exception {
                            if (tuple2JavaRDD.isEmpty()){
                                System.out.println("暂无可处理数据");
                                return;
                            }
                            JavaRDD<String> newRdd =tuple2JavaRDD.map(new Function<Tuple2<String, Integer>, String>() {
                                @Override
                                public String call(Tuple2<String, Integer> record) throws Exception {
                                    Date newDate=new Date();
                                    String today=sdf.format(newDate);
                                    if (record._1.contains(today)){
                                        return record._2+"_"+record._1;
                                    }else{
                                        System.out.println("排序前被过滤的数据today:"+today+",record._1:"+record._1);
                                        return null;
                                    }
                                }
                            });
                            //这里对RDD的键值进行互换,并进行排序
                            JavaPairRDD<Integer,String> fRdd=newRdd.mapToPair(new PairFunction<String, Integer,String>() {
                                @Override
                                public Tuple2<Integer,String> call(String s) throws Exception {
                                    if (null==s||"".equals(s)){
                                        return new Tuple2<Integer,String>(0,"needRemove");
                                    }else {
                                        String[] vals = s.split("_");
                                        return new Tuple2<Integer, String>(Integer.valueOf(vals[0]), vals[1] + "_" + vals[2]);
                                    }
                                }
                            }).filter(new Function<Tuple2<Integer, String>, Boolean>() {
                                @Override
                                public Boolean call(Tuple2<Integer, String> v1) throws Exception {
                                    return !"needRemove".equals(v1._2);//过滤掉需要删除的key
                                }
                            }).sortByKey(false);
                            //数据处理并入库
                            processJavaRDDData(fRdd);
                            fRdd=null;
                            newRdd=null;
                            tuple2JavaRDD=null;
                        }
                    });
                }else{
                    System.out.println("暂无key存在");
                }
                jssc.start();
                jssc.awaitTermination();
                jssc.stop();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        private static void processJavaRDDData(JavaPairRDD<Integer,String> fRdd) throws Exception {
            //单数据样例:[330,11111111_2019-09-08],取top5入库
            List<Tuple2<Integer, String>> top5 = fRdd.collect();
            if(top5.size()>5) {
                top5 = top5.subList(0, 5);
            }
            System.out.println("top5:"+top5);
            if(top5.size()>0) {
                Connection connection = DBUtils.getConnection();
                connection.setAutoCommit(false);
                //获取数据中日期进行删除后重新插入(kafka遍历后数据日期同一天)
                String time = fRdd.collect().get(0)._2.split("_")[1];
                String delSql = "DELETE FROM bigdata_update_cart_cnt where up_date='" + time + "'";//清空表数据,保留表结构
                PreparedStatement dst = connection.prepareStatement(delSql);
                dst.executeUpdate();
                connection.commit();
                int recordsize = 0;
                dst = connection.prepareStatement(addSql);
                for (Tuple2<Integer, String> row : top5) {
                    String[] vals = row._2.split("_");
                    String uid = vals[0];
                    String day = vals[1];
                    int cnt = row._1();
                    dst.setString(1, uid);
                    dst.setString(2, day);
                    dst.setInt(3, cnt);
                    dst.addBatch();
                    if ((recordsize + 1) % setp == 0) {
                        dst.executeBatch();//执行批量处理
                    }
                    recordsize++;
                }
                if (recordsize % setp != 0) {//不被整除,才执行这个
                    dst.executeBatch();//执行批量处理
                }
                connection.commit();//手动提交
                DBUtils.DBclose(connection, dst, null);
            }
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:解决DStream的mapWithState无效key清除的问题

          本文链接:https://www.haomeiwen.com/subject/dahtpctx.html