Summary: Spark Streaming, Kafka, ZooKeeper

Sending data to Kafka with the Scala Kafka API
Use Scala to call the Kafka producer API and send data to Kafka: set the topic and the key/value serializers, create one KafkaProducer object and multiple ProducerRecord objects, and call send repeatedly.
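If the project is built with sbt, the dependencies used throughout this post might look roughly like the sketch below (the artifact versions are assumptions and should be aligned with the cluster's Spark and Kafka versions):

// build.sbt (sketch; versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0",
  "org.apache.kafka" % "kafka-clients" % "2.0.0",
  "org.apache.zookeeper" % "zookeeper" % "3.4.10",
  "com.alibaba" % "fastjson" % "1.2.62"
)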
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random

object Producer {
  val topic = "test_gp"
  val brokers = "cloudera01:9092"
  val rnd = new Random()
  val prop = new Properties() {
    {
      put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
      put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    }
  }

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](prop)
    // mock data
    val nameAddres = Map("bob" -> "shanghai#200000", "amy" -> "beijing#100000", "alice" -> "shanghai#200000", "tom" -> "beijing#100000", "lulu" -> "hangzhou#310000",
      "nick" -> "shanghai#200000")
    val namePhones = Map("bob" -> "15700079421", "amy" -> "18700079458", "alice" -> "17730079427", "tom" -> "16700379451", "lulu" -> "18800074423",
      "nick" -> "14400033426")
    // name + address records, label 0
    for (nameAddre <- nameAddres) {
      val data = new ProducerRecord[String, String](topic, nameAddre._1, s"${nameAddre._1}\t${nameAddre._2}\t0")
      producer.send(data, new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if (null != e) e.printStackTrace()
        }
      })
      if (rnd.nextInt(100) < 50) Thread.sleep(rnd.nextInt(10))
    }
    // name + phone records, label 1
    for (namePhone <- namePhones) {
      val data = new ProducerRecord[String, String](topic, namePhone._1, s"${namePhone._1}\t${namePhone._2}\t1")
      producer.send(data, new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if (null != e) e.printStackTrace()
        }
      })
      if (rnd.nextInt(100) < 50) Thread.sleep(rnd.nextInt(10))
    }
    producer.close()
  }
}
Check the received data with the Kafka console client.
test_gp has 5 partitions with a replication factor of 2, spread across 3 brokers; a sketch of a matching create command follows the describe output.
[root@cloudera01 ~]# kafka-topics --describe --zookeeper cloudera01:2181/kafka --topic test_gp
Topic:test_gp PartitionCount:5 ReplicationFactor:2 Configs:
Topic: test_gp Partition: 0 Leader: 79 Replicas: 79,78 Isr: 78,79
Topic: test_gp Partition: 1 Leader: 77 Replicas: 77,79 Isr: 77,79
Topic: test_gp Partition: 2 Leader: 78 Replicas: 78,77 Isr: 78,77
Topic: test_gp Partition: 3 Leader: 79 Replicas: 79,77 Isr: 77,79
Topic: test_gp Partition: 4 Leader: 77 Replicas: 77,78 Isr: 78,77
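For reference, a topic with this layout could be created with a command along these lines (a sketch only; the ZooKeeper chroot /kafka is taken from the describe command above):

kafka-topics --create --zookeeper cloudera01:2181/kafka --topic test_gp --partitions 5 --replication-factor 2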
[root@cloudera01 ~]# kafka-console-consumer --bootstrap-server cloudera01:9092 --topic test_gp
nick shanghai#200000 0
lulu hangzhou#310000 0
bob shanghai#200000 0
tom beijing#100000 0
amy beijing#100000 0
nick 14400033426 1
lulu 18800074423 1
bob 15700079421 1
tom 16700379451 1
amy 18700079458 1
alice shanghai#200000 0
alice 17730079427 1
Spark Streaming: consume Kafka data and write it back to Kafka
Consume the data from Kafka, parse the raw records into JSON, and write them to a new Kafka topic, test_gp2. The console client shows that test_gp2 has two partitions:
[root@cloudera01 ~]# kafka-topics --zookeeper cloudera01:2181/kafka --describe --topic test_gp2
Topic:test_gp2 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: test_gp2 Partition: 0 Leader: 78 Replicas: 78,79,77 Isr: 78,77,79
Topic: test_gp2 Partition: 1 Leader: 79 Replicas: 79,77,78 Isr: 78,77,79
Development steps:
- Create the Spark configuration with SparkConf.
- Create the Spark Streaming context with StreamingContext.
- Build the Kafka consumer parameters kafkaParams: Map[String, Object].
- Build the other configuration properties and broadcast them through the ssc context.
- Obtain the kafkaDirectStream DStream via KafkaUtils.createDirectStream.
- Use a map transformation to convert each String record into a JSONObject.
- Output with foreachRDD: the RDD calls foreach to send every record to Kafka, invoking the KafkaProducer singleton inside the foreach; if the asynchronous send succeeds, write the offset to ZooKeeper.

First create a KafkaUtilities object that holds a KafkaProducer singleton; otherwise every record in every partition would create a new KafkaProducer, which adds unnecessary overhead.
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, Producer, ProducerRecord}

object KafkaUtilities {
  private var producer: Producer[String, String] = null

  // lazily create a single producer per JVM (i.e. one per executor)
  def getProducer(bootstrap: String): Producer[String, String] = {
    this.synchronized {
      if (producer == null) {
        producer = createKafkaProducer(bootstrap)
      }
      producer
    }
  }

  def createKafkaProducer(bootstrap: String): Producer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrap)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "1")                                  // a leader acknowledgement is enough
    props.put("retries", "3")
    props.put("max.in.flight.requests.per.connection", "1") // preserve ordering while retrying
    props.put("max.request.size", "50000000")               // allow large requests
    props.put("buffer.memory", "50000000")
    props.put("request.timeout.ms", "30000000")
    new KafkaProducer[String, String](props)
  }

  /**
   * Send a message to Kafka asynchronously.
   * @param kafkaProducer the (singleton) producer instance
   * @param topic         target topic
   * @param message       record value
   * @param key           record key
   * @param partition     target partition number
   * @param callback      invoked when the send completes (may be null)
   * @return a Future of the record metadata
   */
  def sendKafka(
    kafkaProducer: Producer[String, String],
    topic: String,
    message: String,
    key: String,
    partition: Integer,
    callback: Callback) = {
    if (callback != null) {
      kafkaProducer.send(new ProducerRecord[String, String](topic, partition, key, message), callback)
    } else {
      kafkaProducer.send(new ProducerRecord[String, String](topic, partition, key, message))
    }
  }
}
The send method here specifies both the key and the partition explicitly. If a valid partition number is given, the record is sent to that partition. If no partition is given but a key is present, the partition is chosen from the hash of the key. If neither a key nor a partition is given, partitions are assigned in a round-robin fashion.
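The three cases correspond to the three ProducerRecord constructors (a sketch; the topic and values are only illustrative):

import org.apache.kafka.clients.producer.ProducerRecord

// explicit partition: the record always goes to partition 1, whatever the key hashes to
val explicit = new ProducerRecord[String, String]("test_gp2", Integer.valueOf(1), "bob", "v1")
// key but no partition: the partition is chosen from the hash of the key
val byKey = new ProducerRecord[String, String]("test_gp2", "bob", "v2")
// neither key nor partition: partitions are assigned in round-robin order
val roundRobin = new ProducerRecord[String, String]("test_gp2", "v3")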
Create a ZooKeeper helper with a factory-style singleton that writes data to ZooKeeper.
import org.apache.zookeeper._
import org.apache.zookeeper.common._
import java.util.Properties
import org.apache.zookeeper.data.Stat

class ZKUtils() extends Watcher {
  private var zk: ZooKeeper = _
  private var pro: Properties = _

  private def this(pro: Properties) = {
    this()
    this.pro = pro
    this.zk = new ZooKeeper(pro.getProperty("ZK_HOST"), 20000, this)
  }

  override def process(event: WatchedEvent): Unit = {
    // no watching
  }

  private def createPath(path: String, zk: ZooKeeper): Unit = {
    // create the full path, one level at a time
    PathUtils.validatePath(path)
    var parentPath = path.substring(0, path.indexOf("/", 0) + 1)
    while (parentPath != null && !parentPath.isEmpty) {
      if (zk.exists(parentPath, false) == null) {
        // create synchronously, with a fully open ACL, as a persistent node
        zk.create(parentPath, "".getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
      }
      // stop once the current prefix equals the full path
      if (parentPath.equals(path)) {
        parentPath = null
      } else {
        // advance to the next "/" to get the next prefix
        var idx = path.indexOf("/", parentPath.length() + 1)
        if (-1 == idx) {
          idx = path.length
        }
        parentPath = path.substring(0, idx)
      }
    }
  }

  def setData(path: String, data: String): Stat = {
    if (zk.exists(path, false) == null) {
      createPath(path, zk)
    }
    zk.setData(path, data.getBytes, -1)
  }

  def getData(path: String): String = {
    val stat = zk.exists(path, false)
    if (stat == null) {
      null
    } else {
      new String(zk.getData(path, false, stat))
    }
  }
}
object ZKUtils {
  private var instance: ZKUtils = _

  def getIns(prop: Properties): ZKUtils = {
    this.synchronized {
      if (instance == null) {
        instance = new ZKUtils(prop)
      }
      instance
    }
  }
}
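A minimal usage sketch of the singleton (the path follows the layout used later in this post; the offset value 42 is only illustrative):

import java.util.Properties

val zkProps = new Properties()
zkProps.setProperty("ZK_HOST", "cloudera01,cloudera02,cloudera03")
val zkUtils = ZKUtils.getIns(zkProps)
// creates the intermediate nodes if needed, then stores the value
zkUtils.setData("/test/monitor/test_gp/kafka_operation_group/0", "42")
// reads it back; returns null if the node does not exist
println(zkUtils.getData("/test/monitor/test_gp/kafka_operation_group/0"))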
Consume the test_gp messages and, for each message, call the KafkaProducer singleton to write it to test_gp2.
import java.util.Properties
import org.apache.log4j.{Level, Logger}
import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaOperation {
  Logger.getRootLogger.setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // Spark configuration
    val sparkConf = new SparkConf()
      .setAppName("KafkaOperation")
      .setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "100") // max records pulled per second from each partition of a topic
    // This parameter has to be tuned together with batchDuration to balance ingestion and processing.
    // By default there is no limit: within one batchDuration the job pulls whatever Kafka has, and since
    // batchDuration cannot be adjusted dynamically, a surge of incoming data can block the job.
    // Here the upper bound is 100 records/s x 5 partitions x 2 s batch = 1000 records per batch.

    // create the streaming context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // broadcast the other configuration
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "cloudera01:9092")
    properties.setProperty("kafka.topic.save", "test_gp2")
    properties.setProperty("kafka.topic.input", "test_gp")
    properties.setProperty("group.id", "kafka_operation_group")
    properties.setProperty("ZK_HOST", "cloudera01,cloudera02,cloudera03")
    val configProperties = ssc.sparkContext.broadcast(properties)

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "cloudera01:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> "kafka_operation_group",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )

    // obtain the Kafka DStream
    val kafkaDirectStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent, // distribute partitions evenly across executors
      Subscribe[String, String](List("test_gp"), kafkaParams)) // topics to subscribe to, Kafka parameters

    val saveRDD = kafkaDirectStream.map(x => {
      // attach the source partition and offset to each record
      val jsonObject = new JSONObject() {
        {
          put("partition", x.partition().toString)
          put("offset", x.offset().toString)
        }
      }
      val Array(name, tmp, label) = x.value().split("\t")
      jsonObject.put("name", name)
      if (label.toInt == 0) jsonObject.put("addr", tmp) else jsonObject.put("phone", tmp)
      jsonObject
    })

    // write each record to Kafka
    saveRDD.foreachRDD(r => {
      if (r != null && !r.isEmpty()) {
        r.foreach(x => {
          val name = x.getString("name")
          try {
            KafkaUtilities.sendKafka(
              KafkaUtilities.getProducer(configProperties.value.getProperty("bootstrap.servers")), // KafkaProducer singleton
              configProperties.value.getProperty("kafka.topic.save"),
              x.toJSONString,
              name,
              new scala.util.Random().nextInt(2), // test_gp2 has two partitions: pick 0 or 1 at random
              new Callback {
                override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
                  if (recordMetadata != null && e == null) {
                    // the asynchronous send succeeded: save the source offset to ZooKeeper
                    ZKUtils.getIns(configProperties.value).setData(
                      "/test/monitor/" + configProperties.value.getProperty("kafka.topic.input") + "/" +
                        configProperties.value.getProperty("group.id") + "/" +
                        x.getString("partition"),
                      x.getString("offset"))
                  } else {
                    println(s"asynchronous send failed: ${x.toJSONString}")
                    e.printStackTrace()
                  }
                }
              }
            )
          } catch {
            case ex: Exception =>
              // the send failed before the request could be handed to the producer
              ex.printStackTrace()
          }
        })
      }
    })

    // start the computation
    ssc.start()
    // wait until terminated
    ssc.awaitTermination()
  }
}
Check the messages received by Kafka:
[root@cloudera01 ~]# kafka-console-consumer --bootstrap-server cloudera01:9092 --topic test_gp2
{"partition":"2","offset":"108","phone":"14400033426","name":"nick"}
{"partition":"2","offset":"106","name":"nick","addr":"shanghai#200000"}
{"partition":"2","offset":"107","name":"lulu","addr":"hangzhou#310000"}
{"partition":"2","offset":"109","phone":"18800074423","name":"lulu"}
{"partition":"1","offset":"107","name":"amy","addr":"beijing#100000"}
{"partition":"3","offset":"53","name":"bob","addr":"shanghai#200000"}
{"partition":"3","offset":"54","phone":"15700079421","name":"bob"}
{"partition":"1","offset":"106","name":"tom","addr":"beijing#100000"}
{"partition":"1","offset":"108","phone":"16700379451","name":"tom"}
{"partition":"1","offset":"109","phone":"18700079458","name":"amy"}
{"partition":"4","offset":"53","name":"alice","addr":"shanghai#200000"}
{"partition":"4","offset":"54","phone":"17730079427","name":"alice"}
Check the offset information stored in ZooKeeper:
[zk: localhost:2181(CONNECTED) 30] ls /test/monitor/test_gp/kafka_operation_group
[1, 2, 3, 4]
[zk: localhost:2181(CONNECTED) 18] get /test/monitor/test_gp/kafka_operation_group/1
109
cZxid = 0x3d000877d5
ctime = Wed Sep 30 14:40:46 CST 2020
mZxid = 0x3d00087fb1
mtime = Wed Sep 30 15:32:18 CST 2020
pZxid = 0x3d000877d5
cversion = 0
dataVersion = 38
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
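These stored offsets can be used to resume consumption where the job left off. A rough sketch, assuming the same path layout and that the stored value is the offset of the last processed record (so consumption restarts at offset + 1):

import java.util.Properties
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// read the saved offsets back from ZooKeeper (partitions without a node are skipped)
val zkProps = new Properties()
zkProps.setProperty("ZK_HOST", "cloudera01,cloudera02,cloudera03")
val zk = ZKUtils.getIns(zkProps)
val fromOffsets: Map[TopicPartition, Long] = (0 until 5).flatMap { p =>
  Option(zk.getData(s"/test/monitor/test_gp/kafka_operation_group/$p"))
    .map(off => new TopicPartition("test_gp", p) -> (off.toLong + 1))
}.toMap

// pass them to the consumer strategy instead of relying on auto.offset.reset
// val stream = KafkaUtils.createDirectStream[String, String](
//   ssc, PreferConsistent, Subscribe[String, String](List("test_gp"), kafkaParams, fromOffsets))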