Spark demos for reading and writing MySQL, Hive and Kafka data


Author: bigdata_er | Published 2018-11-06 15:52

    Reading data from Hive

    pom.xml dependency configuration

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    

    Demo: reading Hive data

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object Main {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("spark://master:7077") // set the Spark master (run mode)
          .setAppName("risk")               // set the job name (optional)
        val spark = SparkSession.builder()  // Spark 2.0 uses SparkSession as the unified entry point
          .config(conf)
          .enableHiveSupport()              // enable Hive support; without it the Hive databases cannot be accessed
          .getOrCreate()
        import spark.implicits._
        spark.sql("use bmkp")
        val df = spark.sql("select * from customer") // run the SQL in Hive; the result comes back as a DataFrame (Dataset[Row])
        df.show()
        spark.stop()
      }
    }
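
    The same query can also be expressed through the DataFrame API instead of SQL. A minimal sketch, reusing the SparkSession, database and table names from the demo above:

    // equivalent to "use bmkp" followed by "select * from customer", via the DataFrame API
    val customers = spark.table("bmkp.customer")
    customers.printSchema() // inspect the Hive table schema
    customers.show(10)      // print the first 10 rows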
    

    Reading data from MySQL

    pom.xml configuration

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <!-- MySQL JDBC driver, needed for com.mysql.jdbc.Driver in the demo below; the version is an example -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>
    

    Demo: reading MySQL data

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object Main {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("spark://master:7077") // set the Spark master (run mode)
          .setAppName("risk")               // set the job name (optional)
        val spark = SparkSession.builder()  // Spark 2.0 uses SparkSession as the unified entry point
          .config(conf)
          .enableHiveSupport()              // enable Hive support (only needed if Hive is also accessed)
          .getOrCreate()
        // read from MySQL; the result is a DataFrame (Dataset[Row])
        val df = spark.read
          .format("jdbc")
          .options(Map(
            // connection parameters: MySQL host, port, database, user and password
            "url" -> "jdbc:mysql://***.***.***.***:3036/bmkpstress?user=root&password=**********",
            // JDBC driver class
            "driver" -> "com.mysql.jdbc.Driver",
            // the query to run in MySQL; its result set is loaded as the DataFrame
            "dbtable" -> "(select * from test group by id) as aaa"))
          .load()
        df.show() // preview the result
        spark.stop()
      }
    }
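
    The same JDBC read can also be written with individual .option() calls, which keeps the credentials out of the URL. A minimal sketch, reusing the SparkSession and query from the demo above; the host and password are placeholders:

    // equivalent JDBC read with explicit options instead of an options Map
    val df2 = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://<host>:3306/bmkpstress")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "(select * from test group by id) as aaa")
      .option("user", "root")
      .option("password", "<password>")
      .load()
    df2.show()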
    

    Reading Kafka data with Spark Streaming

    pom.xml dependency configuration

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    

    Demo: reading Kafka data

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    object Main {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("spark://master:7077")
          .setAppName("kafka_hive")
        val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
        // build the StreamingContext on top of the existing SparkContext (only one SparkContext per JVM)
        val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
        val topics = Array("service_cksc", "service_ckxc", "service_dcyy") // Kafka topic names
        val group = "bmkp"                                                 // consumer group id
        val kafkaParam = Map( // Kafka consumer configuration
          "bootstrap.servers" -> "***.104.42.127:19092,***.104.202.222:19092,***.135.73.152:19092", // Kafka broker list (host:port)
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> group,
          "auto.offset.reset" -> "earliest",                  // where to start when there is no committed offset: earliest, latest or none
          "enable.auto.commit" -> (false: java.lang.Boolean)  // do not auto-commit offsets; they are committed manually below
        )
        // create the direct stream from Kafka
        val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam))
        stream.foreachRDD { rdd =>
          import spark.implicits._
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // offsets of this batch
          /*
          process the data from Kafka here; only commit the offsets below once the
          batch has been stored or processed
           */
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) // commit the offsets
        }
        ssc.start()            // start the streaming job
        ssc.awaitTermination() // block until the job is stopped
      }
    }
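
    The placeholder comment inside foreachRDD is where each micro-batch is actually handled. A minimal sketch of that step as a helper function, called before commitAsync; the target Hive table kafka_events is a hypothetical example, not part of the original demo:

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{SaveMode, SparkSession}

    // process one micro-batch: extract the message payloads and append them to Hive
    def processBatch(spark: SparkSession, rdd: RDD[ConsumerRecord[String, String]]): Unit = {
      import spark.implicits._
      val values = rdd.map(record => record.value) // keep only the message values
      if (!values.isEmpty()) {
        values.toDF("value")
          .write.mode(SaveMode.Append)
          .saveAsTable("kafka_events") // hypothetical target table
      }
    }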
    

    Writing data to Hive with Spark

    pom.xml configuration

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    

    Demo: writing data to Hive

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object Main {
      case class Person(name: String, col1: Int, col2: String)
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("spark://master:7077") // set the Spark master (run mode)
          .setAppName("kettle")             // set the job name (optional)
        val spark = SparkSession.builder()  // Spark 2.0 uses SparkSession as the unified entry point
          .config(conf)
          .enableHiveSupport()              // enable Hive support; without it the Hive databases cannot be accessed
          .getOrCreate()
        import spark.implicits._ // needed for the implicit conversions to Dataset/DataFrame
        spark.sql("use DataBaseName") // switch to the target Hive database
        // read a text file from HDFS (stands in for any raw data); the raw lines cannot be
        // written to Hive directly, so each comma-separated line is mapped onto the Person
        // case class, which produces a typed Dataset that can be written to Hive
        val data = spark.read.textFile("path")
          .map(x => x.split(","))
          .map(x => Person(x(0), x(1).toInt, x(2)))
        data.toDF().createOrReplaceTempView("table1") // expose the Dataset as a temporary view
        // run SQL in Hive to copy the rows from the temporary view into the target partition
        spark.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")
        spark.close()
      }
    }
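
    The temporary view plus INSERT statement can also be replaced by the DataFrame writer. A sketch under the same assumptions as the demo above (table2 already exists in Hive, has columns name, col1, col2 and is partitioned by date); dynamic partitioning is enabled so the partition value can come from a column:

    import org.apache.spark.sql.functions.lit

    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    data.toDF()
      .withColumn("date", lit("2015-04-02")) // partition column must come last for insertInto
      .write
      .insertInto("table2")                  // position-based append into the existing Hive table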
    

    Writing data to MySQL

    pom.xml configuration

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <!-- MySQL JDBC driver, needed by both demos below; the version is an example -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>
    

    Demo 1: writing data to MySQL

    import java.sql.{Connection, DriverManager, PreparedStatement}

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object Main {
      case class Blog(name: String, count: Int)
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("spark://master:7077") // set the Spark master (run mode)
          .setAppName("kettle")             // set the job name (optional)
        val spark = SparkSession.builder()  // Spark 2.0 uses SparkSession as the unified entry point
          .config(conf)
          .enableHiveSupport()              // enable Hive support (only needed if Hive is also accessed)
          .getOrCreate()
        // sample RDD; it stands in for any RDD produced by a Spark job
        val data = spark.sparkContext.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
        // the SQL to run; the two "?" placeholders are filled in per row
        val sql = "insert into blog(name, count) values (?, ?)"
        data.foreachPartition { iter =>
          var conn: Connection = null   // one MySQL connection per partition
          var ps: PreparedStatement = null
          try {
            // open the connection: MySQL host, port, database, user and password
            conn = DriverManager.getConnection("jdbc:mysql://***.***.***.***:3306/test", "root", "******")
            ps = conn.prepareStatement(sql)
            iter.foreach { row =>
              ps.setString(1, row._1) // bind the row values to the placeholders
              ps.setInt(2, row._2)
              ps.executeUpdate()      // run the insert on MySQL
            }
          } catch {
            case e: Exception => println("Mysql Exception: " + e.getMessage)
          } finally {
            if (ps != null) {
              ps.close()
            }
            if (conn != null) {
              conn.close() // close the MySQL connection
            }
          }
        }
        spark.stop()
      }
    }
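
    Preparing the statement once per partition and batching the inserts avoids one database round trip per row. A minimal sketch of the same write, reusing data and the SQL from demo 1; the host and password are placeholders:

    data.foreachPartition { iter =>
      val conn = DriverManager.getConnection("jdbc:mysql://<host>:3306/test", "root", "<password>")
      val ps = conn.prepareStatement("insert into blog(name, count) values (?, ?)")
      try {
        iter.foreach { row =>
          ps.setString(1, row._1)
          ps.setInt(2, row._2)
          ps.addBatch()        // queue the row instead of executing it immediately
        }
        ps.executeBatch()      // send all queued rows to MySQL in one batch
      } finally {
        ps.close()
        conn.close()
      }
    }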
    

    Demo 2: writing data to MySQL

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{SaveMode, SparkSession}

    object Main {
      case class Blog(name: String, count: Int)
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("spark://master:7077") // set the Spark master (run mode)
          .setAppName("kettle")             // set the job name (optional)
        val spark = SparkSession.builder()  // Spark 2.0 uses SparkSession as the unified entry point
          .config(conf)
          .enableHiveSupport()              // enable Hive support (only needed if Hive is also accessed)
          .getOrCreate()
        // sample RDD; it stands in for any RDD produced by a Spark job
        val data = spark.sparkContext.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
        import spark.implicits._
        val df = data.map(x => Blog(x._1, x._2)).toDF() // convert the RDD into a DataFrame
        df.write.mode(SaveMode.Append).format("jdbc")
          .option("url", "jdbc:mysql://***.***.***.***:3306/test") // MySQL host, port and database
          .option("dbtable", "blog")                               // target MySQL table
          .option("user", "****")                                  // MySQL user
          .option("password", "************")                      // MySQL password
          .save()                                                  // write the data
        spark.stop()
      }
    }
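
    The same write can also go through DataFrameWriter.jdbc with a java.util.Properties object, which keeps the connection settings in one place. A minimal sketch, reusing df from demo 2; host, user and password are placeholders:

    import java.util.Properties

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "<password>")
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    df.write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://<host>:3306/test", "blog", props)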
    
    
