美文网首页
166、Spark Streaming实战开发进阶之新闻网站关键指标实时统计

166、Spark Streaming实战开发进阶之新闻网站关键指标实时统计

作者: ZFH__ZJ | 来源:发表于2019-01-27 16:40 被阅读0次

    生产者代码

    /**
     * Test-data generator that publishes synthetic news-site logs to a Kafka topic.
     *
     * <p>Every record value is one space-separated line:
     * {@code <yyyy-MM-dd> <timestampMillis> <userid> <pageid> <section> <action>}.
     * About 1 in 10 records is a registration log (userid, pageid and section are the literal
     * text {@code null}, action is {@code register}); the rest are page-view logs
     * (action {@code view}). Records are sent in batches of 100 with a 1-second pause after
     * each batch; the record key is the record's index (0-99) within its batch.
     *
     * <p>The thread keeps sending until it is interrupted, then closes the Kafka producer.
     */
    public class AccessProducer extends Thread {

        private static final Random random = new Random();
        /** The 10 news sections a page view is drawn from. */
        private static final String[] sections = new String[] {"country", "international", "sport", "entertainment", "movie", "carton", "tv-show", "technology", "internet", "car"};
        /** Values 1..10; drawing a random element and comparing it with 1 yields a 1-in-10 event. */
        private static final int[] arr = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        /**
         * Date prefix written into every log line, computed when a producer is constructed.
         * NOTE(review): it is never refreshed, so a producer running past midnight keeps the old date.
         */
        private static String date;

        private final Producer<Integer, String> producer;
        private final String topic;

        /**
         * Creates a generator thread for the given topic; call {@link #start()} to begin sending.
         *
         * @param topic Kafka topic the logs are written to
         */
        public AccessProducer(String topic) {
            this.topic = topic;
            this.producer = new KafkaProducer<>(createProducerProperties());
            // SimpleDateFormat is not thread-safe, so use a local instance rather than a shared static one.
            date = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        }

        /** Builds the Kafka producer configuration: Integer keys, String values, three brokers. */
        private Properties createProducerProperties() {
            Properties props = new Properties();
            props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Comma-separated list of host:port pairs (exactly one port per broker).
            props.put("bootstrap.servers", "192.168.114.200:9092,192.168.114.201:9092,192.168.114.202:9092");
            return props;
        }

        /**
         * Sends batches of 100 log records, sleeping 1 second after each batch, until the
         * thread is interrupted. The producer is closed on exit so buffered records are flushed.
         */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    for (int i = 0; i < 100; i++) {
                        // 1 in 10 records is a registration, the rest are page views.
                        String log = arr[random.nextInt(10)] == 1 ? getRegisterLog() : getAccessLog();
                        producer.send(new ProducerRecord<Integer, String>(topic, i, log));
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the loop condition sees it and the thread exits.
                        Thread.currentThread().interrupt();
                    }
                }
            } finally {
                producer.close();
            }
        }

        /**
         * Builds one page-view log line.
         *
         * <p>About 1 in 10 visitors is anonymous (userid rendered as {@code null}); the others get
         * a random userid in [0, 1000). The pageid is random in [0, 1000) and the section is one
         * of the 10 entries in {@code sections}.
         */
        private static String getAccessLog() {
            long timestamp = System.currentTimeMillis();

            // Random userid out of 1000 registered users; ~1/10 of visits come from unregistered users.
            Long userid;
            if (arr[random.nextInt(10)] == 1) {
                userid = null;
            } else {
                userid = (long) random.nextInt(1000);
            }

            // Random page out of 1000 pages, random section out of 10.
            long pageid = random.nextInt(1000);
            String section = sections[random.nextInt(10)];

            // Access logs always carry the fixed action "view".
            String action = "view";

            return new StringBuilder()
                    .append(date).append(' ')
                    .append(timestamp).append(' ')
                    .append(userid).append(' ')
                    .append(pageid).append(' ')
                    .append(section).append(' ')
                    .append(action)
                    .toString();
        }

        /**
         * Builds one registration log line. A newly registering user has no userid, pageid or
         * section yet, so those three fields are the literal text {@code null} and the action
         * is {@code register}.
         */
        private static String getRegisterLog() {
            long timestamp = System.currentTimeMillis();
            return date + " " + timestamp + " null null null register";
        }

        /** Starts a generator thread that writes to the "news" topic. */
        public static void main(String[] args) {
            AccessProducer producer = new AccessProducer("news");
            producer.start();
        }
    }
    

    Kafka：创建 topic，并用控制台消费者测试数据是否写入

    # Create the "news" topic (replication factor 2, 1 partition), addressing the cluster via its ZooKeeper quorum.
    kafka-topics.sh --zookeeper 192.168.114.200:2181,192.168.114.201:2181,192.168.114.202:2181 --topic news --replication-factor 2 --partitions 1 --create
    # Read the topic from the beginning to check that the producer's log lines arrive.
    # NOTE(review): the console consumer's --zookeeper flag only exists in older Kafka releases; newer ones need --bootstrap-server host:9092 - confirm against the installed version.
    kafka-console-consumer.sh --zookeeper 192.168.114.200:2181,192.168.114.201:2181,192.168.114.202:2181 --topic news --from-beginning

    Spark Streaming 统计代码

    main函数

        /**
         * Entry point of the real-time news statistics job.
         *
         * <p>Reads log lines from the Kafka topic "news" in 5-second batches and splits them
         * into page-view ({@code view}) and registration ({@code register}) logs. It then
         * prints five per-batch metrics: page PV, page UV, registration count, bounce-user
         * count and section PV.
         */
        public static void main(String[] args) {
            // Spark context: local mode with 2 threads, Kryo serialization, 5-second batches.
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]")
                    .setAppName("NewsRealtimeStatSpark")
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            JavaStreamingContext jssc = new JavaStreamingContext(
                    conf, Durations.seconds(5));

            // Kafka consumer settings for the direct input DStream.
            Map<String, Object> kafkaParams = new HashMap<String, Object>();
            kafkaParams.put("bootstrap.servers", "192.168.114.200:9092,192.168.114.201:9092,192.168.114.202:9092");
            kafkaParams.put("key.deserializer", IntegerDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "111");
            kafkaParams.put("auto.offset.reset", "latest");
            // NOTE(review): auto-commit is off and this job never commits offsets itself, so unless
            // offsets are committed elsewhere each restart starts at "latest" and skips downtime data.
            kafkaParams.put("enable.auto.commit", false);
            Collection<String> topics = Arrays.asList("news");
            JavaInputDStream<ConsumerRecord<Integer, String>> lines = KafkaUtils.createDirectStream(jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(topics, kafkaParams));

            // Extract the raw log line from each Kafka record once; both filters below share it.
            JavaDStream<String> logDStream = lines.map(new Function<ConsumerRecord<Integer, String>, String>() {
                @Override
                public String call(ConsumerRecord<Integer, String> record) throws Exception {
                    return record.value();
                }
            });

            // Access (page-view) logs: action field is "view".
            JavaDStream<String> accessStringDStreamString = logDStream.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String log) throws Exception {
                    return "view".equals(extractAction(log));
                }
            });

            // Registration logs: action field is "register".
            JavaDStream<String> registerDStreamString = logDStream.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String log) throws Exception {
                    return "register".equals(extractAction(log));
                }
            });

            // Metric 1: real-time page PV
            calculatePagePv(accessStringDStreamString);
            // Metric 2: real-time page UV
            calculatePageUv(accessStringDStreamString);
            // Metric 3: real-time registered-user count
            calculateRegisterCount(registerDStreamString);
            // Metric 4: real-time bounce-user count
            calculateUserJumpCount(accessStringDStreamString);
            // Metric 5: real-time section PV
            calcualteSectionPv(accessStringDStreamString);

            jssc.start();
            try {
                jssc.awaitTermination();
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers; the context is still shut down below.
                Thread.currentThread().interrupt();
            } finally {
                jssc.close();
            }
        }

        /**
         * Returns the action field (the 6th space-separated token) of a log line. Returns
         * {@code null} when the line is {@code null} or has fewer than six fields, so the
         * filters drop malformed records instead of failing the whole batch with an
         * ArrayIndexOutOfBoundsException.
         */
        private static String extractAction(String log) {
            if (log == null) {
                return null;
            }
            String[] fields = log.split(" ");
            return fields.length > 5 ? fields[5] : null;
        }
    

    实时页面pv

        /**
         * Metric 1: page views (PV) per page within each batch.
         *
         * <p>Keys every access log by its pageid (4th space-separated field), sums a count of 1
         * per log for each key, and prints the resulting (pageid, pv) pairs for the batch.
         *
         * @param accessStringDStreamString stream of page-view log lines
         */
        private static void calculatePagePv(JavaDStream<String> accessStringDStreamString) {
            PairFunction<String, Long, Long> toPageAndOne = new PairFunction<String, Long, Long>() {
                @Override
                public Tuple2<Long, Long> call(String line) throws Exception {
                    String pageField = line.split(" ")[3];
                    return new Tuple2<Long, Long>(Long.parseLong(pageField), 1L);
                }
            };
            Function2<Long, Long, Long> sum = new Function2<Long, Long, Long>() {
                @Override
                public Long call(Long left, Long right) throws Exception {
                    return left + right;
                }
            };

            accessStringDStreamString
                    .mapToPair(toPageAndOne)
                    .reduceByKey(sum)
                    .print();
        }
    

    实时页面uv

        /**
         * Metric 2: unique visitors (UV) per page within each batch.
         *
         * <p>Builds a {@code pageid_userid} key per access log, de-duplicates the keys within
         * each batch RDD, then counts the remaining keys per pageid and prints (pageid, uv) pairs.
         * NOTE(review): anonymous visits carry the userid text "null", so all of them collapse
         * into a single visitor per page.
         *
         * @param accessStringDStreamString stream of page-view log lines
         */
        private static void calculatePageUv(JavaDStream<String> accessStringDStreamString) {
            // Composite "pageid_userid" key so distinct() removes repeat visits by the same user.
            JavaDStream<String> visitKeys = accessStringDStreamString.map(new Function<String, String>() {
                @Override
                public String call(String line) throws Exception {
                    String[] fields = line.split(" ");
                    String pageId = fields[3];
                    String userId = fields[2];
                    return pageId + "_" + userId;
                }
            });

            JavaPairDStream<Long, Long> uniqueVisitOnes = visitKeys
                    .transform(new Function2<JavaRDD<String>, Time, JavaRDD<String>>() {
                        @Override
                        public JavaRDD<String> call(JavaRDD<String> batch, Time batchTime) throws Exception {
                            return batch.distinct();
                        }
                    })
                    .mapToPair(new PairFunction<String, Long, Long>() {
                        @Override
                        public Tuple2<Long, Long> call(String visitKey) throws Exception {
                            String pageId = visitKey.split("_")[0];
                            return new Tuple2<Long, Long>(Long.parseLong(pageId), 1L);
                        }
                    });

            uniqueVisitOnes.reduceByKey(new Function2<Long, Long, Long>() {
                @Override
                public Long call(Long left, Long right) throws Exception {
                    return left + right;
                }
            }).print();
        }
    

    实时注册用户数

        /**
         * Metric 3: number of registration logs in each batch; prints one count per batch.
         *
         * @param registerDStreamString stream of registration log lines
         */
        private static void calculateRegisterCount(JavaDStream<String> registerDStreamString) {
            registerDStreamString.count().print();
        }
    

    实时用户跳出数

        /**
         * Metric 4: bounce ("jump") users per batch, i.e. identified users with exactly one page
         * view in the batch.
         *
         * <p>Anonymous visits (userid field "null", case-insensitive) are mapped to the sentinel
         * -1 and excluded from the count. Prints one count per batch.
         *
         * @param accessStringDStreamString stream of page-view log lines
         */
        private static void calculateUserJumpCount(JavaDStream<String> accessStringDStreamString) {
            JavaPairDStream<Long, Long> viewsPerUser = accessStringDStreamString
                    .mapToPair(new PairFunction<String, Long, Long>() {
                        @Override
                        public Tuple2<Long, Long> call(String line) throws Exception {
                            String userField = line.split(" ")[2];
                            long userId = "null".equalsIgnoreCase(userField) ? -1L : Long.parseLong(userField);
                            return new Tuple2<Long, Long>(userId, 1L);
                        }
                    })
                    .reduceByKey(new Function2<Long, Long, Long>() {
                        @Override
                        public Long call(Long left, Long right) throws Exception {
                            return left + right;
                        }
                    });

            viewsPerUser.filter(new Function<Tuple2<Long, Long>, Boolean>() {
                @Override
                public Boolean call(Tuple2<Long, Long> entry) throws Exception {
                    boolean identified = entry._1() != -1L;
                    boolean singleView = entry._2() == 1L;
                    return identified && singleView;
                }
            }).count().print();
        }
    

    实时版块pv

        /**
         * Metric 5: page views per news section within each batch.
         *
         * <p>Keys every access log by its section (5th space-separated field), sums a count of 1
         * per log for each section, and prints (section, pv) pairs. The method name is
         * misspelled ("calcualte") but is kept because main calls it by this name.
         *
         * @param accessStringDStreamString stream of page-view log lines
         */
        private static void calcualteSectionPv(JavaDStream<String> accessStringDStreamString) {
            JavaPairDStream<String, Long> sectionOnes = accessStringDStreamString.mapToPair(
                    new PairFunction<String, String, Long>() {
                        @Override
                        public Tuple2<String, Long> call(String line) throws Exception {
                            String section = line.split(" ")[4];
                            return new Tuple2<String, Long>(section, 1L);
                        }
                    });

            sectionOnes.reduceByKey(new Function2<Long, Long, Long>() {
                @Override
                public Long call(Long left, Long right) throws Exception {
                    return left + right;
                }
            }).print();
        }
    

    相关文章

      网友评论

          本文标题:166、Spark Streaming实战开发进阶之新闻网站关键

          本文链接:https://www.haomeiwen.com/subject/fptijqtx.html