1. Overall Approach
Specify the document ID explicitly when writing to ES; each ID corresponds to exactly one record, so writing the same record again overwrites the existing document instead of creating a duplicate.
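The write call below is a minimal sketch of that idea (it assumes jsonRdd already holds JSON strings that contain a keystr field, and reuses the index/type name and Guava's ImmutableMap from the full code in the next section): es.mapping.id tells the ES-Hadoop connector which field becomes the document _id, so two writes with the same key land in the same document.

import org.apache.spark.api.java.JavaRDD;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import com.google.common.collect.ImmutableMap;

public class IdempotentWriteSketch {
    public static void write(JavaRDD<String> jsonRdd) {
        // "es.mapping.id" points the connector at the field used as the document _id;
        // together with es.write.operation=index (set on the SparkConf), a document
        // with the same _id is overwritten rather than indexed a second time.
        JavaEsSpark.saveToEs(jsonRdd, "index-example/exampletype",
                ImmutableMap.of("es.mapping.id", "keystr"));
    }
}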
2. Code
public void run() {
    // ES connector settings on the SparkConf:
    //   es.input.json = yes        -> the RDD elements are already serialized JSON strings
    //   es.write.operation = index -> a document with an existing _id is overwritten, not duplicated
    SparkConf conf = new SparkConf().setAppName("SparkStreaming_" + appName)
            .set("es.nodes", "11.11.11.11")
            .set("es.port", "9200")
            .set("es.input.json", "yes")
            .set("es.write.operation", "index");
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.minutes(1));

    // Kafka consumer parameters for the direct stream
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "exampleGroup");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    Collection<String> topics = Arrays.asList("exampleTopic");
    final JavaInputDStream<ConsumerRecord<String, String>> inputDStream = KafkaUtils.createDirectStream(streamingContext,
            LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    JavaDStream<String> dStream = inputDStream.transform(new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<String>>() {
        // Keep only the message value of each Kafka record
        @Override
        public JavaRDD<String> call(JavaRDD<ConsumerRecord<String, String>> v1) throws Exception {
            JavaRDD<String> tempRdd = v1.map(new Function<ConsumerRecord<String, String>, String>() {
                @Override
                public String call(ConsumerRecord<String, String> v1) throws Exception {
                    return v1.value();
                }
            });
            return tempRdd;
        }
    }).filter(new Function<String, Boolean>() {
        // Drop blank lines
        @Override
        public Boolean call(String v1) throws Exception {
            return StringUtils.isNotBlank(v1);
        }
    }).flatMapToPair(new PairFlatMapFunction<String, String, JSONObject>() {
        // Parse "time|type1|type2|type3" and build the deterministic key "time_type1_type2_type3";
        // the key is also stored inside the document as the "keystr" field
        @Override
        public Iterator<Tuple2<String, JSONObject>> call(String s) throws Exception {
            List<Tuple2<String, JSONObject>> list = new ArrayList<>();
            if (StringUtils.isNotBlank(s)) {
                String[] strArr = s.split("\\|", -1);
                if (strArr.length >= 4) {
                    String keyStr = strArr[0] + "_" + strArr[1] + "_" + strArr[2] + "_" + strArr[3];
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("keystr", keyStr);
                    jsonObject.put("time", strArr[0]);
                    jsonObject.put("type1", strArr[1]);
                    jsonObject.put("type2", strArr[2]);
                    jsonObject.put("type3", strArr[3]);
                    jsonObject.put("count", 1L);
                    jsonObject.put("timestamp", DATE_FORMAT_MS.format(new Date()));
                    Tuple2<String, JSONObject> tuple2 = new Tuple2<>(keyStr, jsonObject);
                    list.add(tuple2);
                }
            }
            return list.iterator();
        }
    }).reduceByKey(new Function2<JSONObject, JSONObject, JSONObject>() {
        // Sum the counts of records that share the same key within a batch
        @Override
        public JSONObject call(JSONObject v1, JSONObject v2) throws Exception {
            v1.put("count", v1.getLongValue("count") + v2.getLongValue("count"));
            return v1;
        }
    }).map(new Function<Tuple2<String, JSONObject>, String>() {
        // Keep only the JSON payload; the key already lives in its "keystr" field
        @Override
        public String call(Tuple2<String, JSONObject> v1) throws Exception {
            return v1._2.toJSONString();
        }
    });

    dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> rdd) throws Exception {
            String source = "index-example/exampletype";
            // es.mapping.id makes the "keystr" field the document _id, so re-processing
            // the same data overwrites the existing document instead of adding a new one
            JavaEsSpark.saveToEs(rdd, source, ImmutableMap.of("es.mapping.id", "keystr"));
        }
    });

    // Without start()/awaitTermination() the pipeline defined above is never executed
    streamingContext.start();
    try {
        streamingContext.awaitTermination();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
- Generate an ID that uniquely identifies a given record (see the sketch after this list)
- Specify that ID when writing to ES so the same record is never indexed twice
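As a quick check of the first point, the snippet below mirrors the key-building logic of the flatMapToPair step; buildKey is a hypothetical helper introduced here only for illustration and is not part of the original job.

import java.util.Arrays;

public class KeySketch {
    // Same rule as the flatMapToPair step above: the first four pipe-delimited
    // fields form the deterministic document ID.
    static String buildKey(String line) {
        String[] f = line.split("\\|", -1);
        return f.length >= 4 ? f[0] + "_" + f[1] + "_" + f[2] + "_" + f[3] : null;
    }

    public static void main(String[] args) {
        // Two identical input lines produce the same key, hence the same ES _id.
        for (String line : Arrays.asList("20190708010000|a1|b1|c1", "20190708010000|a1|b1|c1")) {
            System.out.println(buildKey(line));   // prints 20190708010000_a1_b1_c1 both times
        }
    }
}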
3. Test Results
Input:
20190708010000|a1|b1|c1
20190708010000|a1|b2|c2
20190708010000|a1|b1|c1
20190708010000|a2|b2|c3
20190708010000|a3|b3|c2
20190708020000|a1|b2|c3
20190708020000|a2|b2|c2
20190708020000|a2|b2|c1
20190708020000|a1|b1|c1
20190708020000|a1|b1|c1
20190708020000|a2|b2|c2
Output:
The same data was fed in and aggregated twice: the timestamp field changed, but the counts did not. For example, 20190708010000|a1|b1|c1 appears twice in the input, so the document with _id 20190708010000_a1_b1_c1 carries count = 2 after either run; the second run simply overwrites it.
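Why the counts cannot change is easy to see with a small local simulation (plain Java, no Spark; the sample lines are exactly those listed above): it repeats the per-key counting that reduceByKey performs, and its result is fully determined by the input, so re-running the job can only refresh the timestamp field.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CountSimulation {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "20190708010000|a1|b1|c1", "20190708010000|a1|b2|c2", "20190708010000|a1|b1|c1",
                "20190708010000|a2|b2|c3", "20190708010000|a3|b3|c2", "20190708020000|a1|b2|c3",
                "20190708020000|a2|b2|c2", "20190708020000|a2|b2|c1", "20190708020000|a1|b1|c1",
                "20190708020000|a1|b1|c1", "20190708020000|a2|b2|c2");

        // Same key construction and counting as the Spark job.
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.split("\\|", -1);
            if (f.length >= 4) {
                String key = f[0] + "_" + f[1] + "_" + f[2] + "_" + f[3];
                counts.merge(key, 1L, Long::sum);
            }
        }
        counts.forEach((k, v) -> System.out.println(k + " -> " + v));
        // e.g. 20190708010000_a1_b1_c1 -> 2, 20190708020000_a2_b2_c2 -> 2, ...
    }
}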