该实例使用spark streaming从kafka中实时分析当天加减购的次数排名top5的用户。
实现思路就是通过reduceByKey和mapWithState结合完成数据的统计。在统计过程中遇到的问题可能大家都会遇到:
问题1:当跨天的时候,mapWithState中的缓存会存储以前计算过的过期的key,比如我们今天是10月26日,按照我们的思路只需要对10月26日发生的加购行为的用户做累加统计,10月25日及其以前key的就不要了。但是实际情况并不是这样,他依然会被mapWithState存储在缓存中,除非你重启你的程序。那么问题来了,我们怎么把已经计算过的10月25日及其以前的key从mapWithState缓存中去除呢?
解决方案:
第一步、这时候我们就用了State的超时机制,为每个key都固定设置24小时的存活时间,过期后会自动清除。这样保证sparkStreaming内在机制可以自动清理掉24小时以外的过期key。但这种情况依然会存在两天的数据key的情况,比如10月25日15点产生的key,必须等到10月26日15点才能被销毁,所以就需要第二步屏蔽操作。
第二步、从分析程序用日期屏蔽除了10月26日以外的key。
问题2:相同的key为什么会出现多次,好像并没有合并同类项?
解决方案:这是因为mapWithState是并没有合并的机制的,这一点不同于updateBykey可以自动reduceBykey。所以我们在使用mapWithState来进行词频统计的时候,要先进行reduceBykey才能避免相同的key重复出现的情况。
查看Spark Streaming消费消息的情况的工具
${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server hanode5:9092 --describe --group jis
image.png
package com.lppz.busi;
import com.lppz.common.DBUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.None;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.text.SimpleDateFormat;
import java.util.*;
public class DayUpdateCartCntTopN {
static String flag1="更新购物车接口,uid:";
static String flag2="返回给前端的购物车信息cartInfoMap:";
static String addSql="insert into bigdata_update_cart_cnt(uid, up_date, cnt) values(?,?,?)";
static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");
static int setp=300;
public static void main(String[] args) {
ResourceBundle kafkaData = ResourceBundle.getBundle("META-INF/kafka");
SparkConf sc=new SparkConf().setMaster("local[2]").setAppName("DayUpdateCartCntTopN");
//控制sparkstreaming启动时,积压问题并设置背压机制,自适应批次的record变化,来控制任务的堆积
//(1)确保在kill任务时,能够处理完最后一批数据,再关闭程序,不会发生强制kill导致数据处理中断,没处理完的数据丢失
sc.set("spark.streaming.stopGracefullyOnShutdown", "true");
//(2)开启后spark自动根据系统负载选择最优消费速率
sc.set("spark.streaming.backpressure.enabled", "true");
//(3)开启的情况下,限制第一次批处理应该消费的数据,因为程序冷启动 队列里面有大量积压,防止第一次全部读取,造成系统阻塞
sc.set("spark.streaming.backpressure.initialRate", "1000");
//(4)限制每秒每个消费线程读取每个kafka分区最大的数据量
sc.set("spark.streaming.kafka.maxRatePerPartition", "1000");
/**
* 注意:
只有(4)激活的时候,每次消费的最大数据量,就是设置的数据量,如果不足这个数,就有多少读多少,如果超过这个数字,就读取这个数字的设置的值
只有(2)+(4)激活的时候,每次消费读取的数量最大会等于(4)设置的值,最小是spark根据系统负载自动推断的值,消费的数据量会在这两个范围之内变化根据系统情况,但第一次启动会有多少读多少数据。此后按(2)+(4)设置规则运行
(2)+(3)+(4)同时激活的时候,跟上一个消费情况基本一样,但第一次消费会得到限制,因为我们设置第一次消费的频率了
*/
JavaStreamingContext jssc=new JavaStreamingContext(sc, Durations.seconds(180));//180秒一次
jssc.checkpoint(".");//设置上一个批次的值存在的目录,在生产环境中,放在hdfs某个文件下,相对安全些
// 首先要创建一份kafka参数map
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// 这里是不需要zookeeper节点,所以这里放broker.list
String brokerslist=kafkaData.getString("BROKERS_LIST");
String topics = kafkaData.getString("TOPICS");
String groupId=kafkaData.getString("GROUP_ID");
//Kafka服务监听端口
kafkaParams.put("bootstrap.servers",brokerslist);
//指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
kafkaParams.put("key.deserializer", StringDeserializer.class);
//指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
kafkaParams.put("value.deserializer", StringDeserializer.class);
//消费者ID,随意指定
kafkaParams.put("group.id", groupId);
//earliest
//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
//latest
//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
kafkaParams.put("auto.offset.reset", "earliest");
//如果true,consumer定期地往zookeeper写入每个分区的offset
kafkaParams.put("enable.auto.commit", true);
//这里不用担心时间久了kafka的数据会很多,因为kafka有自动清理机制,
// 默认把消息数据保存168小时,超过的都会自动清理,配置在server.properties文件中的log.retention.hours
//我们该用另外一种方式来获取多分区中的topic和offset
Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
//通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
try {
JavaInputDStream<ConsumerRecord<String,String>> kafkaStream = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
);
//使用map先对InputDStream进行取值操作
//先过滤无用的数据
JavaDStream<String> lines=kafkaStream.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
@Override
public Boolean call(ConsumerRecord<String, String> orgRecord) throws Exception {
String today=sdf.format(new Date());
//System.out.println("时间过滤:"+today);
String orgVal=orgRecord.value();
return orgVal.contains(flag1)&&orgVal.contains(flag2)&&orgVal.startsWith(today);//满足条件的才会被留下,不满足条件都会被过滤掉
}
}).map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
String line=consumerRecord.value();
int f1=line.indexOf(flag1)+flag1.length();
String temp=line.substring(f1);
int f2=temp.indexOf(",",0);
String lastupTime= line.substring(11,23);
String uid=temp.substring(0,f2);
String day=line.substring(0,10);
return uid+"_"+day;
}
});
lines.print();
JavaPairDStream<String, Integer> pairs = lines.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey((v1,v2)->v1+v2);
// 只更新数值变更的数据
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
(word, one, state) -> {
Date newDate=new Date();
String today=sdf.format(newDate);
System.out.println("时间过滤3:"+today+",word:"+word);
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
if(word.contains(today)&&!state.isTimingOut()) {
state.update(sum);
}else{
System.out.println("today:"+today+",word:"+word+",timeout:"+state.isTimingOut());
if(!state.isTimingOut()) {
state.remove();
}
}
return output;
};
// DStream made of get cumulative counts that get updated in every batch
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
pairs.mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(1440)));//24小时后缓存的key过期
JavaPairDStream<String,Integer> fullStateDstream=stateDstream.stateSnapshots();//获取所有的未过期的key
if (fullStateDstream != null) {
fullStateDstream.print();
//遍历DStream,并转换成RDD
fullStateDstream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
@Override
public void call(JavaPairRDD<String, Integer> tuple2JavaRDD) throws Exception {
if (tuple2JavaRDD.isEmpty()){
System.out.println("暂无可处理数据");
return;
}
JavaRDD<String> newRdd =tuple2JavaRDD.map(new Function<Tuple2<String, Integer>, String>() {
@Override
public String call(Tuple2<String, Integer> record) throws Exception {
Date newDate=new Date();
String today=sdf.format(newDate);
if (record._1.contains(today)){
return record._2+"_"+record._1;
}else{
System.out.println("排序前被过滤的数据today:"+today+",record._1:"+record._1);
return null;
}
}
});
//这里对RDD的键值进行互换,并进行排序
JavaPairRDD<Integer,String> fRdd=newRdd.mapToPair(new PairFunction<String, Integer,String>() {
@Override
public Tuple2<Integer,String> call(String s) throws Exception {
if (null==s||"".equals(s)){
return new Tuple2<Integer,String>(0,"needRemove");
}else {
String[] vals = s.split("_");
return new Tuple2<Integer, String>(Integer.valueOf(vals[0]), vals[1] + "_" + vals[2]);
}
}
}).filter(new Function<Tuple2<Integer, String>, Boolean>() {
@Override
public Boolean call(Tuple2<Integer, String> v1) throws Exception {
return !"needRemove".equals(v1._2);//过滤掉需要删除的key
}
}).sortByKey(false);
//数据处理并入库
processJavaRDDData(fRdd);
fRdd=null;
newRdd=null;
tuple2JavaRDD=null;
}
});
}else{
System.out.println("暂无key存在");
}
jssc.start();
jssc.awaitTermination();
jssc.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void processJavaRDDData(JavaPairRDD<Integer,String> fRdd) throws Exception {
//单数据样例:[330,11111111_2019-09-08],取top5入库
List<Tuple2<Integer, String>> top5 = fRdd.collect();
if(top5.size()>5) {
top5 = top5.subList(0, 5);
}
System.out.println("top5:"+top5);
if(top5.size()>0) {
Connection connection = DBUtils.getConnection();
connection.setAutoCommit(false);
//获取数据中日期进行删除后重新插入(kafka遍历后数据日期同一天)
String time = fRdd.collect().get(0)._2.split("_")[1];
String delSql = "DELETE FROM bigdata_update_cart_cnt where up_date='" + time + "'";//清空表数据,保留表结构
PreparedStatement dst = connection.prepareStatement(delSql);
dst.executeUpdate();
connection.commit();
int recordsize = 0;
dst = connection.prepareStatement(addSql);
for (Tuple2<Integer, String> row : top5) {
String[] vals = row._2.split("_");
String uid = vals[0];
String day = vals[1];
int cnt = row._1();
dst.setString(1, uid);
dst.setString(2, day);
dst.setInt(3, cnt);
dst.addBatch();
if ((recordsize + 1) % setp == 0) {
dst.executeBatch();//执行批量处理
}
recordsize++;
}
if (recordsize % setp != 0) {//不被整除,才执行这个
dst.executeBatch();//执行批量处理
}
connection.commit();//手动提交
DBUtils.DBclose(connection, dst, null);
}
}
}
网友评论