spark streaming + kafka


Author: Jerry_Hao | Published 2018-01-03 10:37


    Download Spark from the official site: spark-2.0.1-bin-hadoop2.7

    Java test code:

    package douzi.risk;
    
    import java.util.HashMap;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.ConsumerStrategy;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    
    public class MainStreaming {
    
        public static void main(String[] args) throws Exception {
            // Create context with a 10-second batch interval
            SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
    
            List<String> topicsSet = Arrays.asList("test");
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "localhost:9092");
            kafkaParams.put("group.id", "test_kafka_spark_stream_1");
            // Start from the earliest offset when this group has no committed offset yet
            kafkaParams.put("auto.offset.reset", "earliest");
            kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            // Create direct kafka stream with brokers and topics
            ConsumerStrategy<String, String> cs = ConsumerStrategies.Subscribe(topicsSet, kafkaParams);
            JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
                    LocationStrategies.PreferConsistent(),
                    cs
            );
    
            // Extract the message value from each Kafka record
            JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
                @Override
                public String call(ConsumerRecord<String, String> record) {
                    return record.value();
                }
            });
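
            // Split each line into words, as in the standard word-count example;
            // without this step the counts below would be per line, not per word.
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });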
    
            JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String s) {
                            return new Tuple2<>(s, 1);
                        }
                    }).reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer i1, Integer i2) {
                            return i1 + i2;
                        }
                    });
            wordCounts.print();
    
            // Start the computation
            jssc.start();
            jssc.awaitTermination();
        }
    }
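
    With the settings above, the stream starts from the earliest offset the first time the consumer group runs, and offsets are committed by the Kafka consumer's periodic auto-commit (enable.auto.commit defaults to true). If you would rather commit offsets only after a batch has been processed, the kafka010 integration exposes each batch's offset ranges. A minimal sketch, placed before jssc.start(); it assumes Java 8 and additional imports of HasOffsetRanges, OffsetRange, and CanCommitOffsets from org.apache.spark.streaming.kafka010:

        // Commit each batch's offsets back to Kafka; in a real job you would
        // commit only after the batch's output action has succeeded.
        messages.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        });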
    

    spark deploy

    Run spark-submit from the command line to submit the jar: bin/spark-submit --class douzi.risk.MainStreaming /Users/Dev/IdeaProjects/kafka-spark-streaming/target/risk-1.0-SNAPSHOT.jar

    Note that the Kafka 0.10 integration is not bundled with the Spark distribution itself, so either build a fat jar that includes spark-streaming-kafka-0-10 and its dependencies, or add --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.1 to the spark-submit command.
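
    To feed the job some test input, publish a few lines to the test topic with a small Kafka producer. Below is a minimal sketch; the class name TestProducer is illustrative, and it assumes the kafka-clients library is on the classpath and a broker on localhost:9092:

    package douzi.risk;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TestProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // Send a few whitespace-separated lines for the streaming job to split and count
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (String line : new String[]{"hello spark", "hello kafka", "spark streaming"}) {
                    producer.send(new ProducerRecord<>("test", line));
                }
            }
        }
    }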
