package com.liushun.busi;

/*
 * @author liushun
 * @date 2019-06-22 15:30
 * Counts the running PV (page view) total of each page for the current day so far.
 */
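// Input format (illustrative): each Kafka message is a JSON string that carries at least
// the "clientDate" and "fromClient" fields read below, e.g.
// {"clientDate":"2019-06-22 15:30:00","fromClient":"WX", ...}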
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.util.*;
public final class DayPvCount {
    public static void main(String[] args) {
        SparkConf sc = new SparkConf().setMaster("local[2]").setAppName("DayPvCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(5));// one batch every 5 seconds
        jssc.checkpoint(".");// checkpoint directory for the state of previous batches (required by updateStateByKey); in production point this at an HDFS path for better durability
        // First build the Kafka parameter map
        Map<String, Object> kafkaParams = new HashMap<>();
        // The direct stream does not go through ZooKeeper, so only the broker list is needed
        String brokerslist = "10.101.3.3:9092";
        String topics = "flume_kafka_streaming";
        // Kafka bootstrap servers (host:port)
        kafkaParams.put("bootstrap.servers", brokerslist);
        // Deserializer for the record key (string, UTF-8 encoded by default)
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        // Deserializer for the record value (string, UTF-8 encoded by default)
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        // Consumer group id; any identifier will do
        kafkaParams.put("group.id", "jis");
        // Where to start when there is no committed offset: "latest" (newest) or "earliest" (oldest);
        // the "largest"/"smallest" values of older Kafka versions are not accepted here
        kafkaParams.put("auto.offset.reset", "latest");
        // If true, the consumer commits each partition's offset automatically at a regular interval; disabled here
        kafkaParams.put("enable.auto.commit", false);
        // Starting offset for each topic partition
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition(topics, 0), 0L);
        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        // Obtain the Kafka data with KafkaUtils.createDirectStream(...); the Kafka settings come from kafkaParams
        try {
            JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams, offsets)
            );
            // First map over the input DStream to extract the fields we need
            JavaDStream<String> lines = kafkaStream.map(new Function<ConsumerRecord<String, String>, String>() {
                @Override
                public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                    JSONObject obj = JSON.parseObject(consumerRecord.value());// parse the fields we need out of the JSON line
                    // key looks like 2019-01-10_WX
                    return obj.getString("clientDate").substring(0, 10) + "_" + obj.getString("fromClient");
                }
            });
            JavaPairDStream<String, Integer> pairs = lines.mapToPair(s -> new Tuple2<>(s, 1));
            // Keep a running state across batches
            JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
                    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                        @Override
                        public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                            Integer updateValue = 0;
                            if (state.isPresent()) {// is there existing state for this key?
                                // if so, start from the previous count
                                updateValue = state.get();
                            }
                            // add the counts from the current batch, e.g. previous state 10 plus values [1, 1, 1] gives 13
                            for (Integer value : values) {
                                updateValue += value;
                            }
                            return Optional.of(updateValue);
                        }
                    }
            );
            runningCounts.print();
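            // Each batch, print() emits output of roughly this shape (values are illustrative):
            // -------------------------------------------
            // Time: 1561188605000 ms
            // -------------------------------------------
            // (2019-06-22_WX,42)
            // (2019-06-22_APP,17)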
            jssc.start();
            jssc.awaitTermination();
            jssc.stop();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/** Lazily instantiated singleton instance of SparkSession */
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;
    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession
                    .builder()
                    .config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}
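JavaSparkSessionSingleton is not referenced from main() above; it is the usual helper for turning the streamed counts into a DataFrame inside foreachRDD. The sketch below shows one way it could be wired in, assuming the class name PvCountToTable, the view name day_pv, and the column names page/pv, none of which appear in the original job.

class PvCountToTable {
    // Hypothetical helper (not part of the original job): for every micro-batch, register the
    // running counts as a temporary view and query them through the shared SparkSession.
    static void register(JavaPairDStream<String, Integer> runningCounts) {
        runningCounts.foreachRDD(rdd -> {
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            // Convert each (key, count) tuple into a Row
            JavaRDD<Row> rows = rdd.map(t -> org.apache.spark.sql.RowFactory.create(t._1(), t._2()));
            org.apache.spark.sql.types.StructType schema = new org.apache.spark.sql.types.StructType()
                    .add("page", org.apache.spark.sql.types.DataTypes.StringType)
                    .add("pv", org.apache.spark.sql.types.DataTypes.IntegerType);
            Dataset<Row> df = spark.createDataFrame(rows, schema);
            df.createOrReplaceTempView("day_pv");
            spark.sql("SELECT page, pv FROM day_pv ORDER BY pv DESC").show();
        });
    }
}

If used, main() would call PvCountToTable.register(runningCounts) before jssc.start().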