Spark Streaming: Reading and Writing Kafka, Saving Offsets to ZooKeeper

Author: xiaogp | Published 2020-09-29 14:40

    Keywords: Spark Streaming, Kafka, ZooKeeper

    Sending Data to Kafka with the Scala Kafka API

    Use the Kafka API from Scala to send data to Kafka: set the topic and the key/value serializers, create a single KafkaProducer, and repeatedly call send with new ProducerRecord instances.

    import java.util.Properties
    
    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
    import org.apache.kafka.common.serialization.StringSerializer
    
    import scala.util.Random
    
    object Producer {
      val topic = "test_gp"
      val brokers = "cloudera01:9092"
      val rnd = new Random()
      val prop = new Properties() {
        {
          put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
          put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
          put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        }
      }
    
      def main(args: Array[String]): Unit = {
        val producer = new KafkaProducer[String, String](prop)
        // Mock data
        val nameAddres = Map("bob" -> "shanghai#200000", "amy" -> "beijing#100000", "alice" -> "shanghai#200000", "tom" -> "beijing#100000", "lulu" -> "hangzhou#310000",
          "nick" -> "shanghai#200000")
        val namePhones = Map("bob" -> "15700079421", "amy" -> "18700079458", "alice" -> "17730079427", "tom" -> "16700379451", "lulu" -> "18800074423",
          "nick" -> "14400033426")
    
        for (nameAddre <- nameAddres) {
          val data = new ProducerRecord[String, String](topic, nameAddre._1, s"${nameAddre._1}\t${nameAddre._2}\t0")
          producer.send(data, new Callback {
            override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
              if (null != e) e.printStackTrace()
            }
          })
          if (rnd.nextInt(100) < 50) Thread.sleep(rnd.nextInt(10))
        }
    
        for (namePhone <- namePhones) {
          val data = new ProducerRecord[String, String](topic, namePhone._1, s"${namePhone._1}\t${namePhone._2}\t0")
          producer.send(data, new Callback {
            override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
              if (null != e) e.printStackTrace()
            }
          })
          if (rnd.nextInt(100) < 50) Thread.sleep(rnd.nextInt(10))
        }
    
        producer.close()
      }
    }
    
    

    Use the Kafka command-line tools to inspect the data that was received.
    The topic test_gp has 5 partitions with a replication factor of 2, spread across 3 brokers.

    [root@cloudera01 ~]# kafka-topics --describe --zookeeper cloudera01:2181/kafka --topic test_gp
    Topic:test_gp   PartitionCount:5    ReplicationFactor:2 Configs:
        Topic: test_gp  Partition: 0    Leader: 79  Replicas: 79,78 Isr: 78,79
        Topic: test_gp  Partition: 1    Leader: 77  Replicas: 77,79 Isr: 77,79
        Topic: test_gp  Partition: 2    Leader: 78  Replicas: 78,77 Isr: 78,77
        Topic: test_gp  Partition: 3    Leader: 79  Replicas: 79,77 Isr: 77,79
        Topic: test_gp  Partition: 4    Leader: 77  Replicas: 77,78 Isr: 78,77
    [root@cloudera01 ~]# kafka-console-consumer --bootstrap-server cloudera01:9092 --topic test_gp
    nick    shanghai#200000 0
    lulu    hangzhou#310000 0
    bob shanghai#200000 0
    tom beijing#100000  0
    amy beijing#100000  0
    nick    14400033426 1
    lulu    18800074423 1
    bob 15700079421 1
    tom 16700379451 1
    amy 18700079458 1
    alice   shanghai#200000 0
    alice   17730079427 1
    

    Consuming Kafka Data with Spark Streaming and Writing It Back to Kafka

    Consume the data from Kafka, parse each raw record into JSON, and write it to a new Kafka topic, test_gp2. The command-line client shows that test_gp2 has two partitions:

    [root@cloudera01 ~]# kafka-topics --zookeeper cloudera01:2181/kafka --describe --topic test_gp2
    Topic:test_gp2  PartitionCount:2    ReplicationFactor:3 Configs:
        Topic: test_gp2 Partition: 0    Leader: 78  Replicas: 78,79,77  Isr: 78,77,79
        Topic: test_gp2 Partition: 1    Leader: 79  Replicas: 79,77,78  Isr: 78,77,79
    

    Development steps:

    • Create the Spark configuration with SparkConf
    • Create the Spark Streaming context with StreamingContext
    • Build the Kafka parameters kafkaParams: Map[String, Object]
    • Create the remaining configuration and broadcast it via the ssc SparkContext
    • Obtain the kafkaDirectStream DStream with KafkaUtils.createDirectStream
    • Use a map transformation to turn each String record into a JSONObject
    • Output with foreachRDD: each RDD calls foreach to send every record to Kafka through the KafkaProducer singleton, and when the asynchronous send succeeds the offset is written to ZooKeeper
      First create a KafkaUtilities object that builds the KafkaProducer as a singleton; otherwise every record in every partition would create its own KafkaProducer, which adds unnecessary overhead.
    import java.util.Properties
    import org.apache.kafka.clients.producer.{ Producer, ProducerRecord, KafkaProducer }
    import org.apache.kafka.clients.producer.Callback
    
    object KafkaUtilities {
      private var producer: Producer[String, String] = null
    
      def getProducer(bootstrap: String): Producer[String, String] = {
        this.synchronized {
          if (producer == null) {
            producer = createKafkaProducer(bootstrap)
          }
          producer
        }
      }
    
      def createKafkaProducer(bootstrap: String): Producer[String, String] = {
        val props = new Properties()
        props.put("bootstrap.servers", bootstrap)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("acks", "1")
        props.put("retries", "3")
        // Keep a single in-flight request so that retries cannot reorder records
        props.put("max.in.flight.requests.per.connection", "1")
        props.put("max.request.size", "50000000")
        props.put("buffer.memory", "50000000")
        props.put("request.timeout.ms", "30000000")

        new KafkaProducer[String, String](props)
      }
    
      /**
       * Send a message to Kafka asynchronously.
       * @param kafkaProducer producer instance used for the send
       * @param topic         target topic
       * @param message       record value
       * @param key           record key
       * @param partition     target partition number
       * @param callback      completion callback, may be null
       * @return a Future of RecordMetadata for the sent record
       */
      def sendKafka(
             kafkaProducer: Producer[String, String],
             topic:         String,
             message:       String,
             key:           String,
             partition:     Integer,
             callback:      Callback) = {
        // callback is already typed as Callback, so a null check is enough
        if (callback != null) {
          kafkaProducer.send(new ProducerRecord[String, String](topic, partition, key, message), callback)
        } else {
          kafkaProducer.send(new ProducerRecord[String, String](topic, partition, key, message))
        }
      }
    }
    

    In the send call above, both the key and the partition are set explicitly. If a valid partition number is specified on the record, the record is sent to exactly that partition. If no partition is given but a key is present, a partition is chosen from the hash of the key. If neither a key nor a partition is provided, partitions are assigned in a round-robin fashion.
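
    As a minimal sketch of the three cases (the topic name reuses test_gp2; the key and value strings are placeholders, not data from the original job):

    import org.apache.kafka.clients.producer.ProducerRecord

    // Explicit partition: the record always goes to partition 1, whatever the key is
    val explicitPartition = new ProducerRecord[String, String]("test_gp2", 1, "bob", "shanghai#200000")

    // Key only: the partition is derived from the hash of the key, so the same key always lands on the same partition
    val keyedOnly = new ProducerRecord[String, String]("test_gp2", "bob", "shanghai#200000")

    // Neither key nor partition: the producer falls back to round-robin assignment across partitions
    val roundRobin = new ProducerRecord[String, String]("test_gp2", "shanghai#200000")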


    Create a ZooKeeper utility class with a factory-style singleton, used to write data into ZooKeeper.

    
    import org.apache.zookeeper._
    import org.apache.zookeeper.common._
    import java.util.Properties
    
    import org.apache.zookeeper.data.Stat
    
    class ZKUtils() extends Watcher {
      private var zk: ZooKeeper = _
      private var pro: Properties = _
    
      private def this(pro: Properties) = {
        this()
        this.pro = pro
        this.zk = new ZooKeeper(pro.getProperty("ZK_HOST"), 20000, this)
      }
    
      override def process(event: WatchedEvent): Unit = {
        // no watching
      }
    
      private def createPath(path: String, zk: ZooKeeper): Unit = {
        // Build the path, creating any missing parent nodes along the way
        PathUtils.validatePath(path)
        var parentPath = path.substring(0, path.indexOf("/", 0) + 1)
        while (parentPath != null && !parentPath.isEmpty) {
          if (zk.exists(parentPath, false) == null) {
            // Create synchronously, with a fully open ACL, as a persistent node
            zk.create(parentPath, "".getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
          }
    
          // Stop once the parent path reaches the full path
          if (parentPath.equals(path)) {
            parentPath = null
          } else {
            // Find the next "/" and extend the parent path
            var idx = path.indexOf("/", parentPath.length() + 1)
            if (-1 == idx) {
              idx = path.length
            }
            parentPath = path.substring(0, idx)
          }
        }
      }
    
      def setData(path: String, data: String): Stat = {
        if (zk.exists(path, false) == null) {
          createPath(path, zk)
        }
        zk.setData(path, data.getBytes, -1)
      }
    
      def getData(path: String): String = {
        val stat = zk.exists(path, false)
        if (stat == null) {
          null
        } else {
          new String(zk.getData(path, false, stat))
        }
      }
    }
    
    
    object ZKUtils {
      private var ZKUtils: ZKUtils = _
    
      def getIns(prop: Properties): ZKUtils = {
        this.synchronized {
          if (ZKUtils == null) {
            ZKUtils = new ZKUtils(prop)
          }
          ZKUtils
        }
      }
    }
    

    Consume messages from test_gp and write each one to test_gp2 through the KafkaProducer singleton:

    import java.util.Properties
    
    import org.apache.log4j.{Level, Logger}
    import com.alibaba.fastjson.JSONObject
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    
    object KafkaOperation {
      Logger.getRootLogger.setLevel(Level.WARN)
    
      def main(args: Array[String]): Unit = {
        // Spark configuration
        val sparkConf = new SparkConf()
          .setAppName("KafkaOperation")
          .setMaster("local[*]")
          .set("spark.streaming.kafka.maxRatePerPartition", "100") // 每秒对一个topic下每个partition拉取的最大数据量
        // 这个参数需要和batchDuration一起调节做到拉取和处理数据平衡
        // 默认没有上限,在一个batchDuration下kafka有多少数据就拉取多少数据,由于batchDuration时间不能动态调节,所以当数据流入量大了可能会堵塞
    
        // Create the streaming context
        val ssc = new StreamingContext(sparkConf, Seconds(2))
    
        // Broadcast the remaining configuration
        val properties: Properties = new Properties()
        properties.setProperty("bootstrap.servers", "cloudera01:9092")
        properties.setProperty("kafka.topic.save", "test_gp2")
        properties.setProperty("kafka.topic.input", "test_gp")
        properties.setProperty("group.id", "kafka_operation_group")
        properties.setProperty("ZK_HOST", "cloudera01,cloudera02,cloudera03")
        val configProperties = ssc.sparkContext.broadcast(properties)
    
        val kafkaParams = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "cloudera01:9092",
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.GROUP_ID_CONFIG -> "kafka_operation_group",
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
          ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
        )
    
        // Obtain the Kafka DStream
        val kafkaDirectStream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent, // distribute partitions evenly across executors
          Subscribe[String, String](List("test_gp"), kafkaParams)) // subscribed topics and Kafka parameters
    
        val saveRDD = kafkaDirectStream.map(x => {
          // Attach the partition and offset to each record
          val jsonObject = new JSONObject() {
            {
              put("partition", x.partition().toString)
              put("offset", x.partition().toString)
            }
          }
          val Array(name, tmp, label) = x.value().split("\t")
          jsonObject.put("name", name)
          if (label.toInt == 0) jsonObject.put("addr", tmp) else jsonObject.put("phone", tmp)
          jsonObject
        })
    
        // Write to Kafka
        saveRDD.foreachRDD(r => {
          if (r != null && !r.isEmpty()) {
            r.foreach(x => {
              val name = x.getString("name")
              try {
                KafkaUtilities.sendKafka(
                  KafkaUtilities.getProducer(configProperties.value.getProperty("bootstrap.servers")), // KafkaProducer singleton
                  configProperties.value.getProperty("kafka.topic.save"),
                  x.toJSONString,
                  name,
                  new scala.util.Random().nextInt(2),
                  new Callback {
                    override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
                      if (recordMetadata != null && e == null) {
                        // Save the offset to ZooKeeper
                        ZKUtils.getIns(configProperties.value).setData(
                          "/test/monitor/" + configProperties.value.getProperty("kafka.topic.input") + "/" +
                            configProperties.value.getProperty("group.id") + "/" +
                            x.getString("partition"),
                          x.getString("offset"))
                      } else {
                        println("异步发送消息失败", x.toJSONString)
                        e.printStackTrace()
                      }
                    }
                  }
                )
              } catch {
                case ex: Exception => ex.printStackTrace()
              }
            })
          }
        })
    
        // Start the streaming computation
        ssc.start()
        // Block until the job is terminated
        ssc.awaitTermination()
      }
    }
    
    

    View the messages received by Kafka:

    [root@cloudera01 ~]# kafka-console-consumer --bootstrap-server cloudera01:9092 --topic test_gp2
    {"partition":"2","offset":"108","phone":"14400033426","name":"nick"}
    {"partition":"2","offset":"106","name":"nick","addr":"shanghai#200000"}
    {"partition":"2","offset":"107","name":"lulu","addr":"hangzhou#310000"}
    {"partition":"2","offset":"109","phone":"18800074423","name":"lulu"}
    {"partition":"1","offset":"107","name":"amy","addr":"beijing#100000"}
    {"partition":"3","offset":"53","name":"bob","addr":"shanghai#200000"}
    {"partition":"3","offset":"54","phone":"15700079421","name":"bob"}
    {"partition":"1","offset":"106","name":"tom","addr":"beijing#100000"}
    {"partition":"1","offset":"108","phone":"16700379451","name":"tom"}
    {"partition":"1","offset":"109","phone":"18700079458","name":"amy"}
    {"partition":"4","offset":"53","name":"alice","addr":"shanghai#200000"}
    {"partition":"4","offset":"54","phone":"17730079427","name":"alice"}
    

    View the offset information stored in ZooKeeper:

    [zk: localhost:2181(CONNECTED) 30] ls /test/monitor/test_gp/kafka_operation_group
    [1, 2, 3, 4]
    [zk: localhost:2181(CONNECTED) 18] get /test/monitor/test_gp/kafka_operation_group/1
    109
    cZxid = 0x3d000877d5
    ctime = Wed Sep 30 14:40:46 CST 2020
    mZxid = 0x3d00087fb1
    mtime = Wed Sep 30 15:32:18 CST 2020
    pZxid = 0x3d000877d5
    cversion = 0
    dataVersion = 38
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0
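
    As a usage sketch (not part of the original job; the readOffsets helper and the OffsetRestore object are hypothetical), the offsets written to these znodes can be read back at startup through the same ZKUtils.getData and turned into a starting-offset map for ConsumerStrategies.Subscribe, so the stream resumes right after the last record whose send succeeded:

    import org.apache.kafka.common.TopicPartition

    object OffsetRestore {
      // Rebuild a fromOffsets map from the znode layout used above:
      // /test/monitor/<topic>/<group>/<partition> -> last processed offset
      def readOffsets(zk: ZKUtils, topic: String, group: String, partitions: Seq[Int]): Map[TopicPartition, Long] =
        partitions.flatMap { p =>
          Option(zk.getData(s"/test/monitor/$topic/$group/$p"))
            .map(off => new TopicPartition(topic, p) -> (off.toLong + 1)) // +1: resume after the last processed record
        }.toMap
    }

    // Example, assuming the properties and kafkaParams from the job above and test_gp's 5 partitions:
    // val fromOffsets = OffsetRestore.readOffsets(ZKUtils.getIns(properties), "test_gp", "kafka_operation_group", 0 until 5)
    // Subscribe[String, String](List("test_gp"), kafkaParams, fromOffsets)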
    
