
[2019-05-29] About Pig

Author: BigBigFlower | Published 2019-05-29 20:41

    Pig has two parts:
    (1) Pig Latin, a language for expressing data flows;
    (2) an execution environment for running Pig Latin programs.
    Execution modes:

    Local mode (for small datasets)
    Pig accesses the local file system.
    pig -x local starts Grunt, the interactive shell for Pig.

    MapReduce mode
    In MapReduce mode, Pig translates queries into MapReduce jobs and runs them on a Hadoop cluster.
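    The same Pig Latin runs unchanged in either mode; only where the data lives and where the jobs run differs. A minimal sketch, assuming a small tab-separated file at the hypothetical path input/sample.txt:

    -- sketch: runs in both local and MapReduce mode; the path is hypothetical
    A = LOAD 'input/sample.txt' AS (year:chararray, temperature:int);
    DUMP A;  -- local mode reads the local filesystem; MapReduce mode reads HDFS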

    -- load relations A and B, then inner-join them on A's field $0 and B's field $1
    A = LOAD 'input/pig/join/A';
    B = LOAD 'input/pig/join/B';
    C = JOIN A BY $0, /* ignored */ B BY $1;
    DUMP C;
    
    -- max_temp_filter_udf.pig
    REGISTER pig-examples.jar;
    -- define a short alias for the UDF
    DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
    records = LOAD 'input/ncdc/micro-tab/sample.txt'
      AS (year:chararray, temperature:int, quality:int);
    -- filter out bad records
    filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
    -- group by year
    grouped_records = GROUP filtered_records BY year;
    -- compute the maximum temperature for each year
    max_temp = FOREACH grouped_records GENERATE group,
      MAX(filtered_records.temperature);
    DUMP max_temp;
    
    
    -- max_temp_filter_stream.pig
    DEFINE is_good_quality `is_good_quality.py`
      SHIP ('ch16-pig/src/main/python/is_good_quality.py');
    records = LOAD 'input/ncdc/micro-tab/sample.txt'
      AS (year:chararray, temperature:int, quality:int);
    filtered_records = STREAM records THROUGH is_good_quality
      AS (year:chararray, temperature:int);
    grouped_records = GROUP filtered_records BY year;
    max_temp = FOREACH grouped_records GENERATE group,
      MAX(filtered_records.temperature);
    DUMP max_temp;
    
    Pig Latin statement types: relational operators, diagnostic operators, macro and UDF statements, commands, expressions, and data types.

    Functions in Pig come in four types:
    eval functions
    filter functions
    load functions
    store functions

    [Tables of a selection of Pig's built-in functions omitted.]
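    A minimal sketch that touches all four function types (PigStorage acts as both a load and a store function; the paths are hypothetical):

    records = LOAD 'input/sample.txt' USING PigStorage('\t')      -- load function
      AS (year:chararray, temperature:int);
    grouped = GROUP records BY year;
    nonempty = FILTER grouped BY NOT IsEmpty(records);             -- filter function
    maxima = FOREACH nonempty GENERATE group,
      MAX(records.temperature);                                    -- eval function
    STORE maxima INTO 'output/max_temp' USING PigStorage(':');     -- store function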
    -- a macro that groups a relation and finds the maximum value of a field within each group
    DEFINE max_by_group(X, group_key, max_field) RETURNS Y {
      A = GROUP $X by $group_key;
      $Y = FOREACH A GENERATE group, MAX($X.$max_field);
    };
    -- use the macro
    records = LOAD 'input/ncdc/micro-tab/sample.txt'
      AS (year:chararray, temperature:int, quality:int);
    filtered_records = FILTER records BY temperature != 9999 AND
      quality IN (0, 1, 4, 5, 9);
    max_temp = max_by_group(filtered_records, year, temperature);
    DUMP max_temp;
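    Macros can also be kept in a separate file and pulled in with IMPORT; a sketch, with a hypothetical path:

    IMPORT './ch16-pig/src/main/pig/max_temp.macro';
    max_temp = max_by_group(filtered_records, year, temperature);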
    

    User-defined functions (UDFs)

    A FilterFunc UDF to remove records with unsatisfactory temperature quality readings:
    package com.hadoopbook.pig;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.pig.FilterFunc;
    import org.apache.pig.FuncSpec;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    
    public class IsGoodQuality extends FilterFunc {
    
      @Override
      public Boolean exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
          return false;
        }
        try {
          Object object = tuple.get(0);
          if (object == null) {
            return false;
          }
          int i = (Integer) object;
          return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
        } catch (ExecException e) {
          throw new IOException(e);
        }
      }
      @Override
      public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>();
        funcSpecs.add(new FuncSpec(this.getClass().getName(),
            new Schema(new Schema.FieldSchema(null, DataType.INTEGER))));
    
        return funcSpecs;
      }
    }
    
    // An EvalFunc UDF that trims leading and trailing whitespace from chararray values
    package com.hadoopbook.pig;
    
    import org.apache.pig.PrimitiveEvalFunc;
    
    public class Trim extends PrimitiveEvalFunc<String, String> {
      @Override
      public String exec(String input) {
        return input.trim();
      }
    }
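    A usage sketch in Pig Latin (assumes the class is packaged in the pig-examples.jar registered earlier; relation A and its first field are hypothetical):

    REGISTER pig-examples.jar;
    B = FOREACH A GENERATE com.hadoopbook.pig.Trim($0);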
    
    // A load UDF: a LoadFunc that loads tuple fields as column ranges
    package com.hadoopbook.pig;
    
    import java.io.IOException;
    import java.util.List;
    
    import org.apache.commons.logging.LogFactory;
    import org.apache.commons.logging.Log;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.pig.LoadFunc;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
    import org.apache.pig.data.DataByteArray;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    
    public class CutLoadFunc extends LoadFunc {
    
      private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);
    
      private final List<Range> ranges;
      private final TupleFactory tupleFactory = TupleFactory.getInstance();
      private RecordReader reader;
    
      public CutLoadFunc(String cutPattern) {
        ranges = Range.parse(cutPattern);
      }
      
      // setLocation() passes the input location on to the loader
      @Override
      public void setLocation(String location, Job job)
          throws IOException {
        FileInputFormat.setInputPaths(job, location);
      }
      
      // getInputFormat() tells Pig which InputFormat to use; TextInputFormat splits the
      // input into lines, and Pig creates a new RecordReader for each split
      @Override
      public InputFormat getInputFormat() {
        return new TextInputFormat();
      }
      // Pig calls prepareToRead() with the RecordReader for each split;
      // keep a reference to it so that getNext() can use it
      @Override
      public void prepareToRead(RecordReader reader, PigSplit split) {
        this.reader = reader;
      }
    
      @Override
      public Tuple getNext() throws IOException {
        try {
          if (!reader.nextKeyValue()) {
            return null;
          }
          Text value = (Text) reader.getCurrentValue();
          String line = value.toString();
          Tuple tuple = tupleFactory.newTuple(ranges.size());
          for (int i = 0; i < ranges.size(); i++) {
            Range range = ranges.get(i);
            if (range.getEnd() > line.length()) {
              LOG.warn(String.format(
                  "Range end (%s) is longer than line length (%s)",
                  range.getEnd(), line.length()));
              continue;
            }
            tuple.set(i, new DataByteArray(range.getSubstring(line)));
          }
          return tuple;
        } catch (InterruptedException e) {
          throw new ExecException(e);
        }
      }
    }
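    A usage sketch in Pig Latin (the column ranges and input path are illustrative; assumes the class is in pig-examples.jar):

    REGISTER pig-examples.jar;
    records = LOAD 'input/ncdc/sample.txt'
      USING com.hadoopbook.pig.CutLoadFunc('16-19,88-92,93-93')
      AS (year:int, temperature:int, quality:int);
    DUMP records;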
    
    

    Data processing operators
    Loading and storing data

    STORE A INTO 'out' USING PigStorage(':');
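    Loading is symmetric; a sketch, with a hypothetical input path and schema:

    A = LOAD 'input/data.txt' USING PigStorage(':') AS (name:chararray, value:int);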
    

    Filtering data

    -- dump the whole relation
    DUMP A;
    -- FOREACH...GENERATE processes every row in a relation
    FOREACH...GENERATE...
    -- STREAM can invoke an external command; e.g. the Unix cut command
    -- extracts the second field of every tuple in A
    C = STREAM A THROUGH `cut -f 2`;
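    The FILTER operator is the most direct way to discard rows; a sketch, with a hypothetical condition:

    B = FILTER A BY $1 > 0;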
    

    Grouping and joining data

    -- JOIN performs an inner join
    DUMP A;
    DUMP B;
    C = JOIN A BY $0, B BY $1;
    DUMP C;
    -- If the relations being joined are too large to fit in memory, use the general-purpose
    -- join. If one of the relations is small enough to fit in memory, use a special kind of
    -- join, the fragment-replicate join: the small relation is shipped to every mapper and
    -- the join is done in the map task, using an in-memory lookup table against the
    -- (fragmented) larger relation.
    C = JOIN A BY $0, B BY $1 USING 'replicated';
    -- COGROUP is like JOIN, but creates a nested set of output tuples; by default it is an outer join
    D = COGROUP A BY $0, B BY $1;
    -- D is the same as E
    E = COGROUP A BY $0 OUTER, B BY $1 OUTER;
    -- use an inner join
    F = COGROUP A BY $0 INNER, B BY $1 INNER;
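    JOIN can be expressed as a COGROUP followed by flattening the nested bags; a sketch:

    G = COGROUP A BY $0 INNER, B BY $1 INNER;
    H = FOREACH G GENERATE FLATTEN(A), FLATTEN(B);
    -- H contains the same tuples as JOIN A BY $0, B BY $1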
    
    -- CROSS computes the cross product (cartesian product)
    I = CROSS A, B;
    -- COGROUP brings together data from two or more relations,
    -- whereas GROUP groups the data in a single relation

    -- group by the number of characters in the second field
    B = GROUP A BY SIZE($1);
    -- ALL puts all the tuples of a relation into a single group
    C = GROUP A ALL;
    -- ANY groups the tuples of a relation randomly, which is useful for sampling
    

    Sorting data

    -- sort by the first field ascending and the second field descending
    B = ORDER A BY $0, $1 DESC;
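    Further processing of a sorted relation is not guaranteed to preserve its order, so it is usual to DUMP or STORE it immediately, or to take the first rows with LIMIT; a sketch:

    C = LIMIT B 2;  -- the first two rows of the sorted relation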
    

    Combining and splitting data

    -- UNION combines two or more relations
    C = UNION A, B;
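    The splitting counterpart is SPLIT, which partitions a relation into two or more relations; a sketch with hypothetical conditions:

    SPLIT A INTO X IF $1 > 0, Y IF $1 <= 0;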
    

    Describing a relation

    DESCRIBE A;
    

    Parallelism

    -- set the number of reducers for the GROUP to 30
    grouped_records = GROUP records BY year PARALLEL 30;
    
    -- set a default with set; it applies to all subsequent jobs
    set default_parallel 30
    
