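Kafka single-node environment setup
Download kafka_2.11-0.9.0.1 and kafka_2.11-0.10.2.0 from the official site. There are substantial changes between these two releases, and Spark depends on a different jar for each, so both environments need to be set up in the test environment.
Configuration
For a single-node setup, the key setting is the ZooKeeper path: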
#server.properties
# A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=/home/hzlishuming/env/kafka_2.11-0.9.0.1/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
zookeeper.connect=localhost:2181/lsm-kafka-090
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
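Before starting the broker it can help to confirm which settings are actually active, since the stock file mixes commented-out defaults with overrides. The following is a minimal sketch, not part of Kafka: the function names active_settings and get_setting are made up for illustration, and it assumes plain key=value lines with no spaces around the equals sign.

```shell
# Print only the active (non-comment, non-blank) lines of a properties file.
active_settings() {
  grep -Ev '^[[:space:]]*(#|$)' "$1"
}

# Print the value of one key, e.g. zookeeper.connect.
# Assumes "key=value" with no whitespace around "=".
get_setting() {
  active_settings "$1" | awk -F= -v k="$2" '$1 == k { sub(/^[^=]*=/, ""); print }'
}
```

For example, get_setting config/server.properties zookeeper.connect should print the connect string including the /lsm-kafka-090 chroot, which is the same string the console consumer and the Spark job need to use later.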
Start
./bin/kafka-server-start.sh -daemon config/server.properties
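Because -daemon puts the broker in the background and returns immediately, the producer or consumer can be started before the broker is listening. A small wait loop avoids that. This is only a sketch: wait_for_port is a hypothetical helper name, it relies on bash's /dev/tcp pseudo-device (so it needs bash rather than a plain POSIX sh), and port 9092 is assumed from the commands used in this post.

```shell
# Poll until HOST:PORT accepts a TCP connection, or give up after TIMEOUT attempts
# (roughly one attempt per second). Returns 0 on success, 1 on timeout.
wait_for_port() {
  local host=$1 port=$2 timeout=${3:-30} waited=0
  # The subshell opens and immediately closes the connection.
  until (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; do
    waited=$((waited + 1))
    [ "$waited" -ge "$timeout" ] && return 1
    sleep 1
  done
  return 0
}
```

Usage would look like: wait_for_port localhost 9092 30 && echo "broker is accepting connections". An open port only shows that the listener is up, not that the broker has finished registering in ZooKeeper.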
Test
producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper localhost:2181/lsm-kafka-090 --topic test --from-beginning
Kafka 0.9.0.1 + Spark Streaming test
Version
According to https://spark.apache.org/docs/latest/streaming-kafka-integration.html, a 0.9.0.1 broker works with spark-streaming-kafka-0-8; for brokers 0.10.0 and later, choose spark-streaming-kafka-0-10.
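The version rule above can be written down as a small helper, which is handy when a test script has to switch between the two environments. This is an illustrative sketch only: connector_for_broker is a made-up name, and it encodes just the 0.10 cut-off stated above (the 0-8 integration also has a minimum broker version, which is not checked here).

```shell
# Map a broker version such as 0.9.0.1 or 0.10.2.0 to the Spark connector artifact name.
connector_for_broker() {
  major=${1%%.*}      # text before the first dot
  rest=${1#*.}        # text after the first dot
  minor=${rest%%.*}   # second version component
  if [ "$major" -gt 0 ] || [ "$minor" -ge 10 ]; then
    echo "spark-streaming-kafka-0-10"
  else
    echo "spark-streaming-kafka-0-8"
  fi
}
```

With the two versions used in this post, connector_for_broker 0.9.0.1 selects the 0-8 artifact and connector_for_broker 0.10.2.0 selects the 0-10 artifact.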
The distribution built with ./dev/make-distribution.sh --name hadoop2.7-jdk7 --mvn ${MAVEN_HOME}/bin/mvn --tgz -Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn
does not include the spark-streaming-kafka-0-8-assembly_* jar that the Kafka streaming example depends on; copy it from external/kafka-0-8-assembly/target in the build tree into SPARK_HOME.
What is the difference between a Receiver DStream and a Direct DStream?
Run the following commands:
# Start the Kafka producer
producer
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# Submit the Spark Streaming job
# KafkaWordCount arguments: <zkQuorum> <group> <topics> <numThreads>
./bin/spark-submit \
--class org.apache.spark.examples.streaming.KafkaWordCount \
--jars spark-streaming-kafka-0-8-assembly_2.11-2.1.1.jar \
./examples/jars/spark-examples_2.11-2.1.1.jar \
hadoop691.lt.163.org:2181/lsm-kafka-090 my-consumer-group test 1
Result
17/06/22 14:44:56 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.120.219.35, 61559, None)
17/06/22 14:44:56 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
17/06/22 14:44:57 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 3 write ahead log files from hdfs://hz-test-01/user/hive/checkpoint/receivedBlockMetadata
17/06/22 14:44:57 INFO SparkContext: Starting job: start at KafkaWordCount.scala:63
17/06/22 14:44:57 INFO DAGScheduler: Registering RDD 1 (start at KafkaWordCount.scala:63)
17/06/22 14:44:57 INFO DAGScheduler: Got job 0 (start at KafkaWordCount.scala:63) with 20 output partitions
17/06/22 14:44:57 INFO DAGScheduler: Final stage: ResultStage 1 (start at KafkaWordCount.scala:63)
17/06/22 14:44:57 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
17/06/22 14:44:57 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
17/06/22 14:44:57 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[1] at start at KafkaWordCount.scala:63), which has no missing parents
17/06/22 14:44:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.1 KB, free 2004.6 MB)
17/06/22 14:44:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1987.0 B, free 2004.6 MB)
17/06/22 14:44:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.120.219.35:61559 (size: 1987.0 B, free: 2004.6 MB)
17/06/22 14:44:57 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/06/22 14:44:57 INFO DAGScheduler: Submitting 50 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[1] at start at KafkaWordCount.scala:63)
17/06/22 14:44:57 INFO YarnScheduler: Adding task set 0.0 with 50 tasks
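Got job 0 (start at KafkaWordCount.scala:63) with 20 output partitions
Why are there 20 partitions?
And what exactly is FileBasedWriteAheadLog_ReceivedBlockTracker?
Adding task set 0.0 with 50 tasks
Why are there 50 tasks?
A likely explanation for both numbers: when not running in local mode, Spark Streaming's ReceiverTracker first runs a dummy job, makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect(), so that executors have registered before receivers are scheduled. That job has 50 map tasks and 20 output partitions, and the call site "start at KafkaWordCount.scala:63" is consistent with it being triggered from ssc.start().
Final output: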
17/06/22 14:45:38 INFO YarnScheduler: Removed TaskSet 163.0, whose tasks have all completed, from pool
17/06/22 14:45:38 INFO DAGScheduler: ResultStage 163 (print at KafkaWordCount.scala:61) finished in 0.042 s
17/06/22 14:45:38 INFO DAGScheduler: Job 42 finished: print at KafkaWordCount.scala:61, took 0.062163 s
-------------------------------------------
Time: 1498113938000 ms
-------------------------------------------
(b,2)
(a,2)
(c,2)
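Kafka 0.10.2.0 + Spark Structured Streaming test
Version
Spark 2.x supports Kafka 0.10.x, but it depends on different jars. Note the difference between these two:
spark-streaming-kafka-0-10-assembly_2.11-2.1.1.jar: the Kafka InputDStream/RDD integration that Spark Streaming depends on;
spark-sql-kafka-0-10_2.11-2.1.1.jar: the jar that Structured Streaming depends on;
To test only Spark Streaming against Kafka 0.10.x, spark-streaming-kafka-0-10-assembly_2.11-2.1.1.jar alone is enough;
To test Structured Streaming, both of the jars above must be included;
Command
# StructuredKafkaWordCount arguments: <bootstrap-servers> <subscribe-type> <topics>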
./bin/spark-submit \
--class org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount \
--jars spark-sql-kafka-0-10_2.11-2.1.1.jar,spark-streaming-kafka-0-10-assembly_2.11-2.1.1.jar \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10.120.219.35:9092 subscribe test
Result
17/06/22 17:44:29 INFO AppInfoParser: Kafka version : 0.10.0.1
17/06/22 17:44:29 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set key.deserializer to org.apache.kafka.common.serialization.ByteArrayDeserializer, earlier value:
17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set value.deserializer to org.apache.kafka.common.serialization.ByteArrayDeserializer, earlier value:
17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set auto.offset.reset to none, earlier value:
17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set group.id to spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-executor, earlier value:
17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set enable.auto.commit to false, earlier value:
17/06/22 17:44:29 INFO KafkaSourceProvider: executor: Set receive.buffer.bytes to 65536
17/06/22 17:44:29 INFO StreamExecution: Starting new streaming query.
17/06/22 17:44:30 INFO AbstractCoordinator: Discovered coordinator 10.120.219.35:9092 (id: 2147483647 rack: null) for group spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-driver-0.
17/06/22 17:44:30 INFO ConsumerCoordinator: Revoking previously assigned partitions [] for group spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-driver-0
17/06/22 17:44:30 INFO AbstractCoordinator: (Re-)joining group spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-driver-0
17/06/22 17:44:30 INFO AbstractCoordinator: Successfully joined group spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-driver-0 with generation 1
17/06/22 17:44:30 INFO ConsumerCoordinator: Setting newly assigned partitions [test-0] for group spark-kafka-source-b515e7df-5bf2-4f21-83ab-7ca1e54fde99--4866713-driver-0
17/06/22 17:44:30 INFO KafkaSource: Initial offsets: {"test":{"0":21}}
17/06/22 17:44:30 INFO StreamExecution: Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1498124670435)
17/06/22 17:44:30 INFO KafkaSource: GetBatch called with start = None, end = {"test":{"0":21}}
17/06/22 17:44:30 INFO KafkaSource: Partitions added: Map()
17/06/22 17:44:30 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(test-0,21,21,None)
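Reader comments
I used the spark-streaming-kafka-0-10_2.11-2.1.0 jar,
and when the application was submitted to YARN it failed with an org/apache/spark/streaming/kafka/KafkaUtils error.
Looking at the example's source, I found that it references org.apache.spark.streaming.kafka,
whereas the jar defines org.apache.spark.streaming.kafka010.
Has the author run into this problem?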
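On the package mismatch raised in this comment: the spark-streaming-kafka-0-8 artifact provides org.apache.spark.streaming.kafka.KafkaUtils, while the 0-10 artifact provides org.apache.spark.streaming.kafka010.KafkaUtils, so an example compiled against one package will not find its class in the other jar. A quick way to check what a given jar actually contains is sketched below. It assumes unzip is installed, and jar_has_class is a made-up helper name.

```shell
# Succeeds (exit status 0) if the jar lists an entry containing the given path.
# -F makes grep treat the path as a fixed string rather than a regular expression.
jar_has_class() {
  unzip -l "$1" 2>/dev/null | grep -qF "$2"
}
```

For example, jar_has_class spark-streaming-kafka-0-8-assembly_2.11-2.1.1.jar org/apache/spark/streaming/kafka/KafkaUtils.class && echo found. Running the same check against the 0-10 jar with the kafka010 path shows which of the two packages a job must import.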