美文网首页
SparkCore之文件系统类数据读取与保存

SparkCore之文件系统类数据读取与保存

作者: 大数据小同学 | 来源:发表于2020-08-11 08:07 被阅读0次

    HDFS

    Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口.对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口,主要包含以下四个参数.

    1. 输入格式(InputFormat): 制定数据输入的类型,如TextInputFormat等,新旧两个版本所引用的版本分别是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
    2. 键类型: 指定[K,V]键值对中K的类型
    3. 值类型: 指定[K,V]键值对中V的类型
    4. 分区值: 指定由外部存储生成的RDD的partition数量的最小值,如果没有指定,系统会使用默认值defaultMinSplits
      注意:其他创建操作的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本.例如,对于textFile而言,只有path这个指定文件路径的参数,其他参数在系统内部指定了默认值。
    5. 在Hadoop中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.
    6. 如果用Spark从Hadoop中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用map-reduce的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDD和newAPIHadoopRDD两个类就行了

    MySQL数据库连接

    支持通过Java JDBC访问关系型数据库。需要通过JdbcRDD进行,示例如下:

    1. 添加依赖
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
    
    1. Mysql读取:
    import java.sql.DriverManager
    import org.apache.spark.rdd.JdbcRDD
    import org.apache.spark.{SparkConf, SparkContext}
    object MysqlRDD {
     def main(args: Array[String]): Unit = {
       //1.创建spark配置信息
       val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
       //2.创建SparkContext
       val sc = new SparkContext(sparkConf)
       //3.定义连接mysql的参数
       val driver = "com.mysql.jdbc.Driver"
       val url = "jdbc:mysql://hadoop102:3306/rdd"
       val userName = "root"
       val passWd = "000000"
       //创建JdbcRDD
       val rdd = new JdbcRDD(sc, () => {
         Class.forName(driver)
         DriverManager.getConnection(url, userName, passWd)
       },
         "select * from `rddtable` where `id`>=?;",
         1,
         10,
         1,
         r => (r.getInt(1), r.getString(2))
       )
       //打印最后结果
       println(rdd.count())
       rdd.foreach(println)
       sc.stop()
     }
    }
    

    Mysql写入:

    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
      val sc = new SparkContext(sparkConf)
      val data = sc.parallelize(List("Female", "Male","Female"))
      data.foreachPartition(insertData)
    }
    def insertData(iterator: Iterator[String]): Unit = {
    Class.forName ("com.mysql.jdbc.Driver").newInstance()
      val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000")
      iterator.foreach(data => {
        val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
        ps.setString(1, data) 
        ps.executeUpdate()
      })
    }
    

    HBase数据库

    由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过Hadoop输入格式访问HBase。这个输入格式会返回键值对数据,其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为org.apache.hadoop.hbase.client.
    Result。

    1. 添加依赖
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
    
    1. 从HBase读取数据
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.hadoop.hbase.util.Bytes
    object HBaseSpark {
      def main(args: Array[String]): Unit = {
        //创建spark配置信息
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
        //创建SparkContext
        val sc = new SparkContext(sparkConf)
        //构建HBase配置信息
        val conf: Configuration = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
        conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
        //从HBase读取数据形成RDD
        val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
          conf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result])
        val count: Long = hbaseRDD.count()
        println(count)
        //对hbaseRDD进行处理
        hbaseRDD.foreach {
          case (_, result) =>
            val key: String = Bytes.toString(result.getRow)
            val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
            val color: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("color")))
            println("RowKey:" + key + ",Name:" + name + ",Color:" + color)
        }
        //关闭连接
        sc.stop()
      }
    }
    
    1. 往HBase写入
    def main(args: Array[String]) {
    //获取Spark配置信息并创建与spark的连接
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseApp")
      val sc = new SparkContext(sparkConf)
    //创建HBaseConf
      val conf = HBaseConfiguration.create()
      val jobConf = new JobConf(conf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
    //构建Hbase表描述器
      val fruitTable = TableName.valueOf("fruit_spark")
      val tableDescr = new HTableDescriptor(fruitTable)
      tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
    //创建Hbase表
      val admin = new HBaseAdmin(conf)
      if (admin.tableExists(fruitTable)) {
        admin.disableTable(fruitTable)
        admin.deleteTable(fruitTable)
      }
      admin.createTable(tableDescr)
    //定义往Hbase插入数据的方法
      def convert(triple: (Int, String, Int)) = {
        val put = new Put(Bytes.toBytes(triple._1))
        put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
        put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
        (new ImmutableBytesWritable, put)
      }
    //创建一个RDD
      val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
    //将RDD内容写到HBase
      val localData = initialRDD.map(convert)
      localData.saveAsHadoopDataset(jobConf)
    }
    
    关注微信公众号
    简书:https://www.jianshu.com/u/0278602aea1d
    CSDN:https://blog.csdn.net/u012387141

    相关文章

      网友评论

          本文标题:SparkCore之文件系统类数据读取与保存

          本文链接:https://www.haomeiwen.com/subject/rywjyhtx.html