Pig consists of two parts:
(1) Pig Latin, the language used to describe data flows;
(2) the execution environment used to run Pig Latin programs.
Execution types:
Local mode (for small data sets)
Pig accesses the local file system.
pig -x local starts Grunt, the interactive shell for Pig.
MapReduce mode
In MapReduce mode, Pig translates queries into MapReduce jobs and runs them on a Hadoop cluster.
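For example, the same script can be run in either mode just by changing the launch flag; the script name max_temp.pig below is only a placeholder.
# Local mode: read and write the local file system
pig -x local max_temp.pig
# MapReduce mode (the default): run as MapReduce jobs on the Hadoop cluster
pig max_temp.pig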
-- Load relations A and B, then inner-join them on field $0 of A and field $1 of B
A = LOAD 'input/pig/join/A';
B = LOAD 'input/pig/join/B';
C = JOIN A BY $0, /* ignored */ B BY $1;
DUMP C;
-- max_temp_filter_udf.pig
REGISTER pig-examples.jar;
-- Define a shorthand alias for the filter UDF
DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
records = LOAD 'input/ncdc/micro-tab/sample.txt'
AS (year:chararray, temperature:int, quality:int);
-- Filter out bad records
filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
-- Group the filtered records by year
grouped_records = GROUP filtered_records BY year;
-- Compute the maximum temperature for each year
max_temp = FOREACH grouped_records GENERATE group,
MAX(filtered_records.temperature);
DUMP max_temp;
-- max_temp_filter_stream.pig
DEFINE is_good_quality `is_good_quality.py`
SHIP ('ch16-pig/src/main/python/is_good_quality.py');
records = LOAD 'input/ncdc/micro-tab/sample.txt'
AS (year:chararray, temperature:int, quality:int);
filtered_records = STREAM records THROUGH is_good_quality
AS (year:chararray, temperature:int);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
MAX(filtered_records.temperature);
DUMP max_temp;
Relational operators
Diagnostic operators
Macros and UDFs
Commands
Expressions
Data types
Functions
Eval functions
Filter functions
Load functions
Store functions
Some built-in functions
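The built-in function tables themselves are not reproduced here. As a rough sketch of a few of them (COUNT, MAX, UPPER), assuming a relation A with a grouping key in its first field, a numeric second field, and a chararray third field:
-- A sketch using a few built-in functions; relation A and its schema are assumed
grouped = GROUP A BY $0;
stats = FOREACH grouped GENERATE group, COUNT(A), MAX(A.$1);
upper_names = FOREACH A GENERATE UPPER((chararray)$2);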
-- A macro that groups a relation and finds the maximum value of a field within each group
DEFINE max_by_group(X, group_key, max_field) RETURNS Y {
A = GROUP $X by $group_key;
$Y = FOREACH A GENERATE group, MAX($X.$max_field);
};
-- Using the macro
records = LOAD 'input/ncdc/micro-tab/sample.txt'
AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
quality IN (0, 1, 4, 5, 9);
max_temp = max_by_group(filtered_records, year, temperature);
DUMP max_temp;
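Macros can also be kept in a separate file and pulled in with IMPORT; the file name below is only illustrative.
-- Assuming the DEFINE above was saved to a file named max_temp.macro
IMPORT './max_temp.macro';
max_temp = max_by_group(filtered_records, year, temperature);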
User-defined functions
A filter UDF that removes temperature readings failing the quality check
// A FilterFunc UDF to remove records with unsatisfactory temperature quality readings
package com.hadoopbook.pig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
public class IsGoodQuality extends FilterFunc {
@Override
public Boolean exec(Tuple tuple) throws IOException {
if (tuple == null || tuple.size() == 0) {
return false;
}
try {
Object object = tuple.get(0);
if (object == null) {
return false;
}
int i = (Integer) object;
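// Quality codes 0, 1, 4, 5 and 9 indicate a satisfactory reading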
return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
} catch (ExecException e) {
throw new IOException(e);
}
}
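// Tell Pig that the argument should be converted to an integer before exec() is called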
@Override
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>();
funcSpecs.add(new FuncSpec(this.getClass().getName(),
new Schema(new Schema.FieldSchema(null, DataType.INTEGER))));
return funcSpecs;
}
}
// An EvalFunc UDF to trim leading and trailing whitespace from chararray values
package com.hadoopbook.pig;
import org.apache.pig.PrimitiveEvalFunc;
public class Trim extends PrimitiveEvalFunc<String, String> {
@Override
public String exec(String input) {
return input.trim();
}
}
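A sketch of invoking the eval function from Pig Latin; the relation A and its chararray first field are assumed.
-- Assuming pig-examples.jar contains the compiled Trim class
REGISTER pig-examples.jar;
B = FOREACH A GENERATE com.hadoopbook.pig.Trim((chararray)$0);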
Load UDFs
// A LoadFunc UDF that loads tuple fields as column ranges
package com.hadoopbook.pig;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class CutLoadFunc extends LoadFunc {
private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);
private final List<Range> ranges;
private final TupleFactory tupleFactory = TupleFactory.getInstance();
private RecordReader reader;
public CutLoadFunc(String cutPattern) {
ranges = Range.parse(cutPattern);
}
// setLocation() passes the input location through to the loader
@Override
public void setLocation(String location, Job job)
throws IOException {
FileInputFormat.setInputPaths(job, location);
}
// The InputFormat splits the input into lines and creates a new RecordReader for each split
@Override
public InputFormat getInputFormat() {
return new TextInputFormat();
}
// Pig passes the RecordReader for each split to prepareToRead(), so getNext() can iterate over the records
@Override
public void prepareToRead(RecordReader reader, PigSplit split) {
this.reader = reader;
}
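// getNext() builds one tuple per input line, with one field per column range; null signals end of input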
@Override
public Tuple getNext() throws IOException {
try {
if (!reader.nextKeyValue()) {
return null;
}
Text value = (Text) reader.getCurrentValue();
String line = value.toString();
Tuple tuple = tupleFactory.newTuple(ranges.size());
for (int i = 0; i < ranges.size(); i++) {
Range range = ranges.get(i);
if (range.getEnd() > line.length()) {
LOG.warn(String.format(
"Range end (%s) is longer than line length (%s)",
range.getEnd(), line.length()));
continue;
}
tuple.set(i, new DataByteArray(range.getSubstring(line)));
}
return tuple;
} catch (InterruptedException e) {
throw new ExecException(e);
}
}
}
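A sketch of using the loader from Pig Latin; the column ranges are placeholders rather than the exact NCDC offsets.
-- Each comma-separated range becomes one field of the output tuple
records = LOAD 'input/ncdc/sample.txt'
USING com.hadoopbook.pig.CutLoadFunc('16-19,88-92,93-93')
AS (year:int, temperature:int, quality:int);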
Data processing operators
Loading and storing data
STORE A INTO 'out' USING PigStorage(':');
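The load side can use PigStorage with an explicit delimiter and schema as well; the file name, delimiter, and schema below are assumed.
A = LOAD 'input/data.txt' USING PigStorage(':') AS (f1:chararray, f2:int);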
Filtering data
-- Print the entire relation
DUMP A;
-- Process each row of a relation
FOREACH...GENERATE...
-- STREAM can invoke an external command; e.g. the Unix cut command extracts the second field of every tuple in A
C = STREAM A THROUGH `cut -f 2`;
Grouping and joining data
-- JOIN (inner join)
DUMP A;
DUMP B;
C = JOIN A BY $0,B BY $1;
DUMP C;
-- If the relations being joined are too large to fit in memory, use the general join. If one of them is small enough to fit in memory, a special join, the fragment replicate join, can be used: the small relation is shipped to every mapper and the join is done on the map side with an in-memory lookup table against the (fragmented) larger relation.
C = JOIN A BY $0, B BY $1 USING 'replicated';
-- COGROUP is similar to JOIN, but it creates a nested set of output tuples. By default it is an outer join.
D = COGROUP A BY $0, B BY $1;
-- D is the same as E (OUTER is the default)
E = COGROUP A BY $0 OUTER, B BY $1 OUTER;
-- Using an inner join
F = COGROUP A BY $0 INNER, B BY $1 INNER;
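As a sketch of how COGROUP relates to JOIN (field positions assumed as above), flattening an inner cogroup of A and B reproduces the inner join:
G = COGROUP A BY $0 INNER, B BY $1 INNER;
H = FOREACH G GENERATE FLATTEN(A), FLATTEN(B);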
-- CROSS computes the Cartesian product
I = CROSS A,B;
-- COGROUP brings together data from two or more relations, while GROUP groups the data within a single relation.
-- Group by the number of characters in the second field
B = GROUP A BY SIZE($1);
-- ALL puts all the tuples of a relation into a single bag
C = GROUP A ALL;
-- ANY groups the tuples in a relation randomly, which is useful for sampling
D = GROUP A ANY;
Sorting data
-- Sort by the first field ascending and the second field descending
B = ORDER A BY $0,$1 DESC;
Combining and splitting data
-- UNION combines two relations
C = UNION A,B;
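The splitting counterpart is SPLIT; a minimal sketch, assuming the first field of C is numeric:
-- SPLIT partitions a relation into new relations based on conditions
SPLIT C INTO D IF $0 < 10, E IF $0 >= 10;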
Describing a relation
DESCRIBE A;
Parallel processing
-- Set the number of reducers used for the GROUP to 30
grouped_records = GROUP records BY year PARALLEL 30;
-- A default for all subsequent jobs can be set with the set command
set default_parallel 30