美文网首页
orc文件格式对常用系统的支持

orc文件格式对常用系统的支持

作者: YG_9013 | 来源:发表于2018-10-24 10:33 被阅读0次

1、Hive支持

创建表时指定orc格式即可:

-- Create a Hive table stored in ORC format; the 'orc.compress' table property selects SNAPPY compression.
create table tmp.orc_test(id bigint, name string, age int) stored as orc TBLPROPERTIES('orc.compress'='SNAPPY')

常用的压缩格式有 "SNAPPY" 和 "ZLIB" 两种(也可以设为 "NONE" 表示不压缩),通过 orc.compress 指定需要的格式即可。

2、SPARK支持

Spark读:
df  = spark.read.orc("/tmp/test/orc_data")  # the data read back is a DataFrame

Spark写:
# Write the DataFrame out in ORC format under the given path.
df.write.format("orc").save("/tmp/test/orc_data2")

3、Hadoop Streaming支持

3.1、读orc文件,输出text

# Hadoop Streaming job that reads ORC input through Hive's OrcInputFormat (the hive-exec jar is
# supplied with -libjars) and uses /bin/cat as both mapper and reducer.
hadoop jar /usr/local/hadoop-2.7.0//share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
-libjars /usr/local/hive-1.2.0/lib/hive-exec-1.2.0-SNAPSHOT.jar \
-mapper /bin/cat -reducer /bin/cat \
-input /tmp/test/orc_test1 \
-output /tmp/test/orc_streaming_test3 \
-inputformat org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 

返回的数据:

null    {"name":"123","age":"456"}
null    {"name":"456","age":"789"}

3.2、读orc文件,写orc文件:

# Hadoop Streaming job that reads and writes ORC using the org.apache.orc.mapred input/output
# formats. The output schema is passed with -D orc.mapred.output.schema, and the Java mapper and
# reducer classes are named directly (presumably packaged in orc_maprd_test.jar — confirm).
hadoop jar /usr/local/hadoop-2.7.0//share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
-libjars orc_maprd_test.jar \
-D orc.mapred.output.schema="struct<id:string,name:string,sex:string,age:string>" \
-input /tmp/test/orc_streaming_test \
-output /tmp/test/orc_streaming_test2 \
-inputformat org.apache.orc.mapred.OrcInputFormat \
-outputformat org.apache.orc.mapred.OrcOutputFormat \
-mapper is.orc.MyMapper -reducer is.orc.MyReducer 

pom.xml

<!-- Build dependencies for the mapper/reducer examples in this article. -->
<dependencies>
  <!-- ORC bindings; the examples import org.apache.orc.mapred.* and org.apache.orc.mapreduce.* -->
  <dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.1.0</version>
  </dependency>
  <!-- Hadoop MapReduce client API; version matches the hadoop-2.7.0 install used in the commands -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.0</version>
  </dependency>
</dependencies>

mapper:

package is.orc;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;
import java.util.Random;

/**
 * Mapper for the Hadoop Streaming example, written against the old {@code mapred} API.
 *
 * <p>Each ORC row is flattened into one line holding the string form of its first four fields
 * separated by tabs, and is emitted under a random key in the range 0-4.
 *
 * <p>NOTE(review): {@code toString()} is called on every field value without a null check, so a
 * null column value raises a {@link NullPointerException} — confirm the input contains no nulls.
 */
class MyMapper implements Mapper<NullWritable,OrcStruct,LongWritable,Text> {

    Random random = new Random();

    /** Nothing to release. */
    public void close() { }

    /**
     * Joins fields 0-3 of the row with tabs and emits the line under a random key.
     *
     * @param nullWritable    unused input key
     * @param orcStruct       the ORC row being read
     * @param outputCollector receives the (random key, tab-separated line) pair
     * @param reporter        unused
     * @throws IOException if the collector fails
     */
    public void map(NullWritable nullWritable, OrcStruct orcStruct, OutputCollector<LongWritable, Text> outputCollector, Reporter reporter) throws IOException {
        final int fieldCount = 4;
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < fieldCount; i++) {
            if (i > 0) {
                line.append("\t");
            }
            line.append(orcStruct.getFieldValue(i).toString());
        }

        // Not sure why the mapper's output key can only be LongWritable. A random number is used
        // as the key so that, after reading the ORC file, the rows do not all land on one reducer.
        LongWritable bucket = new LongWritable(random.nextInt(5));
        outputCollector.collect(bucket, new Text(line.toString()));
    }

    /**
     * Sets the map output key class to {@code Writable} and the value class to {@code Text} on
     * the supplied configuration.
     *
     * <p>NOTE(review): it is unclear whether changes made to the JobConf at this point affect
     * the running job — confirm.
     */
    public void configure(JobConf jobConf) {
        jobConf.setMapOutputKeyClass(Writable.class);
        jobConf.setMapOutputValueClass(Text.class);
    }
}

Reducer:

package is.orc;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import java.io.IOException;
import java.util.Iterator;


/**
 * Reducer for the Hadoop Streaming example, written against the old {@code mapred} API.
 *
 * <p>Every incoming value is a tab-separated line; each line is converted into one ORC row
 * and emitted. The previous version stopped after the first value of every key and emitted a
 * single row per key, which silently dropped all remaining rows of that key group.
 */
class MyReducer implements Reducer<LongWritable, Text, NullWritable, OrcStruct> {
    /** Number of tab-separated fields each input line must provide. */
    private static final int FIELD_COUNT = 4;

    // Field types of the ORC file to be created.
    private TypeDescription schema = TypeDescription.fromString(
            "struct<id:string," +
                    "name:string," +
                    "sex:string," +
                    "age:string>"
    );

    // Single row object, refilled and re-emitted for every input line.
    private OrcStruct pair = (OrcStruct)OrcStruct.createValue(schema);


    /**
     * Writes one ORC row for every line in the key group.
     *
     * @param text            grouping key; its value is not used
     * @param iterator        the tab-separated lines grouped under the key
     * @param outputCollector receives one (null key, ORC row) pair per line
     * @param reporter        unused
     * @throws IOException if a line has fewer than four fields or the collector fails
     */
    public void reduce(LongWritable text, Iterator<Text> iterator, OutputCollector<NullWritable, OrcStruct> outputCollector, Reporter reporter) throws IOException {

        while (iterator.hasNext()) {
            String line = iterator.next().toString();
            // A limit of -1 keeps trailing empty fields, so an empty last column does not
            // shorten the array.
            String[] lineSplit = line.split("\t", -1);
            if (lineSplit.length < FIELD_COUNT) {
                throw new IOException("Expected at least " + FIELD_COUNT
                        + " tab-separated fields but got " + lineSplit.length + ": " + line);
            }

            // NOTE(review): positions 0..3 are stored as name, sex, age, id, which differs from
            // the declared schema order (id, name, sex, age) — confirm against the column order
            // of the input data.
            pair.setFieldValue("name",new Text(lineSplit[0]));
            pair.setFieldValue("sex",new Text(lineSplit[1]));
            pair.setFieldValue("age",new Text(lineSplit[2]));
            pair.setFieldValue("id",new Text(lineSplit[3]));

            // Emit inside the loop so that every input line produces an output row.
            outputCollector.collect(NullWritable.get(),pair);
        }
    }

    /** Nothing to release. */
    public void close() throws IOException {

    }

    /** No configuration is read. */
    public void configure(JobConf jobConf) {

    }
}

4、MapReduce支持

读orc的mapper:

package is.orc;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.orc.mapred.OrcStruct;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * Mapper that reads ORC rows and emits each one as a tab-separated line used as the map
 * output key, with a {@link NullWritable} value.
 *
 * <p>NOTE(review): only field 0 is guarded against null; a null in fields 1-3 raises a
 * {@link NullPointerException} — confirm those columns are never null.
 */
public class OrcFileReadMapper extends Mapper<NullWritable, OrcStruct, Text, NullWritable> {

    private Text outputKey = new Text();

    /**
     * Builds the line "f0\tf1\tf2\tf3" from the row, substituting "-1" when field 0 is null,
     * and writes it as the output key.
     *
     * @param key     unused input key
     * @param value   the ORC row being read
     * @param context receives the (line, null) pair
     */
    @Override
    protected void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException {
        // Fields are fetched by positional index here; they can also be fetched by field name.
        Object first = value.getFieldValue(0);
        String firstColumn = (first == null) ? "-1" : first.toString();

        String line = firstColumn + "\t"
                + value.getFieldValue(1).toString() + "\t"
                + value.getFieldValue(2).toString() + "\t"
                + value.getFieldValue(3).toString();

        outputKey = new Text(line);
        context.write(outputKey, NullWritable.get());
    }
}

写orc的reducer:

package is.orc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;

/**
 * Reducer that converts tab-separated lines (received as keys) into ORC rows.
 *
 * <p>One row is written for every occurrence of a line. The previous version ignored
 * {@code values} and wrote a single row per distinct key, which silently de-duplicated
 * identical input rows.
 */
public class OrcFileWriteReducer extends Reducer<Text,NullWritable,NullWritable,OrcStruct> {

    /** Number of tab-separated fields each input line must provide. */
    private static final int FIELD_COUNT = 4;

    // Field types of the ORC file to be created.
    private TypeDescription schema = TypeDescription.fromString(
            "struct<id:string," +
                    "name:string," +
                    "sex:string," +
                    "age:string>"
    );

    // Single row object, refilled for every key.
    private OrcStruct pair = (OrcStruct)OrcStruct.createValue(schema);

    /**
     * Parses the key as "id\tname\tsex\tage" and writes the row once per grouped value.
     *
     * @param key     the tab-separated line
     * @param values  one element per occurrence of the line in the map output
     * @param context receives the (null key, ORC row) pairs
     * @throws IOException if the line has fewer than four fields or the write fails
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        String line = key.toString();
        // The line is not trimmed and the split limit is -1: trimming removed a leading or
        // trailing tab when the first or last field was empty, which shifted or dropped fields.
        String[] lineSplit = line.split("\t", -1);
        if (lineSplit.length < FIELD_COUNT) {
            throw new IOException("Expected at least " + FIELD_COUNT
                    + " tab-separated fields but got " + lineSplit.length + ": " + line);
        }

        pair.setFieldValue("id",new Text(lineSplit[0]));
        pair.setFieldValue("name",new Text(lineSplit[1]));
        pair.setFieldValue("sex",new Text(lineSplit[2]));
        pair.setFieldValue("age",new Text(lineSplit[3]));

        // Write once per grouped value so duplicate input rows are preserved.
        for (NullWritable ignored : values) {
            context.write(NullWritable.get(),pair);
        }
    }
}

job配置:

package is.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;

/**
 * @author lyf
 * @since 2018/06/16
 */
public class OrcFileWriteJob extends Configured implements Tool {

    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConf();

        conf.set("orc.mapred.output.schema","struct<id:string,name:string,sex:string,age:string>");
        String input = "/dws/dd_read_d_v2/dt=20180809/000000_0";
        String output = "/tmp/test/test_mr_orc";

        Job job = Job.getInstance(conf);

        job.setJarByClass(OrcFileWriteJob.class);
        job.setMapperClass(OrcFileReadMapper.class);
        job.setReducerClass(OrcFileWriteReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);

        job.setInputFormatClass(OrcInputFormat.class);
        job.setOutputFormatClass(OrcOutputFormat.class);

        FileInputFormat.addInputPath(job,new Path(input));
        FileOutputFormat.setOutputPath(job,new Path(output));

        boolean rt = job.waitForCompletion(true);
        return rt?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int retnum = ToolRunner.run(conf,new OrcFileWriteJob(),args);
    }
}


相关文章

  • orc文件格式对常用系统的支持

    1、Hive支持 创建表时指定orc格式即可: 压缩格式有"SNAPPY"和 "ZLIB"两种,需要哪种格式指定即...

  • ORC File

    ORC 文件是在hive 0.11.0开始支持。 ORC 文件格式 相对于其他的文件格式,ORC文件格式有以下优点...

  • parquet文件格式对常用系统的支持

    1、Hive支持 创建表时指定parquet格式即可: 压缩格式有"SNAPPY"和 "GZIP"两种,需要哪种格...

  • Hadoop_常用存储与压缩格式

    Hadoop_常用存储与压缩格式 HDFS文件格式 数据存储的方式 说明:orc是rcfile的一个优化版本 Ha...

  • python如何原生访问hdfs文件系统

    对于HIVE,生成orc或者parquet文件格式放在hdfs文件系统上,对外通过SQL语句,就能实现离线分析,数...

  • 数据读取与存储

    数据源类型 文件系统中的不同文件格式数据源:支持文件系统包括NFS,HDFS,Amazon S3,支持的文件格式包...

  • ORC原理及查询优化

    Hive从0.11版本开始提供了ORC的文件格式,ORC文件不仅仅是一种列式文件存储格式,最重要的是有着很高的压缩...

  • 压缩方式,存储方式

    公司常用orc存储方式和snappy压缩方式 orc存储文件默认zlib压缩,而snappy的压缩效率比zlib高...

  • 组件分享之后端组件——一个基于Golang的ORC组件包

    组件分享之后端组件——一个基于Golang的ORC组件包 背景 近期正在探索前端、后端、系统端各类常用组件与工具,...

  • Hive ORC

    ORC是RCfile的优化版本 关于Hive的文件格式 TEXTFILE 默认格式,建表时不指定默认为这个格式,...

网友评论

      本文标题:orc文件格式对常用系统的支持

      本文链接:https://www.haomeiwen.com/subject/uksttqtx.html