
Optimizing Duplicate Writes from Spark to ES

Author: Jorvi | Published on 2019-07-12 13:55

    1. Overall approach

    When writing to ES, specify the document ID explicitly, so that one ID corresponds to exactly one record; writing the same record again then overwrites the existing document instead of adding a duplicate.
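    As a minimal standalone sketch of this idea (the local master, index name, and sample documents here are illustrative assumptions, not part of the original job), pointing es.mapping.id at a deterministic key field makes elasticsearch-hadoop use that field as the document _id, so repeated writes land on the same document. The full streaming job follows in section 2.

        SparkConf conf = new SparkConf().setAppName("EsIdSketch").setMaster("local[2]")
                .set("es.nodes", "11.11.11.11")
                .set("es.port", "9200")
                .set("es.input.json", "yes");   // the RDD elements below are already JSON strings
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // "keystr" is a business key; via es.mapping.id it becomes the ES document _id,
        // so writing records with the same key twice leaves a single document behind.
        JavaRDD<String> docs = jsc.parallelize(Arrays.asList(
                "{\"keystr\":\"20190708010000_a1_b1_c1\",\"count\":2}",
                "{\"keystr\":\"20190708010000_a1_b2_c2\",\"count\":1}"));
        JavaEsSpark.saveToEs(docs, "index-example/exampletype",
                ImmutableMap.of("es.mapping.id", "keystr"));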

    2. Code

        public void run() {
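            // elasticsearch-hadoop connection settings: es.input.json tells the connector that the
            // RDD already holds JSON strings, and "index" means create-or-overwrite by document id.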
            SparkConf conf = new SparkConf().setAppName("SparkStreaming_" + appName)
                  .set("es.nodes", "11.11.11.11")
                  .set("es.port", "9200")
                  .set("es.input.json", "yes")
                  .set("es.write.operation", "index");
            JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.minutes(1));
    
            // Kafka consumer settings for the direct stream (auto-commit disabled; this job does not commit offsets back to Kafka).
            Map<String, Object> kafkaParams = new HashMap<String, Object>();
            kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "exampleGroup");
            kafkaParams.put("auto.offset.reset", "latest");
            kafkaParams.put("enable.auto.commit", false);
    
            Collection<String> topics = Arrays.asList("exampleTopic");
    
            final JavaInputDStream<ConsumerRecord<String, String>> inputDStream = KafkaUtils.createDirectStream(streamingContext,
                    LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
    
            // For each batch: take the Kafka message value, drop blank lines, split on '|' to build
            // (keyStr, JSON) pairs, sum the counts per key, then serialize each JSON object back to a string.
            JavaDStream<String> dStream = inputDStream.transform(new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<String>>() {
                @Override
                public JavaRDD<String> call(JavaRDD<ConsumerRecord<String, String>> v1) throws Exception {
                    JavaRDD<String> tempRdd = v1.map(new Function<ConsumerRecord<String, String>, String>() {
                        @Override
                        public String call(ConsumerRecord<String, String> v1) throws Exception {
                            return v1.value();
                        }
                    });
                    return tempRdd;
                }
            }).filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String v1) throws Exception {
                    return StringUtils.isNotBlank(v1);
                }
            }).flatMapToPair(new PairFlatMapFunction<String, String, JSONObject>() {
                @Override
                public Iterator<Tuple2<String, JSONObject>> call(String s) throws Exception {
                    List<Tuple2<String, JSONObject>> list = new ArrayList<>();
                    if (StringUtils.isNotBlank(s)) {
                        String[] strArr = s.split("\\|", -1);
                        if (strArr.length >= 4) {
                            String keyStr = strArr[0] + "_" + strArr[1] + "_" + strArr[2] + "_" + strArr[3];
                            JSONObject jsonObject = new JSONObject();
                            jsonObject.put("keystr", keyStr);
                            jsonObject.put("time", strArr[0]);
                            jsonObject.put("type1", strArr[1]);
                            jsonObject.put("type2", strArr[2]);
                            jsonObject.put("type3", strArr[3]);
                            jsonObject.put("count", 1L);
                            jsonObject.put("timestamp", DATE_FORMAT_MS.format(new Date()));
                            Tuple2<String, JSONObject> tuple2 = new Tuple2<>(keyStr, jsonObject);
                            list.add(tuple2);
                        }
                    }
                    return list.iterator();
                }
            }).reduceByKey(new Function2<JSONObject, JSONObject, JSONObject>() {
                @Override
                public JSONObject call(JSONObject v1, JSONObject v2) throws Exception {
                    v1.put("count", v1.getLongValue("count") + v2.getLongValue("count"));
                    return v1;
                }
            }).map(new Function<Tuple2<String, JSONObject>, String>() {
                @Override
                public String call(Tuple2<String, JSONObject> v1) throws Exception {
                    return v1._2.toJSONString();
                }
            });
    
        // Write each micro-batch to ES; "keystr" becomes the document _id via es.mapping.id,
        // so re-processing the same records overwrites existing documents instead of duplicating them.
        dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> rdd) throws Exception {
                    String source = "index-example/exampletype";
                    JavaEsSpark.saveToEs(rdd, source, ImmutableMap.of("es.mapping.id", "keystr"));
                }
        });

            // Start the streaming computation and block until it is stopped or fails;
            // without these calls the pipeline defined above never runs.
            streamingContext.start();
            try {
                streamingContext.awaitTermination();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
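    The method above refers to a few class-level members that the article does not show (appName, BOOTSTRAP_SERVERS, DATE_FORMAT_MS). A plausible set of declarations, with purely hypothetical values, would be:

        private final String appName = "esDedupExample";                  // hypothetical job name
        private static final String BOOTSTRAP_SERVERS = "kafka01:9092";   // hypothetical broker list
        // Format used for the per-record write timestamp; note that SimpleDateFormat is not thread-safe,
        // so in practice each task should create its own instance or use a thread-safe formatter.
        private static final SimpleDateFormat DATE_FORMAT_MS = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");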
    
    1. Build an ID that uniquely identifies a record, as shown in the sketch below.
    2. Specify that ID when writing to ES so the same record is never stored twice.
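    For example, with the '|'-separated input format used in section 3, the line 20190708010000|a1|b1|c1 yields the key below; this is exactly the concatenation performed in the flatMapToPair step above:

        String line = "20190708010000|a1|b1|c1";
        String[] fields = line.split("\\|", -1);
        String keyStr = fields[0] + "_" + fields[1] + "_" + fields[2] + "_" + fields[3];
        // keyStr == "20190708010000_a1_b1_c1": used both as the reduceByKey key within a batch
        // and as the ES document _id (via es.mapping.id), so the same record can never appear twice.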

    3. Test results

    Input:
    20190708010000|a1|b1|c1
    20190708010000|a1|b2|c2
    20190708010000|a1|b1|c1
    20190708010000|a2|b2|c3
    20190708010000|a3|b3|c2
    20190708020000|a1|b2|c3
    20190708020000|a2|b2|c2
    20190708020000|a2|b2|c1
    20190708020000|a1|b1|c1
    20190708020000|a1|b1|c1
    20190708020000|a2|b2|c2

    Output:

    The same input was processed twice. The timestamp field changed on the second run, but the aggregated counts did not change: because each record maps to the same document ID, the second write simply overwrites the first document rather than creating a new one.
