Fixing Duplicate Writes from Spark to ES

Author: Jorvi | Published 2019-07-12 13:55

1. Overall Approach

When writing to ES, specify the document ID explicitly so that each ID corresponds to exactly one record; writing the same record again then overwrites it instead of creating a duplicate.
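
In elasticsearch-hadoop this is driven by configuration: es.mapping.id names the record field whose value becomes the document _id, and the default "index" write operation overwrites any existing document with that _id. Here is a minimal, self-contained sketch of just that mechanism; the index name demo-index/demo-type and the local[2] master are placeholders, and it assumes an ES node reachable at the connector's default of localhost:9200.

    import com.google.common.collect.ImmutableMap;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

    import java.util.Arrays;

    public class IdBasedWriteSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local[2]", "es-id-sketch");
            // Two JSON records carry the same "keystr"; because that field is mapped
            // to the document _id below, ES ends up with one document, not two.
            JavaRDD<String> records = sc.parallelize(Arrays.asList(
                    "{\"keystr\":\"k1\",\"count\":1}",
                    "{\"keystr\":\"k1\",\"count\":1}"));
            JavaEsSpark.saveToEs(records, "demo-index/demo-type", ImmutableMap.of(
                    "es.input.json", "true",      // the RDD already holds JSON strings
                    "es.mapping.id", "keystr"));  // use the keystr field as the _id
            sc.stop();
        }
    }

Run it twice: the index still holds a single document for k1, with its _version incremented.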

2. Code

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Serializable so the anonymous inner functions below, which capture the
// enclosing instance, can be shipped to executors.
public class SparkEsDedup implements Serializable {

    // Fields referenced in run(); the concrete values here are placeholders.
    private final String appName = "esDedup";
    private static final String BOOTSTRAP_SERVERS = "11.11.11.11:9092";
    private static final SimpleDateFormat DATE_FORMAT_MS = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    public void run() {
        // es.input.json: the RDDs written below already hold JSON strings.
        // es.write.operation "index": a write whose _id already exists overwrites
        // that document instead of creating a new one -- the heart of the dedup.
        SparkConf conf = new SparkConf().setAppName("SparkStreaming_" + appName)
                .set("es.nodes", "11.11.11.11")
                .set("es.port", "9200")
                .set("es.input.json", "yes")
                .set("es.write.operation", "index");
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.minutes(1));

        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "exampleGroup");
        kafkaParams.put("auto.offset.reset", "latest");
        // Auto-commit is off and offsets are never committed manually either:
        // replayed records are harmless because they map to the same document IDs.
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("exampleTopic");

        final JavaInputDStream<ConsumerRecord<String, String>> inputDStream = KafkaUtils.createDirectStream(streamingContext,
                LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // Unwrap each ConsumerRecord into its raw message value.
        JavaDStream<String> dStream = inputDStream.transform(new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaRDD<ConsumerRecord<String, String>> v1) throws Exception {
                JavaRDD<String> tempRdd = v1.map(new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> v1) throws Exception {
                        return v1.value();
                    }
                });
                return tempRdd;
            }
        }).filter(new Function<String, Boolean>() { // drop blank messages
            @Override
            public Boolean call(String v1) throws Exception {
                return StringUtils.isNotBlank(v1);
            }
        }).flatMapToPair(new PairFlatMapFunction<String, String, JSONObject>() { // key each record for dedup
            @Override
            public Iterator<Tuple2<String, JSONObject>> call(String s) throws Exception {
                List<Tuple2<String, JSONObject>> list = new ArrayList<>();
                if (StringUtils.isNotBlank(s)) {
                    String[] strArr = s.split("\\|", -1); // -1: keep trailing empty fields
                    if (strArr.length >= 4) {
                        // The first four fields jointly identify a record, so the same
                        // input line always yields the same key -- and the same ES _id.
                        String keyStr = strArr[0] + "_" + strArr[1] + "_" + strArr[2] + "_" + strArr[3];
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("keystr", keyStr);
                        jsonObject.put("time", strArr[0]);
                        jsonObject.put("type1", strArr[1]);
                        jsonObject.put("type2", strArr[2]);
                        jsonObject.put("type3", strArr[3]);
                        jsonObject.put("count", 1L);
                        jsonObject.put("timestamp", DATE_FORMAT_MS.format(new Date()));
                        Tuple2<String, JSONObject> tuple2 = new Tuple2<>(keyStr, jsonObject);
                        list.add(tuple2);
                    }
                }
                return list.iterator();
            }
        }).reduceByKey(new Function2<JSONObject, JSONObject, JSONObject>() { // sum counts of duplicate keys within the batch
            @Override
            public JSONObject call(JSONObject v1, JSONObject v2) throws Exception {
                v1.put("count", v1.getLongValue("count") + v2.getLongValue("count"));
                return v1;
            }
        }).map(new Function<Tuple2<String, JSONObject>, String>() { // back to JSON strings for es.input.json
            @Override
            public String call(Tuple2<String, JSONObject> v1) throws Exception {
                return v1._2.toJSONString();
            }
        });

        dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) throws Exception {
                String source = "index-example/exampletype";
                // es.mapping.id points the connector at the "keystr" field, so each
                // document's _id is exactly the key built above.
                JavaEsSpark.saveToEs(rdd, source, ImmutableMap.of("es.mapping.id", "keystr"));
            }
        });

        // Start the streaming job and block until it terminates.
        streamingContext.start();
        try {
            streamingContext.awaitTermination();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
  1. Build an ID that uniquely identifies a given record.
  2. Specify that ID when writing to ES, so a replay overwrites the existing document instead of duplicating it (see the caveat below).
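
One caveat to point 2: with es.write.operation set to "index", a rewrite replaces the whole document. That is exactly what makes replaying the same data idempotent, but it also means that if records for one key are split across two micro-batches, the second batch's count replaces the first instead of adding to it. Where cross-batch accumulation matters, the connector's scripted upsert mode is the usual alternative; the sketch below shows only the changed settings, assuming ES 5.x+ for the painless script (setting names come from elasticsearch-hadoop's update/upsert support):

    import org.apache.spark.SparkConf;

    public class UpsertConfSketch {
        public static SparkConf build() {
            // Scripted upsert: an existing document's count is incremented in place
            // rather than overwritten; new IDs are inserted as usual.
            return new SparkConf().setAppName("SparkStreaming_upsert")
                    .set("es.nodes", "11.11.11.11")
                    .set("es.port", "9200")
                    .set("es.input.json", "yes")
                    .set("es.write.operation", "upsert")
                    .set("es.update.script.lang", "painless")
                    .set("es.update.script.inline", "ctx._source.count += params.c")
                    .set("es.update.script.params", "c:count"); // bind script param "c" to the record's count field
        }
    }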

3. Test Results

Input:
20190708010000|a1|b1|c1
20190708010000|a1|b2|c2
20190708010000|a1|b1|c1
20190708010000|a2|b2|c3
20190708010000|a3|b3|c2
20190708020000|a1|b2|c3
20190708020000|a2|b2|c2
20190708020000|a2|b2|c1
20190708020000|a1|b1|c1
20190708020000|a1|b1|c1
20190708020000|a2|b2|c2

Output:
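
Derived from the input above (assuming all eleven lines land in a single micro-batch), the expected documents, in no particular order, are as follows (keystr and count shown; the other fields follow from the key):

20190708010000_a1_b1_c1  count=2
20190708010000_a1_b2_c2  count=1
20190708010000_a2_b2_c3  count=1
20190708010000_a3_b3_c2  count=1
20190708020000_a1_b2_c3  count=1
20190708020000_a2_b2_c2  count=2
20190708020000_a2_b2_c1  count=1
20190708020000_a1_b1_c1  count=2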

Feeding the same data through the job a second time changes each document's timestamp field, but the counts themselves do not change: no duplicate documents are created.
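
A quick external check is to compare the index's document count after each run: with ID-based writes it stays at eight while each document's _version climbs. A minimal sketch using only JDK classes, pointed at the same placeholder host as the job above:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class CountCheck {
        public static void main(String[] args) throws Exception {
            // GET the document count for the index; run after each pass over the data.
            URL url = new URL("http://11.11.11.11:9200/index-example/_count");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                System.out.println(in.readLine()); // e.g. {"count":8,...} after both runs
            } finally {
                conn.disconnect();
            }
        }
    }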
