Real-time daily PV counting in Java

Author: 机灵鬼鬼 | Published 2019-06-25 17:09

    package com.liushun.busi;

    /*
     * @auth: liushun
     * @date: 2019-06-22 15:30
     * Counts the cumulative PV per page for the current day, up to now.
     */

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import scala.Tuple2;

    import java.util.*;

    public final class DayPvCount {

        public static void main(String[] args) {
            SparkConf sc = new SparkConf().setMaster("local[2]").setAppName("DayPvCount");
            JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(5)); // one batch every 5 seconds
            // Directory where the state of previous batches is checkpointed; in production,
            // point this at an HDFS path, which is safer.
            jssc.checkpoint(".");

            // First build the Kafka parameter map.
            Map<String, Object> kafkaParams = new HashMap<>();
            // The direct stream needs no ZooKeeper node, so we pass the broker list here.
            String brokerslist = "10.101.3.3:9092";
            String topics = "flume_kafka_streaming";
            // Address and port the Kafka brokers listen on
            kafkaParams.put("bootstrap.servers", brokerslist);
            // Deserializer for the record key (String, UTF-8 by default)
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            // Deserializer for the record value (String, UTF-8 by default)
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            // Consumer group id; any identifier works
            kafkaParams.put("group.id", "jis");
            // Start from "latest" (newest) or "earliest" offsets; older clients used
            // "largest"/"smallest", which are not accepted here
            kafkaParams.put("auto.offset.reset", "latest");
            // If true, the consumer periodically commits each partition's offset automatically
            kafkaParams.put("enable.auto.commit", false);

            // Starting offset for each topic partition
            Map<TopicPartition, Long> offsets = new HashMap<>();
            offsets.put(new TopicPartition(topics, 0), 0L);
            Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));

            // Obtain the Kafka stream via KafkaUtils.createDirectStream(...); the connection
            // settings come from kafkaParams.
            try {
                JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams, offsets)
                );

                // First map over the InputDStream to extract the value we need from each record.
                JavaDStream<String> lines = kafkaStream.map(new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                        // Parse the JSON line and pull out the fields we need
                        JSONObject obj = JSON.parseObject(consumerRecord.value());
                        // e.g. 2019-01-10_WX
                        return obj.getString("clientDate").substring(0, 10) + "_" + obj.getString("fromClient");
                    }
                });

                JavaPairDStream<String, Integer> pairs = lines.mapToPair(s -> new Tuple2<>(s, 1));

                // Keep running state across batches
                JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
                        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                            @Override
                            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                                Integer updateValue = 0;
                                if (state.isPresent()) {
                                    // Previous state exists, so start from the stored value
                                    updateValue = state.get();
                                }
                                // Add this batch's counts
                                for (Integer value : values) {
                                    updateValue += value;
                                }
                                return Optional.of(updateValue);
                            }
                        }
                );

                runningCounts.print();
                jssc.start();
                jssc.awaitTermination();
                jssc.stop();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /** Lazily instantiated singleton instance of SparkSession */
    class JavaSparkSessionSingleton {

        private static transient SparkSession instance = null;

        public static SparkSession getInstance(SparkConf sparkConf) {
            if (instance == null) {
                instance = SparkSession
                        .builder()
                        .config(sparkConf)
                        .getOrCreate();
            }
            return instance;
        }
    }
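
The job above assumes every Kafka message is a JSON string with at least a `clientDate` field (whose first 10 characters become the day) and a `fromClient` field. As a minimal sketch for generating test events, reusing the broker address and topic from the code above and assuming the kafka-clients and fastjson libraries are on the classpath (the exact event schema itself is an assumption), a throwaway producer might look like this:

    package com.liushun.busi;

    import com.alibaba.fastjson.JSONObject;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    /** Hypothetical test producer; the clientDate/fromClient schema is an assumption. */
    public final class TestPvProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.101.3.3:9092"); // same broker list as the job above
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                JSONObject event = new JSONObject();
                event.put("clientDate", "2019-06-25 17:00:00"); // first 10 chars become the day key
                event.put("fromClient", "WX");                  // page / client source
                producer.send(new ProducerRecord<>("flume_kafka_streaming", event.toJSONString()));
                producer.flush();
            }
        }
    }

Note that `JavaSparkSessionSingleton` is defined but never referenced from `main()`. A minimal sketch of how it could be wired in, not part of the original job, would be to replace `runningCounts.print()` so that each batch of running counts becomes a DataFrame; the column names below are assumptions:

    // Sketch only: this would replace runningCounts.print() inside main().
    // Additional imports needed: org.apache.spark.sql.RowFactory,
    // org.apache.spark.sql.types.DataTypes, org.apache.spark.sql.types.StructType.
    runningCounts.foreachRDD((rdd, time) -> {
        SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
        StructType schema = new StructType()
                .add("day_client", DataTypes.StringType)  // e.g. "2019-01-10_WX"
                .add("pv", DataTypes.IntegerType);
        JavaRDD<Row> rows = rdd.map(t -> RowFactory.create(t._1(), t._2()));
        Dataset<Row> df = spark.createDataFrame(rows, schema);
        df.show(); // or use df.write() to persist the batch to an external store
    });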
