生产者代码
/**
 * Test-data generator that continuously publishes simulated news-site log lines to a Kafka topic.
 *
 * <p>Each line has six space-separated fields:
 * {@code date timestamp userid pageid section action}. Roughly 1 in 10 lines is a registration
 * ({@code action = register}; userid, pageid and section are written as {@code null}); the rest
 * are page views ({@code action = view}). Lines are sent in batches of 100 (record keys 0..99),
 * each batch followed by a one-second pause, until the thread is interrupted; the Kafka producer
 * is closed when the loop ends.
 */
public class AccessProducer extends Thread {
    /** Formats the day part of each line. SimpleDateFormat is not thread-safe, so access is synchronized. */
    private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");
    private static final Random RANDOM = new Random();
    /** The news sections a page view is randomly assigned to. */
    private static final String[] SECTIONS = new String[] {"country", "international", "sport", "entertainment", "movie", "carton", "tv-show", "technology", "internet", "car"};
    /** Comma-separated list of Kafka brokers (host:port). */
    private static final String BOOTSTRAP_SERVERS = "192.168.114.200:9092,192.168.114.201:9092,192.168.114.202:9092";
    /** Number of records sent between one-second pauses; also the range of record keys. */
    private static final int BATCH_SIZE = 100;
    /** Number of registered users; registered userids are drawn from [0, USER_COUNT). */
    private static final int USER_COUNT = 1000;
    /** Number of pages; pageids are drawn from [0, PAGE_COUNT). */
    private static final int PAGE_COUNT = 1000;

    private final Producer<Integer, String> producer;
    private final String topic;

    /**
     * Creates a generator that will publish to {@code topic} once started.
     *
     * @param topic name of the Kafka topic to write to
     */
    public AccessProducer(String topic) {
        this.topic = topic;
        this.producer = new KafkaProducer<>(createProducerProperties());
    }

    /** Builds the Kafka producer configuration: Integer keys, String values, fixed broker list. */
    private Properties createProducerProperties() {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Previously contained a malformed entry "192.168.114.201:9092:9092".
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        return props;
    }

    /**
     * Sends batches of {@value #BATCH_SIZE} log lines with a one-second pause after each batch.
     * Stops when the thread is interrupted and always closes the producer on exit.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                for (int key = 0; key < BATCH_SIZE; key++) {
                    String log = oneInTen() ? getRegisterLog() : getAccessLog();
                    producer.send(new ProducerRecord<Integer, String>(topic, key, log));
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition sees it and the thread can terminate.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            producer.close();
        }
    }

    /** Returns true with probability 1/10. */
    private static boolean oneInTen() {
        return RANDOM.nextInt(10) == 0;
    }

    /**
     * Builds a page-view line. One in ten visitors is unregistered (userid written as "null");
     * otherwise the userid is one of {@value #USER_COUNT} registered users.
     */
    private static String getAccessLog() {
        long timestamp = System.currentTimeMillis();
        Long userid = oneInTen() ? null : Long.valueOf(RANDOM.nextInt(USER_COUNT));
        long pageid = RANDOM.nextInt(PAGE_COUNT);
        String section = SECTIONS[RANDOM.nextInt(SECTIONS.length)];
        return formatLog(timestamp, userid, pageid, section, "view");
    }

    /** Builds a registration line; new users have no userid, pageid or section yet. */
    private static String getRegisterLog() {
        return formatLog(System.currentTimeMillis(), null, null, null, "register");
    }

    /**
     * Joins the six fields with single spaces. Null fields are rendered as the literal "null",
     * which the downstream consumer relies on. The date is derived from {@code timestamp} so the
     * two fields always agree, even across midnight.
     */
    private static String formatLog(long timestamp, Long userid, Long pageid, String section, String action) {
        return new StringBuilder()
                .append(formatDate(timestamp)).append(' ')
                .append(timestamp).append(' ')
                .append(userid).append(' ')
                .append(pageid).append(' ')
                .append(section).append(' ')
                .append(action).toString();
    }

    /** Formats {@code timestamp} as yyyy-MM-dd; synchronized because the formatter is shared. */
    private static String formatDate(long timestamp) {
        synchronized (DATE_FORMAT) {
            return DATE_FORMAT.format(new Date(timestamp));
        }
    }

    public static void main(String[] args) {
        AccessProducer producer = new AccessProducer("news");
        producer.start();
    }
}
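创建 Kafka topic 并测试（注意：下面两条命令使用 --zookeeper 参数，仅适用于旧版 Kafka；kafka-console-consumer.sh 自 Kafka 2.0 起、kafka-topics.sh 自 Kafka 3.0 起已不再支持该参数，新版本应改用 --bootstrap-server 192.168.114.200:9092,192.168.114.201:9092,192.168.114.202:9092）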
kafka-topics.sh --zookeeper 192.168.114.200:2181,192.168.114.201:2181,192.168.114.202:2181 --topic news --replication-factor 2 --partitions 1 --create
kafka-console-consumer.sh --zookeeper 192.168.114.200:2181,192.168.114.201:2181,192.168.114.202:2181 --topic news --from-beginning
代码
main函数
/**
 * Entry point of the real-time news statistics job.
 *
 * <p>Consumes space-separated log lines from the Kafka topic {@code news} in 5-second batches.
 * Each line is expected to have at least six fields, the sixth being the action. The job splits
 * the lines into page views (action "view") and all other well-formed lines (registrations).
 * Null or malformed values are skipped instead of failing the batch. Five per-batch metrics are
 * printed: page PV, page UV, registration count, bounce-user count and section PV.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Spark context: local mode with 2 threads, Kryo serialization, 5-second micro-batches.
    SparkConf conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("NewsRealtimeStatSpark")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

    // Kafka consumer settings for the direct stream. Offsets are not auto-committed; when the
    // group has no committed offset, consumption starts from the latest offset.
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", "192.168.114.200:9092,192.168.114.201:9092,192.168.114.202:9092");
    kafkaParams.put("key.deserializer", IntegerDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "111");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    Collection<String> topics = Arrays.asList("news");
    JavaInputDStream<ConsumerRecord<Integer, String>> lines = KafkaUtils.createDirectStream(jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topics, kafkaParams));

    // Extract each record's log line once and cache it. Both filters and all five metric outputs
    // below then reuse this copy of the batch instead of each recomputing it from Kafka.
    JavaDStream<String> logDStream = lines.map(new Function<ConsumerRecord<Integer, String>, String>() {
        @Override
        public String call(ConsumerRecord<Integer, String> record) throws Exception {
            return record.value();
        }
    });
    logDStream.cache();

    // Page-view lines: well-formed and the sixth field is "view".
    JavaDStream<String> accessStringDStreamString = logDStream.filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String log) throws Exception {
            if (log == null) {
                return false;
            }
            String[] fields = log.split(" ");
            // Guard the index: a short line would otherwise throw and fail the whole batch.
            return fields.length > 5 && "view".equals(fields[5]);
        }
    });

    // Registration lines: every other well-formed line.
    JavaDStream<String> registerDStreamString = logDStream.filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String log) throws Exception {
            if (log == null) {
                return false;
            }
            String[] fields = log.split(" ");
            return fields.length > 5 && !"view".equals(fields[5]);
        }
    });

    // Metric 1: real-time page PV
    calculatePagePv(accessStringDStreamString);
    // Metric 2: real-time page UV
    calculatePageUv(accessStringDStreamString);
    // Metric 3: real-time number of registrations
    calculateRegisterCount(registerDStreamString);
    // Metric 4: real-time number of bounce ("jump") users
    calculateUserJumpCount(accessStringDStreamString);
    // Metric 5: real-time section PV
    calcualteSectionPv(accessStringDStreamString);

    jssc.start();
    try {
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        // Preserve the interrupt status for whoever interrupted us.
        Thread.currentThread().interrupt();
    } finally {
        jssc.close();
    }
}
实时页面pv
/**
 * Real-time page PV: for every batch, counts how many access lines hit each pageid and prints
 * the resulting (pageid, count) pairs.
 *
 * @param accessStringDStreamString space-separated access lines; field index 3 must be a
 *                                  numeric pageid
 */
private static void calculatePagePv(JavaDStream<String> accessStringDStreamString) {
    // One (pageid, 1) pair per access line.
    JavaPairDStream<Long, Long> pageHits = accessStringDStreamString.mapToPair(
            line -> new Tuple2<>(Long.parseLong(line.split(" ")[3]), 1L));
    // Sum the hits per pageid within the batch.
    JavaPairDStream<Long, Long> pvPerPage = pageHits.reduceByKey((left, right) -> left + right);
    pvPerPage.print();
}
实时页面uv
/**
 * Real-time page UV: for every batch, counts distinct users per pageid and prints the
 * (pageid, visitorCount) pairs.
 *
 * <p>Distinctness is computed on the "pageid_userid" string. All lines whose userid field is the
 * literal "null" therefore collapse into a single visitor per page.
 *
 * @param accessStringDStreamString space-separated access lines; field 2 is the userid and
 *                                  field 3 a numeric pageid
 */
private static void calculatePageUv(JavaDStream<String> accessStringDStreamString) {
    // Composite key "pageid_userid" for each access line.
    JavaDStream<String> visitKeys = accessStringDStreamString.map(line -> {
        String[] fields = line.split(" ");
        return fields[3] + "_" + fields[2];
    });
    // Drop repeat visits of the same user to the same page within the batch.
    JavaDStream<String> uniqueVisits = visitKeys.transform((rdd, batchTime) -> rdd.distinct());
    // Back to (pageid, 1) so each unique visitor contributes one.
    JavaPairDStream<Long, Long> visitorMarks = uniqueVisits.mapToPair(
            key -> new Tuple2<>(Long.parseLong(key.split("_")[0]), 1L));
    JavaPairDStream<Long, Long> uvPerPage = visitorMarks.reduceByKey((left, right) -> left + right);
    uvPerPage.print();
}
实时注册用户数
/**
 * Real-time registration count: prints how many registration lines arrived in each batch.
 *
 * @param registerDStreamString registration log lines
 */
private static void calculateRegisterCount(JavaDStream<String> registerDStreamString) {
    registerDStreamString.count().print();
}
实时用户跳出数
/**
 * Real-time bounce ("jump") users: for every batch, prints how many registered users produced
 * exactly one access line in that batch.
 *
 * <p>Lines whose userid field is "null" (case-insensitive) are mapped to the sentinel -1 and
 * excluded from the count.
 *
 * @param accessStringDStreamString space-separated access lines; field 2 is the userid
 *                                  (numeric or "null")
 */
private static void calculateUserJumpCount(JavaDStream<String> accessStringDStreamString) {
    // (userid, 1) per line; anonymous visitors get the sentinel id -1.
    JavaPairDStream<Long, Long> userHits = accessStringDStreamString.mapToPair(line -> {
        String rawUserId = line.split(" ")[2];
        Long userId = "null".equalsIgnoreCase(rawUserId) ? -1L : Long.parseLong(rawUserId);
        return new Tuple2<>(userId, 1L);
    });
    JavaPairDStream<Long, Long> hitsPerUser = userHits.reduceByKey((left, right) -> left + right);
    // Keep known users that were seen exactly once in the batch.
    JavaPairDStream<Long, Long> bouncedUsers = hitsPerUser.filter(
            entry -> entry._1() != -1L && entry._2() == 1L);
    bouncedUsers.count().print();
}
实时版块pv
/**
 * Real-time section PV: for every batch, counts access lines per section name and prints the
 * (section, count) pairs.
 *
 * @param accessStringDStreamString space-separated access lines; field index 4 is the section
 */
private static void calcualteSectionPv(JavaDStream<String> accessStringDStreamString) {
    // One (section, 1) pair per access line.
    JavaPairDStream<String, Long> sectionHits = accessStringDStreamString.mapToPair(
            line -> new Tuple2<>(line.split(" ")[4], 1L));
    // Sum per section within the batch.
    JavaPairDStream<String, Long> pvPerSection = sectionHits.reduceByKey(Long::sum);
    pvPerSection.print();
}
网友评论