美文网首页
Spark基础系列之六--数据读写

Spark基础系列之六--数据读写

作者: 微生活_小阿楠 | 来源:发表于2020-05-06 07:39 被阅读0次

传送门
Spark实战系列之一--Spark是什么
Spark实战系列之二--什么是RDD以及RDD的常用API
Spark实战系列之三--RDD编程基础上
Spark实战系列之四--RDD编程基础下
Spark实战系列之五--键值对RDD
Spark实战系列之六--数据读写
Spark实战系列之七--综合案例
Spark基础系列之八--Spark SQL是什么
Spark基础系列之九--使用Spark SQL读写数据库
传送门

一、文件数据读写

1)本地文件系统的数据读写

//从文件中读取数据创建RDD
val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
 
//把RDD写入到文本文件中
textFile .saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")

2)分布式文件系统的数据读写

//从分布式文件系统HDFS中读取数据
val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
val textFile = sc.textFile("/user/hadoop/word.txt")
val textFile = sc.textFile("word.txt")

//把RDD写入到HDFS中
textFile .saveAsTextFile("writeback")

3)JSON文件的数据读写

  • JSON(JavaScript Object Notation)是一种轻量级的数据交换格式
//把本地people.json文件加载到RDD中
val jsonStr = sc.textFile("file:///usr/local/spark/mycode/wordcount/people.json")
jsonStr.foreach(println)

二、读写Hbase数据

  • 1)创建一个Hbase表并导入数据
  • 2)配置Spark(把Hbase的lib目录下的jar文件拷贝到Spark中)
  • 3)编写程序读取Hbase数据(如果要让Spark读取Hbase,就需要使用SparkContext提供的newAPIHadoopRDD这个API将表的内容以RDD的形式加载到Spark中,就如下面代码import部分)
//读取Hbase数据
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkOperateHBase {
def main(args: Array[String]) {

    val conf = HBaseConfiguration.create()
    val sc = new SparkContext(new SparkConf().setAppName("SparkWriteHBase").setMaster("local"))
    //设置查询的表名
    conf.set(TableInputFormat.INPUT_TABLE, "student")
    val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
    val count = stuRDD.count()
    println("Students RDD Count:" + count)
    stuRDD.cache()

    //遍历输出
    stuRDD.foreach({ case (_,result) =>
        val key = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
        val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))
        val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
        println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
    })
}
}

//向hbase写入数据
import org.apache.hadoop.hbase.HBaseConfiguration  
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
import org.apache.spark._  
import org.apache.hadoop.mapreduce.Job  
import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
import org.apache.hadoop.hbase.client.Result  
import org.apache.hadoop.hbase.client.Put  
import org.apache.hadoop.hbase.util.Bytes  

object SparkWriteHBase {  

  def main(args: Array[String]): Unit = {  
    val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")  
    val sc = new SparkContext(sparkConf)        
    val tablename = "student"        
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  

    val job = new Job(sc.hadoopConfiguration)  
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
    job.setOutputValueClass(classOf[Result])    
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    

    val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //构建两行记录
    val rdd = indataRDD.map(_.split(',')).map{arr=>{  
      val put = new Put(Bytes.toBytes(arr(0))) //行健的值 
      put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  //info:name列的值
      put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))  //info:gender列的值
            put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt))  //info:age列的值
      (new ImmutableBytesWritable, put)   
    }}        
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  
  }    
}  

相关文章

网友评论

      本文标题:Spark基础系列之六--数据读写

      本文链接:https://www.haomeiwen.com/subject/qevqghtx.html