
Spark Streaming and Spark Structured Streaming

Author: 分裂四人组 | Published: 2017-06-26 19:47

    Kafka Single-Node Environment Setup

    Download kafka_2.11-0.9.0.1 and kafka_2.11-0.10.2.0 from the official site. The changes between these two versions are substantial, and Spark needs a different jar dependency for each, so both environments have to be prepared for testing.

    Configuration
    For a single-node setup, the key setting is the ZooKeeper connection path:

    #server.properties
    
    # A comma separated list of directories under which to store log files
    #log.dirs=/tmp/kafka-logs
    log.dirs=/home/hzlishuming/env/kafka_2.11-0.9.0.1/kafka-logs
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=2
    
    zookeeper.connect=localhost:2181/lsm-kafka-090
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000  
    

    Startup

    ./bin/kafka-server-start.sh -daemon config/server.properties

    Testing
    Producer

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    Consumer

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper localhost:2181/lsm-kafka-090 --topic test --from-beginning

    Kafka 0.9.0.1 + Spark Streaming Test

    Versions
    See https://spark.apache.org/docs/latest/streaming-kafka-integration.html: for broker 0.9.0.1, spark-streaming-kafka-0-8 is sufficient; for brokers 0.10.0 and later, spark-streaming-kafka-0-10 must be used. A minimal dependency sketch follows.
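
    As a quick reference, a minimal build.sbt sketch of the two integration artifacts (coordinates taken from the Spark 2.1.x documentation; the Spark version here is an assumption, adjust it to your deployment):

    // build.sbt (sketch): pick the artifact that matches the broker version
    // Brokers 0.8.2.1 and later (including 0.9.0.1): receiver-based and direct DStream APIs
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.1"
    // Brokers 0.10.0 and later: new consumer API
    // libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.1"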

    The distribution built with ./dev/make-distribution.sh --name hadoop2.7-jdk7 --mvn ${MAVEN_HOME}/bin/mvn --tgz -Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn does not include the spark-streaming-kafka-0-8-assembly_* jar, so it has to be copied from external/kafka-0-8-assembly/target in the build tree into SPARK_HOME.

    What is the difference between a receiver-based DStream and a direct DStream?
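
    As a rough illustration, here is a minimal Scala sketch of the two APIs exposed by the spark-streaming-kafka-0-8 package (the ZooKeeper/broker addresses, group id, and topic are placeholders taken from this test setup):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaDStreamCompare"), Seconds(2))

    // Receiver-based DStream: the ZooKeeper high-level consumer runs inside a long-lived
    // receiver task that stores incoming blocks (optionally protected by a write-ahead log).
    val receiverStream = KafkaUtils.createStream(
      ssc, "localhost:2181/lsm-kafka-090", "my-consumer-group", Map("test" -> 1))

    // Direct DStream: no receiver; each micro-batch reads an explicit offset range from the
    // brokers, so RDD partitions map one-to-one onto Kafka partitions.
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "localhost:9092"), Set("test"))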

    Run the following commands:

    # Start the Kafka console producer
    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    # Submit the Spark Streaming job
    ./bin/spark-submit \
            --class org.apache.spark.examples.streaming.KafkaWordCount \
            --jars spark-streaming-kafka-0-8-assembly_2.11-2.1.1.jar \
            ./examples/jars/spark-examples_2.11-2.1.1.jar \
            hadoop691.lt.163.org:2181/lsm-kafka-090 my-consumer-group test 1
    

    Results

    17/06/22 14:44:56 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.120.219.35, 61559, None)
    17/06/22 14:44:56 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
    17/06/22 14:44:57 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 3 write ahead log files from hdfs://hz-test-01/user/hive/checkpoint/receivedBlockMetadata
    17/06/22 14:44:57 INFO SparkContext: Starting job: start at KafkaWordCount.scala:63
    17/06/22 14:44:57 INFO DAGScheduler: Registering RDD 1 (start at KafkaWordCount.scala:63)
    17/06/22 14:44:57 INFO DAGScheduler: Got job 0 (start at KafkaWordCount.scala:63) with 20 output partitions
    17/06/22 14:44:57 INFO DAGScheduler: Final stage: ResultStage 1 (start at KafkaWordCount.scala:63)
    17/06/22 14:44:57 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
    17/06/22 14:44:57 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
    17/06/22 14:44:57 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[1] at start at KafkaWordCount.scala:63), which has no missing parents
    17/06/22 14:44:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.1 KB, free 2004.6 MB)
    17/06/22 14:44:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1987.0 B, free 2004.6 MB)
    17/06/22 14:44:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.120.219.35:61559 (size: 1987.0 B, free: 2004.6 MB)
    17/06/22 14:44:57 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
    17/06/22 14:44:57 INFO DAGScheduler: Submitting 50 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[1] at start at KafkaWordCount.scala:63)
    17/06/22 14:44:57 INFO YarnScheduler: Adding task set 0.0 with 50 tasks
    
    1. Got job 0 (start at KafkaWordCount.scala:63) with 20 output partitions: why are there 20 partitions?
    2. What exactly is FileBasedWriteAheadLog_ReceivedBlockTracker?
    3. Adding task set 0.0 with 50 tasks: why are there 50 tasks?

    Final output:

    17/06/22 14:45:38 INFO YarnScheduler: Removed TaskSet 163.0, whose tasks have all completed, from pool
    17/06/22 14:45:38 INFO DAGScheduler: ResultStage 163 (print at KafkaWordCount.scala:61) finished in 0.042 s
    17/06/22 14:45:38 INFO DAGScheduler: Job 42 finished: print at KafkaWordCount.scala:61, took 0.062163 s
    -------------------------------------------
    Time: 1498113938000 ms
    -------------------------------------------
    (b,2)
    (a,2)
    (c,2)
    

    Kafka 0.10.2.0 + Spark Structured Streaming Test

    Versions
    Spark 2.x supports Kafka 0.10.x, but through a different set of jars; note the difference between the following two:

    spark-streaming-kafka-0-10-assembly_2.11-2.1.1.jar: the Kafka InputDStream/RDD integration used by Spark Streaming;
    spark-sql-kafka-0-10_2.11-2.1.1.jar: the jar that Spark Structured Streaming depends on.

    To test only Spark Streaming against a Kafka 0.10.x cluster, spark-streaming-kafka-0-10-assembly_2.11-2.1.1.jar is sufficient;
    to test Spark Structured Streaming, both jars above must be included (see the sketch below).
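
    For reference, a minimal Scala sketch of what the spark-sql-kafka-0-10 package enables, in the spirit of the StructuredKafkaWordCount example (the broker address and topic are the ones used in this test; treat the rest as an assumption-laden sketch rather than the example's exact source):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("StructuredKafkaWordCountSketch").getOrCreate()
    import spark.implicits._

    // Read the "test" topic as a streaming DataFrame; the "kafka" source is provided by
    // spark-sql-kafka-0-10, which must be on the driver and executor classpath.
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.120.219.35:9092")
      .option("subscribe", "test")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Running word count over the stream, printed to the console on every trigger.
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()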

    Command

    ./bin/spark-submit \
            --class org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount \
            --jars spark-sql-kafka-0-10_2.11-2.1.1.jar,spark-streaming-kafka-0-10-assembly_2.11-2.1.1.jar \
            ./examples/jars/spark-examples_2.11-2.1.1.jar \
            10.120.219.35:9092 subscribe test
    

    Results

    17/06/22 17:44:29 INFO AppInfoParser: Kafka version : 0.10.0.1
    17/06/22 17:44:29 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
    17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set key.deserializer to org.apache.kafka.common.serialization.ByteArrayDeserializer, earlier value:
    17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set value.deserializer to org.apache.kafka.common.serialization.ByteArrayDeserializer, earlier value:
    17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set auto.offset.reset to none, earlier value:
    17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set group.id to spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-executor, earlier value:
    17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set enable.auto.commit to false, earlier value:
    17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set receive.buffer.bytes to 65536
    17/06/22 17:44:29 INFO StreamExecution: Starting new streaming query.
    17/06/22 17:44:30 INFO AbstractCoordinator: Discovered coordinator 10.120.219.35:9092 (id: 2147483647 rack: null) for group spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-driver-0.
    17/06/22 17:44:30 INFO ConsumerCoordinator: Revoking previously assigned partitions [] for group spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-driver-0
    17/06/22 17:44:30 INFO AbstractCoordinator: (Re-)joining group spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-driver-0
    17/06/22 17:44:30 INFO AbstractCoordinator: Successfully joined group spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-driver-0 with generation 1
    17/06/22 17:44:30 INFO ConsumerCoordinator: Setting newly assigned partitions [test-0] for group spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-driver-0
    17/06/22 17:44:30 INFO KafkaSource: Initial offsets: {"test":{"0":21}}
    17/06/22 17:44:30 INFO StreamExecution: Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1498124670435)
    17/06/22 17:44:30 INFO KafkaSource: GetBatch called with start = None, end = {"test":{"0":21}}
    17/06/22 17:44:30 INFO KafkaSource: Partitions added: Map()
    17/06/22 17:44:30 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(test-0,21,21,None)
    


      Reader Comments

      • 专业补刀: Hi, I have recently been debugging CDH Kafka 0.10 + Spark 2.1.0.

        I used the spark-streaming-kafka-0-10_2.11-2.1.0 jar,
        but when the application was submitted to YARN it failed with an org/apache/spark/streaming/kafka/KafkaUtils error.

        Looking at the example source code, I found that it imports org.apache.spark.streaming.kafka,
        while the jar defines org.apache.spark.streaming.kafka010.

        Have you run into this problem?
        分裂四人组: @专业补刀 Sorry, I have not logged in for a while. You need to pull in the spark-streaming-kafka package; I added both the 0-10 and 0-8 versions to my pom.xml, so I did not hit this problem.
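
        For the package mismatch described above: the bundled example imports org.apache.spark.streaming.kafka.KafkaUtils (the 0-8 integration), whereas the 0-10 integration lives in org.apache.spark.streaming.kafka010 and has a different API. A minimal sketch of the kafka010 direct stream (addresses, group id, and topic are placeholders):

        import org.apache.kafka.common.serialization.StringDeserializer
        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}
        import org.apache.spark.streaming.kafka010.KafkaUtils
        import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
        import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

        val ssc = new StreamingContext(new SparkConf().setAppName("Kafka010Sketch"), Seconds(2))

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "my-consumer-group",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        // Note the kafka010 package: the 0-8 KafkaUtils used by the example will not resolve here.
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](Set("test"), kafkaParams))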
