Spark Streaming Zero Data Loss: Manually Maintaining Offsets in MySQL

Author: 喵星人ZC | Published 2019-06-06 23:15

    Versions:

    spark: 2.2.0
    kafka: 0.10.1.0
    scala: 2.11.8
    scalikejdbc: 3.3.2
    

    POM file:

    <properties>
            <scala.version>2.11.8</scala.version>
            <spark.version>2.2.0</spark.version>
            <scalikejdbc.version>3.3.2</scalikejdbc.version>
    </properties>
    
    <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
          <!-- scalikejdbc dependencies -->
            <dependency>
                <groupId>org.scalikejdbc</groupId>
                <artifactId>scalikejdbc_2.11</artifactId>
                <version>${scalikejdbc.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.scalikejdbc</groupId>
                <artifactId>scalikejdbc-config_2.11</artifactId>
                <version>${scalikejdbc.version}</version>
            </dependency>
           <!-- Spark dependencies -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
             <!-- MySQL JDBC driver -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.6</version>
            </dependency>
    </dependencies>
    

    application.conf file:

    db.default.driver="com.mysql.jdbc.Driver"
    db.default.url="jdbc:mysql://hadoop000:3306/hadoop_train?characterEncoding=utf-8"
    db.default.user="root"
    db.default.password="root"
    dataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource
    
    
    # Kafka settings
    metadata.broker.list = "192.168.245.100:9092"
    # start consuming from the earliest available data
    auto.offset.reset = "smallest"
    group.id = "baidu_offset_group"
    kafka.topics = "baidu"
    serializer.class = "kafka.serializer.StringEncoder"
    request.required.acks = "1"
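
    The file lives on the classpath (typically src/main/resources). As a quick sanity check, the db.default.* keys can be exercised on their own: scalikejdbc-config's DBs.setup() builds the connection pool from them. A minimal sketch, assuming the MySQL instance above is reachable (ConfigSmokeTest is just an illustrative name):

    import scalikejdbc.{DB, SQL}
    import scalikejdbc.config.DBs

    object ConfigSmokeTest {
      def main(args: Array[String]): Unit = {
        // builds the connection pool from db.default.* in application.conf
        DBs.setup()

        // trivial query to confirm the MySQL connection works
        val one = DB.readOnly { implicit session =>
          SQL("select 1").map(_.int(1)).single().apply()
        }
        println(s"MySQL reachable: ${one.contains(1)}")

        DBs.close()
      }
    }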
    

    ValueUtils

    package com.soul.bigdata.spark.streaming01
    
    import com.typesafe.config.ConfigFactory
    import org.apache.commons.lang3.StringUtils
    
    object ValueUtils {
      // loads application.conf from the classpath
      val load = ConfigFactory.load()

      // Returns the configured value for `key`, falling back to `defaultValue` when the
      // key is missing or empty (load.getString alone throws if the key is absent)
      def getStringValue(key: String, defaultValue: String = "") = {
        val value = if (load.hasPath(key)) load.getString(key) else ""
        if (StringUtils.isNotEmpty(value)) {
          value
        } else {
          defaultValue
        }
      }
    }
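
    A quick usage sketch (ValueUtilsDemo is just an illustrative name; the keys are the ones defined in application.conf above, and the fallback only applies to keys that are missing or empty):

    object ValueUtilsDemo {
      def main(args: Array[String]): Unit = {
        // keys that exist in application.conf
        println(ValueUtils.getStringValue("metadata.broker.list")) // 192.168.245.100:9092
        println(ValueUtils.getStringValue("kafka.topics"))         // baidu
        // a key that is not configured falls back to the supplied default (thanks to the hasPath guard)
        println(ValueUtils.getStringValue("not.configured", "fallback")) // fallback
      }
    }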
    

    MySQL offset table:

     create table baidu_offset(
            topic varchar(32),
            groupid varchar(50),
            partitions int,
            fromoffset bigint,
            untiloffset bigint,
            primary key(topic,groupid,partitions)
            );
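
    The composite primary key (topic, groupid, partitions) is what makes REPLACE INTO behave as an upsert: MySQL deletes any existing row with the same key before inserting the new one, so the table never holds more than one row per topic/group/partition. A minimal sketch of that behaviour, best run against a scratch copy of the table since it overwrites stored offsets (ReplaceIntoDemo and the sample offsets are illustrative; it assumes DBs.setup() can read application.conf as above):

    import scalikejdbc.{DB, SQL}
    import scalikejdbc.config.DBs

    object ReplaceIntoDemo {
      def main(args: Array[String]): Unit = {
        DBs.setup()

        DB.autoCommit { implicit session =>
          // first write for partition 0 of topic "baidu"
          SQL("replace into baidu_offset(topic,groupid,partitions,fromoffset,untiloffset) values (?,?,?,?,?)")
            .bind("baidu", "baidu_offset_group", 0, 0L, 100L).update().apply()
          // a second write with the same key replaces the row instead of adding another one
          SQL("replace into baidu_offset(topic,groupid,partitions,fromoffset,untiloffset) values (?,?,?,?,?)")
            .bind("baidu", "baidu_offset_group", 0, 100L, 200L).update().apply()
        }

        // still exactly one row for this partition, now with untiloffset = 200
        DB.readOnly { implicit session =>
          SQL("select topic, partitions, untiloffset from baidu_offset where topic = 'baidu'")
            .map(rs => (rs.string("topic"), rs.int("partitions"), rs.long("untiloffset")))
            .list().apply()
        }.foreach(println)

        DBs.close()
      }
    }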
    

    Code:

    package com.soul.bigdata.spark.streaming01
    
    
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scalikejdbc.{DB, SQL}
    import scalikejdbc.config.DBs
    
    object StreamingOffsetMySQL {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingOffsetMySQL")
    
        val ssc = new StreamingContext(conf, Seconds(10))
    
        //Topic
        val topics = ValueUtils.getStringValue("kafka.topics").split(",").toSet
    
        //Kafka parameters
        //The custom ValueUtils helper reads them from application.conf, which keeps them easy to change later
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> ValueUtils.getStringValue("metadata.broker.list"),
          "auto.offset.reset" -> ValueUtils.getStringValue("auto.offset.reset"),
          "group.id" -> ValueUtils.getStringValue("group.id")
        )
    
    
        //First, read the offset information from MySQL using scalikejdbc
        //+------------+------------------+------------+------------+-------------+
        //| topic      | groupid          | partitions | fromoffset | untiloffset |
        //+------------+------------------+------------+------------+-------------+
        //The MySQL table has the structure above; read the "topic", "partitions" and "untiloffset" columns
        //to build fromOffsets: Map[TopicAndPartition, Long], which createDirectStream needs below
    
    
        DBs.setup()
        val fromOffset = DB.readOnly(implicit session => {
          SQL("select * from baidu_offset").map(rs => {
            (TopicAndPartition(rs.string("topic"), rs.int("partitions")), rs.long("untiloffset"))
          }).list().apply()
        }).toMap
    
    
        //If the MySQL table holds no offset information, consume from the beginning; otherwise resume from the stored offsets
        val messages = if (fromOffset.isEmpty) {
          println("Consuming from the beginning...")
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        } else {
          println("Consuming from the stored offsets...")
          val messageHandler = (mm: MessageAndMetadata[String, String]) => (mm.key(), mm.message())
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffset, messageHandler)
        }
    
    
        messages.foreachRDD(rdd => {
          if (!rdd.isEmpty()) {
            //print the number of records in this RDD
            println("Records in this batch: " + rdd.count())
            //The official example's way to get the RDD's offset info; offsetRanges is an array of OffsetRange objects
            //          trait HasOffsetRanges {
            //            def offsetRanges: Array[OffsetRange]
            //          }
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            offsetRanges.foreach(x => {
              //print the topic, partition, starting offset and ending offset for this batch
              println(s"---${x.topic},${x.partition},${x.fromOffset},${x.untilOffset}---")
              //save the latest offsets to the MySQL table
              DB.autoCommit(implicit session => {
                SQL("replace into baidu_offset(topic,groupid,partitions,fromoffset,untiloffset) values (?,?,?,?,?)")
                  .bind(x.topic, ValueUtils.getStringValue("group.id"), x.partition, x.fromOffset, x.untilOffset)
                  .update().apply()
              })
            })
          }
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
    

    Run



    Stop the program and run it again: consumption now resumes from offset 411, which is exactly what we wanted.
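
    To see what the job will resume from before (or after) a restart, the offset table can also be read directly. A minimal read-only sketch (OffsetCheck is just an illustrative name; it reuses the application.conf settings from above):

    import scalikejdbc.{DB, SQL}
    import scalikejdbc.config.DBs

    object OffsetCheck {
      def main(args: Array[String]): Unit = {
        DBs.setup()
        // untiloffset is the position the streaming job resumes from
        DB.readOnly { implicit session =>
          SQL("select topic, groupid, partitions, fromoffset, untiloffset from baidu_offset")
            .map(rs => (rs.string("topic"), rs.string("groupid"), rs.int("partitions"),
              rs.long("fromoffset"), rs.long("untiloffset")))
            .list().apply()
        }.foreach(println)
        DBs.close()
      }
    }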


