SparkStreaming-Kafka: Fetching Data by Specified Offsets


Author: spark打酱油 | Published 2022-11-27 16:51


    1. Data Source

    '310999003001', '3109990030010220140820141230292','00000000','','2017-08-20 14:09:35','0',255,'SN', 0.00,'4','','310999','310999003001','02','','','2','','','2017-08-20 14:12:30','2017-08-20 14:16:13',0,0,'2017-08-21 18:50:05','','',' '
    '310999003102', '3109990031020220140820141230266','粤BT96V3','','2017-08-20 14:09:35','0',21,'NS', 0.00,'2','','310999','310999003102','02','','','2','','','2017-08-20 14:12:30','2017-08-20 14:16:13',0,0,'2017-08-21 18:50:05','','',' '
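
    Only a handful of these comma-separated columns are used later. As a quick reference, here is a minimal sketch (field positions taken from the producer code in section 2) of how one sample line maps onto the fields that actually get sent to Kafka:

    object FieldMappingDemo {
      def main(args: Array[String]): Unit = {
        val line = "'310999003102', '3109990031020220140820141230266','粤BT96V3','','2017-08-20 14:09:35','0',21,'NS', 0.00,'2','','310999','310999003102','02','','','2','','','2017-08-20 14:12:30','2017-08-20 14:16:13',0,0,'2017-08-21 18:50:05','','',' '"
        val fields: Array[String] = line.split(",")
        println(fields(0))   // camera id     -> '310999003102'
        println(fields(2))   // license plate -> '粤BT96V3'
        println(fields(4))   // event time    -> '2017-08-20 14:09:35'
        println(fields(6))   // speed         -> 21
        println(fields(13))  // lane id       -> '02'
      }
    }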

    2. Producer

    
    import java.util.Properties
    
    import com.google.gson.{Gson, GsonBuilder}
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Date 2022/11/8 9:49
      */
    object KafkaEventProducer {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("KafkaEventProducer").setMaster("local[*]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        val topic = "ly_test"
        val props = new Properties()
        props.put("bootstrap.servers","node01:9092,node02:9092,node03:9092")
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
        props.put("acks","all")
    //    props.put("security.protocol","SASL_PLAINTEXT")
    //    props.put("sasl.mechanism","PLAIN")
    //    props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='sjf' password='123sjf';")
    
        val kafkaProducer = new KafkaProducer[String,String](props)
    
        val srcRDD: RDD[String] = sc.textFile("file:///F:\\work\\sun\\lywork\\hbaseoper\\datas\\kafkaproducerdata.txt")
        val records: Array[Array[String]] = srcRDD.filter(!_.startsWith(";")).map(_.split(",")).collect()
        // Pre-process each record into a JSON string and send it to Kafka
        for(record<-records){
          val trafficInfo = new TrafficInfo(record(0),record(2),record(4),record(6),record(13))
          // Do not use new Gson() directly: by default Gson HTML-escapes
          // single quotes, so ' shows up as \u0027 in the JSON output.
          // val trafficInfoJson: String = new Gson().toJson(trafficInfo)
          // Build the instance with HTML escaping disabled instead:
          val gson: Gson = new GsonBuilder().disableHtmlEscaping().create()
          val trafficInfoJson: String = gson.toJson(trafficInfo)
          kafkaProducer.send(new ProducerRecord[String,String](topic,trafficInfoJson))
          println("Message Sent:"+trafficInfoJson)
          Thread.sleep(2000)
        }
    
        sc.stop()
        kafkaProducer.flush()
        kafkaProducer.close()
    
    
      }
      //camera id
      //license plate number
      //event time
      //speed
      //lane id
      case class TrafficInfo(camer_id:String,car_id:String,event_time:String,car_speed:String,car_code:String)
    
    }
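
    As noted in the comments above, new Gson() HTML-escapes single quotes (' becomes \u0027), which mangles the quoted values in this data set. A minimal standalone sketch of the difference (the Plate case class and sample value here are just for illustration):

    import com.google.gson.{Gson, GsonBuilder}

    object GsonEscapingDemo {
      case class Plate(car_id: String)

      def main(args: Array[String]): Unit = {
        val plate = Plate("'粤BT96V3'")
        // Default Gson HTML-escapes single quotes: {"car_id":"\u0027粤BT96V3\u0027"}
        println(new Gson().toJson(plate))
        // With HTML escaping disabled the quotes survive: {"car_id":"'粤BT96V3'"}
        println(new GsonBuilder().disableHtmlEscaping().create().toJson(plate))
      }
    }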
    
    

    3. Consumer: Fetching Data at Specified Offsets

    
    import java.text.SimpleDateFormat
    import java.util
    import java.util.Date
    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.record.TimestampType
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Date 2022/11/5 16:38
      */
    
    /**
      * Fetch data from Kafka at specified offsets
      */
    object AttainDataFromOffset {
      def main(args: Array[String]): Unit = {
    
        val conf: SparkConf = new SparkConf().setAppName("AttainDataFromOffset").setMaster("local[*]")
    
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        val ssc = new StreamingContext(sc,Seconds(5))
    
        val kafkaParams: Map[String, Object] = Map[String, Object](
          "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "ly",
          "auto.offset.reset" -> "earliest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
          // SASL (username/password) authentication for Kafka, if required
    //      "security.protocol" -> "SASL_PLAINTEXT",
    //      "sasl.mechanism" -> "PLAIN",
    //      "sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username='sjf' password='123sjf';"
    
        )
    
        val topics = Array("ly_test")
        val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String,String](topics, kafkaParams)
        )
    
    
        val res: DStream[(String, String, Int, Long)] = stream.map(record => {
          val key: String = record.key()
          val value: String = record.value()
          val partitionId: Int = record.partition()
          val offset: Long = record.offset()
          (key, value, partitionId, offset)
        })
    
        // Specify the offset ranges to read
        val offsetRanges = Array(
          // topic, partition, inclusive starting offset, exclusive ending offset
          OffsetRange("ly_test", 0, 1L, 10L)
        )
    
        // Read exactly the records in those offset ranges as a batch RDD
        import scala.collection.JavaConverters._
        val jkafkaParams: util.Map[String, Object] = kafkaParams.asJava
        val offsetRDD: RDD[ConsumerRecord[String, String]] = KafkaUtils.createRDD[String,String](
          sc,
          jkafkaParams,
          offsetRanges,
          LocationStrategies.PreferConsistent
        )
        val resRDD: RDD[(String, String, Int, Long, String, TimestampType)] = offsetRDD.map(record => {
          val key: String = record.key()
          val value: String = record.value()
          val partitionId: Int = record.partition()
          val offset: Long = record.offset()
          val time: Long = record.timestamp()
          val timeStr = timeStampToDate(time)
          val timestampType: TimestampType = record.timestampType()
          (key, value, partitionId, offset, timeStr, timestampType)
        })
    
        resRDD.foreach(println(_))
    
        res.print()
    
    
        ssc.start()
    
        ssc.awaitTermination()
    
      }
    
      // Convert a Date to a formatted time string
      def dateToString(date:Date): String ={
        val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val strDate: String = simpleDateFormat.format(date)
        strDate
      }
    
      // Parse a formatted time string into a Date
      def strToDate(str:String):Date = {
        val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val date: Date = simpleDateFormat.parse(str)
        date
      }
      // Convert an epoch timestamp (milliseconds) to a formatted time string
      def timeStampToDate(timeStamp:Long): String ={
        val date = new Date(timeStamp)
        val strDate: String = dateToString(date)
        strDate
      }
    
      // Convert a formatted time string to an epoch timestamp (milliseconds)
      def dateToTimeStamp(strDate:String): Long ={
        val date: Date = strToDate(strDate)
        val timeStamp: Long = date.getTime
        timeStamp
      }
    
    }
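
    The batch createRDD call above reads a fixed offset range exactly once. On the streaming side, since enable.auto.commit is false, the job can also start the direct stream itself from explicit per-partition offsets and commit them back manually after each batch. A minimal sketch, assuming the same ssc, topics and kafkaParams defined in AttainDataFromOffset above (the starting offset is an illustrative value):

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

    // Start partition 0 of the topic at offset 1 (illustrative value)
    val fromOffsets: Map[TopicPartition, Long] = Map(
      new TopicPartition("ly_test", 0) -> 1L
    )

    val streamFromOffsets = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    streamFromOffsets.foreachRDD { rdd =>
      // Offset ranges covered by this batch, taken before any shuffle
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach(r => println(s"${r.partition()} ${r.offset()} ${r.value()}"))
      // Commit the consumed offsets back to Kafka once the batch is processed
      streamFromOffsets.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
    }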
    
    
