Reading and Writing Kafka with Spark Streaming, Saving Offsets to ZooKeeper

Author: xiaogp | Published 2020-09-29 14:40

Abstract: Spark Streaming, Kafka, ZooKeeper

Sending data to Kafka with the Kafka API from Scala

Use the Kafka client API from Scala to send data to Kafka: set the topic and the key/value serializers, create one KafkaProducer, and repeatedly call send with a new ProducerRecord for each message.

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

object Producer {
  val topic = "test_gp"
  val brokers = "cloudera01:9092"
  val rnd = new Random()
  val prop = new Properties() {
    {
      put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
      put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    }
  }

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](prop)
    // mock data
    val nameAddres = Map("bob" -> "shanghai#200000", "amy" -> "beijing#100000", "alice" -> "shanghai#200000", "tom" -> "beijing#100000", "lulu" -> "hangzhou#310000",
      "nick" -> "shanghai#200000")
    val namePhones = Map("bob" -> "15700079421", "amy" -> "18700079458", "alice" -> "17730079427", "tom" -> "16700379451", "lulu" -> "18800074423",
      "nick" -> "14400033426")

    for (nameAddre <- nameAddres) {
      val data = new ProducerRecord[String, String](topic, nameAddre._1, s"${nameAddre._1}\t${nameAddre._2}\t0")
      producer.send(data, new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if (null != e) e.printStackTrace()
        }
      })
      if (rnd.nextInt(100) < 50) Thread.sleep(rnd.nextInt(10))
    }

    for (namePhone <- namePhones) {
      val data = new ProducerRecord[String, String](topic, namePhone._1, s"${namePhone._1}\t${namePhone._2}\t0")
      producer.send(data, new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if (null != e) e.printStackTrace()
        }
      })
      if (rnd.nextInt(100) < 50) Thread.sleep(rnd.nextInt(10))
    }

    producer.close()
  }
}

Use the Kafka command-line tools to inspect the data that was received.
The topic test_gp has 5 partitions and a replication factor of 2, spread across 3 brokers.

[root@cloudera01 ~]# kafka-topics --describe --zookeeper cloudera01:2181/kafka --topic test_gp
Topic:test_gp   PartitionCount:5    ReplicationFactor:2 Configs:
    Topic: test_gp  Partition: 0    Leader: 79  Replicas: 79,78 Isr: 78,79
    Topic: test_gp  Partition: 1    Leader: 77  Replicas: 77,79 Isr: 77,79
    Topic: test_gp  Partition: 2    Leader: 78  Replicas: 78,77 Isr: 78,77
    Topic: test_gp  Partition: 3    Leader: 79  Replicas: 79,77 Isr: 77,79
    Topic: test_gp  Partition: 4    Leader: 77  Replicas: 77,78 Isr: 78,77
[root@cloudera01 ~]# kafka-console-consumer --bootstrap-server cloudera01:9092 --topic test_gp
nick    shanghai#200000 0
lulu    hangzhou#310000 0
bob shanghai#200000 0
tom beijing#100000  0
amy beijing#100000  0
nick    14400033426 1
lulu    18800074423 1
bob 15700079421 1
tom 16700379451 1
amy 18700079458 1
alice   shanghai#200000 0
alice   17730079427 1

Consuming Kafka data with Spark Streaming and writing it back to Kafka

Pull data from Kafka and consume it, parse each raw record into JSON, and write it to a new Kafka topic, test_gp2. The command-line tools show that test_gp2 has two partitions.

[root@cloudera01 ~]# kafka-topics --zookeeper cloudera01:2181/kafka --describe --topic test_gp2
Topic:test_gp2  PartitionCount:2    ReplicationFactor:3 Configs:
    Topic: test_gp2 Partition: 0    Leader: 78  Replicas: 78,79,77  Isr: 78,77,79
    Topic: test_gp2 Partition: 1    Leader: 79  Replicas: 79,77,78  Isr: 78,77,79

Development steps:

  • Create the Spark configuration with SparkConf
  • Create the Spark Streaming context with StreamingContext
  • Build the Kafka parameters kafkaParams: Map[String, Object]
  • Create the remaining configuration and broadcast it via the ssc context
  • Obtain a kafkaDirectStream DStream with KafkaUtils.createDirectStream
  • Use a map transformation to convert each String record into a JSONObject
  • Output with foreachRDD: the RDD's foreach sends every message to Kafka, calling the KafkaProducer singleton inside foreach; when the asynchronous send succeeds, the offset is written to ZooKeeper.
    First create a KafkaUtilities object that holds the KafkaProducer singleton; otherwise every record in every partition would create its own KafkaProducer, which adds unnecessary overhead.

import java.util.Properties
import org.apache.kafka.clients.producer.{ Producer, ProducerRecord, KafkaProducer }
import org.apache.kafka.clients.producer.Callback

object KafkaUtilities {
  private var producer: Producer[String, String] = null

  def getProducer(bootstrap: String): Producer[String, String] = {
    this.synchronized {
      if (producer == null) {
        producer = createKafkaProducer(bootstrap)
      }
      producer
    }
  }

  def createKafkaProducer(bootstrap: String): Producer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrap)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // allow large requests and a large send buffer
    props.put("max.request.size", "50000000")
    props.put("buffer.memory", "50000000")
    props.put("request.timeout.ms", "30000000")
    // leader acknowledgement is enough; keep ordering by limiting in-flight requests
    props.put("acks", "1")
    props.put("max.in.flight.requests.per.connection", "1")
    props.put("retries", "3")

    new KafkaProducer[String, String](props)
  }

  /**
   * Send a message to Kafka asynchronously.
   * @param kafkaProducer
   * @param topic
   * @param message
   * @param key
   * @param partition
   * @param callback
   * @return
   */
  def sendKafka(
         kafkaProducer: Producer[String, String],
         topic:         String,
         message:       String,
         key:           String,
         partition:     Integer,
         callback:      Callback) = {
    // callback is already typed as Callback, so a null check is sufficient
    if (callback != null) {
      kafkaProducer.send(new ProducerRecord[String, String](topic, partition, key, message), callback)
    } else {
      kafkaProducer.send(new ProducerRecord[String, String](topic, partition, key, message))
    }
  }
}
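
A minimal usage sketch of the helper above, reusing the broker address and target topic from this article (the payload and key are placeholders, and KafkaUtilitiesDemo is not part of the original code):

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

object KafkaUtilitiesDemo {
  def main(args: Array[String]): Unit = {
    // reuse the producer singleton instead of creating one per record
    val producer = KafkaUtilities.getProducer("cloudera01:9092")
    KafkaUtilities.sendKafka(producer, "test_gp2", """{"name":"bob"}""", "bob", 0,
      new Callback {
        override def onCompletion(metadata: RecordMetadata, e: Exception): Unit =
          if (e != null) e.printStackTrace()
      })
    // flush and release resources before the demo exits
    producer.close()
  }
}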

The send call above specifies both a key and a partition explicitly. If a valid partition number is given, the record is sent to that partition. If no partition is given but a key is present, a partition is chosen by hashing the key. If neither a key nor a partition is specified, partitions are assigned in a round-robin fashion.
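
For reference, the three cases correspond to the ProducerRecord constructors (a minimal illustration; the topic, key, and value below are placeholders):

import org.apache.kafka.clients.producer.ProducerRecord

// explicit partition: the record always goes to partition 0
val r1 = new ProducerRecord[String, String]("test_gp2", 0, "bob", "some value")
// key only: the partition is chosen by hashing the key "bob"
val r2 = new ProducerRecord[String, String]("test_gp2", "bob", "some value")
// neither key nor partition: partitions are assigned round-robin as described above
val r3 = new ProducerRecord[String, String]("test_gp2", "some value")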


Create a ZooKeeper helper with a factory-style accessor that writes data into ZooKeeper.


import org.apache.zookeeper._
import org.apache.zookeeper.common._
import java.util.Properties

import org.apache.zookeeper.data.Stat

class ZKUtils() extends Watcher {
  private var zk: ZooKeeper = _
  private var pro: Properties = _

  private def this(pro: Properties) = {
    this()
    this.pro = pro
    this.zk = new ZooKeeper(pro.getProperty("ZK_HOST"), 20000, this)
  }

  override def process(event: WatchedEvent): Unit = {
    // no watching
  }

  private def createPath(path: String, zk: ZooKeeper): Unit = {
    // create the path level by level
    PathUtils.validatePath(path)
    var parentPath = path.substring(0, path.indexOf("/", 0) + 1)
    while (parentPath != null && !parentPath.isEmpty) {
      if (zk.exists(parentPath, false) == null) {
        // create synchronously, with a fully open ACL, as a persistent node
        zk.create(parentPath, "".getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
      }

      // stop once the parent path equals the full path
      if (parentPath.equals(path)) {
        parentPath = null
      } else {
        // otherwise extend the parent path up to the next "/"
        var idx = path.indexOf("/", parentPath.length() + 1)
        if (-1 == idx) {
          idx = path.length
        }
        parentPath = path.substring(0, idx)
      }
    }
  }

  def setData(path: String, data: String): Stat = {
    if (zk.exists(path, false) == null) {
      createPath(path, zk)
    }
    zk.setData(path, data.getBytes, -1)
  }

  def getData(path: String): String = {
    val stat = zk.exists(path, false)
    if (stat == null) {
      null
    } else {
      new String(zk.getData(path, false, stat))
    }
  }
}


object ZKUtils {
  private var instance: ZKUtils = _

  def getIns(prop: Properties): ZKUtils = {
    this.synchronized {
      if (instance == null) {
        instance = new ZKUtils(prop)
      }
      instance
    }
  }
}
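
A minimal standalone sketch of the utility, using the ZooKeeper hosts and znode path that appear later in this article (ZKUtilsDemo is not part of the original code):

import java.util.Properties

object ZKUtilsDemo {
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.setProperty("ZK_HOST", "cloudera01,cloudera02,cloudera03")
    val zk = ZKUtils.getIns(prop)
    // write an offset for partition 1 of test_gp, then read it back
    zk.setData("/test/monitor/test_gp/kafka_operation_group/1", "109")
    println(zk.getData("/test/monitor/test_gp/kafka_operation_group/1")) // prints 109
  }
}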

Consume the test_gp messages and write each one to test_gp2 through the KafkaProducer singleton.

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}


object KafkaOperation {
  Logger.getRootLogger.setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // Spark configuration
    val sparkConf = new SparkConf()
      .setAppName("KafkaOperation")
      .setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "100") // max records pulled per second from each partition of a topic
    // Tune this together with batchDuration so that ingestion and processing stay balanced.
    // By default there is no cap: each batch pulls whatever Kafka has, and since batchDuration cannot be adjusted dynamically, a surge of input can back the job up.

    // create the streaming context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // broadcast the remaining configuration
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "cloudera01:9092")
    properties.setProperty("kafka.topic.save", "test_gp2")
    properties.setProperty("kafka.topic.input", "test_gp")
    properties.setProperty("group.id", "kafka_operation_group")
    properties.setProperty("ZK_HOST", "cloudera01,cloudera02,cloudera03")
    val configProperties = ssc.sparkContext.broadcast(properties)

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "cloudera01:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> "kafka_operation_group",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )

    // obtain the Kafka DStream
    val kafkaDirectStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent, // distribute partitions evenly across executors
      Subscribe[String, String](List("test_gp"), kafkaParams)) // topics to subscribe to, Kafka parameters

    val saveRDD = kafkaDirectStream.map(x => {
      // attach the partition and offset to each record
      val jsonObject = new JSONObject() {
        {
          put("partition", x.partition().toString)
          put("offset", x.offset().toString)
        }
      }
      val Array(name, tmp, label) = x.value().split("\t")
      jsonObject.put("name", name)
      if (label.toInt == 0) jsonObject.put("addr", tmp) else jsonObject.put("phone", tmp)
      jsonObject
    })

    // write to Kafka
    saveRDD.foreachRDD(r => {
      if (r != null && !r.isEmpty()) {
        r.foreach(x => {
          val name = x.getString("name")
          try {
            KafkaUtilities.sendKafka(
              KafkaUtilities.getProducer(configProperties.value.getProperty("bootstrap.servers")), // KafkaProducer singleton
              configProperties.value.getProperty("kafka.topic.save"),
              x.toJSONString,
              name,
              new scala.util.Random().nextInt(2),
              new Callback {
                override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
                  if (recordMetadata != null && e == null) {
                    // save the offset to ZooKeeper
                    ZKUtils.getIns(configProperties.value).setData(
                      "/test/monitor/" + configProperties.value.getProperty("kafka.topic.input") + "/" +
                        configProperties.value.getProperty("group.id") + "/" +
                        x.getString("partition"),
                      x.getString("offset"))
                  } else {
                    println("async send failed: " + x.toJSONString)
                    e.printStackTrace()
                  }
                }
              }
            )
          } catch {
            // a bare try without catch only produces a warning; handle send failures explicitly
            case e: Exception => e.printStackTrace()
          }
        })
      }
    })

    // start the computation
    ssc.start()
    // block until the job is interrupted or stopped
    ssc.awaitTermination()
  }
}

Inspect the messages received on the new topic

[root@cloudera01 ~]# kafka-console-consumer --bootstrap-server cloudera01:9092 --topic test_gp2
{"partition":"2","offset":"108","phone":"14400033426","name":"nick"}
{"partition":"2","offset":"106","name":"nick","addr":"shanghai#200000"}
{"partition":"2","offset":"107","name":"lulu","addr":"hangzhou#310000"}
{"partition":"2","offset":"109","phone":"18800074423","name":"lulu"}
{"partition":"1","offset":"107","name":"amy","addr":"beijing#100000"}
{"partition":"3","offset":"53","name":"bob","addr":"shanghai#200000"}
{"partition":"3","offset":"54","phone":"15700079421","name":"bob"}
{"partition":"1","offset":"106","name":"tom","addr":"beijing#100000"}
{"partition":"1","offset":"108","phone":"16700379451","name":"tom"}
{"partition":"1","offset":"109","phone":"18700079458","name":"amy"}
{"partition":"4","offset":"53","name":"alice","addr":"shanghai#200000"}
{"partition":"4","offset":"54","phone":"17730079427","name":"alice"}

Inspect the offset information stored in ZooKeeper

[zk: localhost:2181(CONNECTED) 30] ls /test/monitor/test_gp/kafka_operation_group
[1, 2, 3, 4]
[zk: localhost:2181(CONNECTED) 18] get /test/monitor/test_gp/kafka_operation_group/1
109
cZxid = 0x3d000877d5
ctime = Wed Sep 30 14:40:46 CST 2020
mZxid = 0x3d00087fb1
mtime = Wed Sep 30 15:32:18 CST 2020
pZxid = 0x3d000877d5
cversion = 0
dataVersion = 38
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
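
These stored offsets are what make it possible to resume consumption after a restart. A minimal sketch, assuming the same znode layout as above, of how a restarted job could read the offsets back and seed the direct stream (OffsetRestore and readOffsets are hypothetical helpers, not part of the code above):

import java.util.Properties
import org.apache.kafka.common.TopicPartition

object OffsetRestore {
  // read the saved offset of each partition from ZooKeeper; partitions that were
  // never written return null from getData and are skipped
  def readOffsets(prop: Properties, topic: String, group: String,
                  partitions: Seq[Int]): Map[TopicPartition, Long] = {
    val zk = ZKUtils.getIns(prop)
    partitions.flatMap { p =>
      Option(zk.getData(s"/test/monitor/$topic/$group/$p"))
        .map(off => new TopicPartition(topic, p) -> (off.toLong + 1)) // resume after the last processed record
    }.toMap
  }
}

// usage in the driver, before creating the stream:
// val fromOffsets = OffsetRestore.readOffsets(properties, "test_gp", "kafka_operation_group", 0 until 5)
// val stream = KafkaUtils.createDirectStream[String, String](
//   ssc, PreferConsistent, Subscribe[String, String](List("test_gp"), kafkaParams, fromOffsets))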
