Sequence File

Author: 436048bfc6a1 | Published 2019-03-10 08:20
    1. Using Sequence File in code
    // read a sequence file whose keys are BytesWritable and values are Strings
    val file = sc.sequenceFile[BytesWritable, String]("hdfs://hadoop:12345/user/hive/warehouse/spark_seq/000000_0")
    // copy the key's bytes out of the reused Writable, then print each pair
    file.map(x => (x._1.copyBytes(), x._2)).foreach(println)
    

    1.1 Output of the code

    ([B@291d5e78,AT Austria)
    ([B@52a6f1dc,BE Belgium)
    ([B@688e2655,BY Belarus)
    ([B@578acd05,EE Estonia)
    ([B@1427b712,FR France)
    ([B@716a8c5b,DE Germany)
    ([B@68cfb0ad,GB United Kingdom)
    ([B@692cba54,US United States)
    

    1.2 Problems with the code above

    The key prints as the byte array's default toString (class name plus hash code), not as readable content.
    That is unusable in real work, so this approach is not used as-is.
    

    1.3 Improving the code above

    Split the x._2 of the previous code on \t, as sketched below.
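    A minimal sketch of the improvement (path and port assumed from the example above): drop the unreadable key and split the value into a (code, name) pair:

    val file = sc.sequenceFile[BytesWritable, String]("hdfs://hadoop:12345/user/hive/warehouse/spark_seq/000000_0")
    // keep only the value and split it on the tab delimiter
    file.map(_._2.split("\t"))
        .map(arr => (arr(0), arr(1)))   // e.g. (AT, Austria)
        .foreach(println)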
    

    1.4 Notes
    (1)

    val file = sc.textFile("hdfs://hadoop:9000/user/hive/warehouse/spark_seq/000000_0")
    
    

    The output is:

    BY Belarus
    EE  Estonia FR  France
    DE  GermanyGB   United KingdomUS    United States
    SEQ"org.apache.hadoop.io.BytesWritableorg.apache.hadoop.io.TextjoK]g
    AT Austria
    BE  Belgium
    

    Reading it with the textFile function cannot parse a sequence file correctly, because different formats store their data differently on disk; textFile simply treats the binary header and records as lines of plain text.

    (2) Since a sequence file is key-value structured and carries some header information, it is recommended to read it in the [BytesWritable, String] form.

    BytesWritable
      A byte sequence that is usable as a key or a value
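    A small sketch (same assumed path as above) that decodes the key bytes into a UTF-8 string instead of printing the raw array reference; for Hive-written sequence files the key is typically empty:

    import org.apache.hadoop.io.BytesWritable
    val seq = sc.sequenceFile[BytesWritable, String]("hdfs://hadoop:12345/user/hive/warehouse/spark_seq/000000_0")
    // copyBytes returns a correctly sized copy of the key's backing array
    seq.map { case (k, v) => (new String(k.copyBytes(), "UTF-8"), v) }
       .take(3)
       .foreach(println)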
    
    2. Sequence file source code and analysis

    2.1 Source code

    def sequenceFile[K, V]
            // (parameters: the specified path and the minimum number of partitions)
            // path: the path of the input data file
           (path: String, minPartitions: Int = defaultMinPartitions)
           (implicit km: ClassTag[K], vm: ClassTag[V],
            kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
        withScope {
          assertNotStopped()
          val kc = clean(kcf)()
          val vc = clean(vcf)()
          val format = classOf[SequenceFileInputFormat[Writable, Writable]]
          val writables = hadoopFile(path, format,
            kc.writableClass(km).asInstanceOf[Class[Writable]],
            vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
          writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
        }
    }
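    A usage sketch of this signature (path assumed from earlier): request a minimum of 4 partitions; String works as the value type because an implicit WritableConverter converts it from org.apache.hadoop.io.Text:

    val seq = sc.sequenceFile[BytesWritable, String](
      "hdfs://hadoop:12345/user/hive/warehouse/spark_seq/000000_0", 4)
    // minPartitions is a lower-bound hint; the actual count may be higher
    println(seq.getNumPartitions)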
    

    2.2 Implementation of the function that obtains a Hadoop sequence file

    //Get an RDD for a Hadoop SequenceFile with given key and value types
    def sequenceFile[K, V](path: String,
          keyClass: Class[K],
          valueClass: Class[V],
          minPartitions: Int
          ): RDD[(K, V)] = withScope {
        assertNotStopped()
        val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
        hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
    }
    

    Explanation of the source code

    (1) Under the hood it calls hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)

    (2) Any data whose format has an InputFormat implementation can be read this way

    because val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
    means the call goes through the SequenceFileInputFormat class;
    SequenceFileInputFormat extends FileInputFormat,
    and FileInputFormat implements InputFormat
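    A usage sketch of this explicit-class overload (assumed path), passing the Writable classes directly instead of relying on implicit converters:

    import org.apache.hadoop.io.{BytesWritable, Text}
    val rdd = sc.sequenceFile(
      "hdfs://hadoop:12345/user/hive/warehouse/spark_seq/000000_0",
      classOf[BytesWritable], classOf[Text], 2)
    // Hadoop reuses Writable instances, so materialize values as Strings
    rdd.map { case (_, v) => v.toString }.foreach(println)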
    
    3. How to create a sequenceFile

    3.1 Create a table in Hive

    create table spark_seq_raw(code string, name string) 
    row format delimited fields terminated by '\t' ;
    
    load data local inpath '/home/hadoop/data/seq.txt' 
    overwrite into table spark_seq_raw;
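    For reference, seq.txt is assumed to hold tab-separated rows matching the output shown earlier, for example:

    AT	Austria
    BE	Belgium
    BY	Belarus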
    

    3.2 Store it as a sequenceFile

    create table spark_seq(code string, name string) 
    row format delimited fields terminated by '\t' stored as sequencefile;
    
    insert into table spark_seq select * from spark_seq_raw;
    

    3.3 View the sequencefile

    hadoop fs -text /user/hive/warehouse/spark_seq/*
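    For comparison, a raw dump of the same files

    hadoop fs -cat /user/hive/warehouse/spark_seq/*

    would print the raw bytes, including the SEQ header with org.apache.hadoop.io.BytesWritable and org.apache.hadoop.io.Text seen earlier, whereas -text recognizes the sequence file format and deserializes each record into readable text.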
    
