美文网首页大数据大数据
数据分析引擎 —— Pig

数据分析引擎 —— Pig

作者: 小胡_鸭 | 来源:发表于2022-03-28 09:46 被阅读0次

    一、Pig

    1、简介

      Pig是一个基于Apache Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫Pig Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。Pig为复杂的海量数据并行计算提供了一个简单的操作和编程接口,使用者可以透过Python或者JavaScript编写Java,之后再重新转写。

      Pig 的特点总结如下:

    • 1)将 Pig Latin 翻译成 MapReduce 任务执行,可以看成是一个翻译器、映射器;
    • 2)可以看成是 Hadoop 的客户端软件,为数据分析人员提供通过简单的类 SQL,连接到 Hadoop 集群并进行数据分析的接口和工具;
    • 3)Pig Latin 可以进行排序、过滤、求和、分组、关联等常用操作,可以自定义函数,是一种面向数据分析处理的轻量级脚本语言;
    • 4)Pig 可自动对集群进行分配和回收,自动地对 MapReduce 程序进行优化。

    2、与 Hive 的对比

      Pig与Hive作为一种高级数据语言,均运行于HDFS之上,是hadoop上层的衍生架构,用于简化hadoop任务,并对MapReduce进行一个更高层次的封装。Pig与Hive的区别如下:

    • Pig是一种面向过程的数据流语言;Hive是一种数据仓库语言,并提供了完整的sql查询功能。
    • Pig更轻量级,执行效率更快,适用于实时分析;Hive适用于离线数据分析。
    • Hive查询语言为Hql,支持分区;Pig查询语言为Pig Latin,不支持分区。
    • Hive支持JDBC/ODBC;Pig不支持JDBC/ODBC。
    • Pig适用于半结构化数据(如:日志文件);Hive适用于结构化数据。


    二、安装配置

      解压

    tar -zxvf pig-0.17.0.tar.gz -C ~/training/
    

      配置环境变量

    PIG_HOME=/root/training/pig-0.17.0
    export PIG_HOME
    
    # 本地模式不需要,但是集群模式需要的变量
    PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop
    export PIG_CLASSPATH
    

    1、本地模式

      启动

    pig -x local
    

      可以看到配置好环境变量之后,在命令行中输入 pig 按 tab 键会自动提示可执行的命令或脚本,以本地模式启动后,可以看到 Pig 连接到的是本地文件系统。

    2、集群模式

      集群模式需要先启动 hadoop,再启动 pig

    pig
    

      可以看到集群模式启动后,pig 连接到的是 hadoop 文件系统。


    三、常用命令

    pig 常用命令非常简单,并且对 hdfs 执行一些操作时会发现比 hdfs 自带的命令执行要快。
    
    # 查看文件系统当前目录下的所有文件目录
    ls
    # 切换到某个目录下
    cd <dir>
    # 输出某个文件的内容
    cat <file>
    # 查看当前目录
    pwd
    # 创建目录
    mkdir <dir>
    # 从本地文件系统拷贝到 HDFS 上
    copyFromLocal <localfile> <hdfsDir>
    # 将 HDFS 文件拷贝到本地文件系统
    copyToLocal <hdfsFile> <localDir>
    
    # 通过 sh cmd 可将命令操作的目标从HDFS改为本地文件系统
    sh命令:调用操作系统的命令,操作本地的文件系统
    
    # pig 自定义函数的定义和注册
    define
    register
    


    四、数据模型

      pig 的表不是矩形的(即每一行都有相同的列),pig 的表被称为包(bag),包中存在行(Tuple)准确地说叫元组,每个元组中存在多个列,表允许不同的元组有完全不相同的列。

      列可以是基本类型int,long,float,double,chararray(string:注意以单引号),bytearray(byte[]),也可以嵌套 map,type,bag 等复杂类型。

      如果人为把每一行都设置成具有相同的列,则叫做一个关系;Pig 的物理存储结构是 JSON 格式。


    五、PigLatin 处理数据

      首先需要启动 hadoop 的 historyserver

    mr-jobhistory-daemon.sh start historyserver
    

      常用的 Pig Latin 语句有:

    1、load:加载数据到表(Bag)
    2、foreach:相当于循环,对bag中的每一条数据(Tuple)进行处理
    3、filter:相当于where
    4、group by:分组
    5、join:连接
    6、generate:提取列
    7、union、intersect:集合计算
    8、输出:dump 直接打印在屏幕上
             store 保存到HDFS中
    

      数据准备,在 HDFS 的 /scott 目录下存在两个 csv 文件,分别是员工表 emp.csv

    empno ename job mgr hiredate sal comm deptno
    7369 SMITH CLERK 7902 1980/12/17 800 0 20
    7499 ALLEN SALESMAN 7698 1981/2/20 1600 300 30
    7521 WARD SALESMAN 7698 1981/2/22 1250 500 30
    7566 JONES MANAGER 7839 1981/4/2 2975 0 20
    7654 MARTIN SALESMAN 7698 1981/9/28 1250 1400 30
    7698 BLAKE MANAGER 7839 1981/5/1 2850 0 30
    7782 CLARK MANAGER 7839 1981/6/9 2450 0 10
    7788 SCOTT ANALYST 7566 1987/4/19 3000 0 20
    7839 KING PRESIDENT -1 1981/11/17 5000 0 10
    7844 TURNER SALESMAN 7698 1981/9/8 1500 0 30
    7876 ADAMS CLERK 7788 1987/5/23 1100 0 20
    7900 JAMES CLERK 7698 1981/12/3 950 0 30
    7902 FORD ANALYST 7566 1981/12/3 3000 0 20
    7934 MILLER CLERK 7782 1982/1/23 1300 0 10

      部门表 dept.csv

    10 ACCOUNTING NEW YORK
    20 RESEARCH DALLAS
    30 SALES CHICAGO
    40 OPERATIONS BOSTON

    1、加载数据到表

    table = load 'hdfsfile';
    

      加载 /scott/emp.csv 到 emp 表,并查看表结构

    emp = load '/scott/emp.csv';
    desc emp;
    

      由于没有指定表结构(字段和字段类型),查询表结构返回 unknow。

      指定表的列,默认数据类型是 bytearray

    emp = load '/scott/emp.csv' as(empno,ename,job,mgr,hiredate,sal,comm,deptno);
    

      加载员工数据到表,并且指定每个tuple的schema和数据类型

    emp = load '/scott/emp.csv' as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);
    

      通过 dump emp; 可以查询表中数据,效果相当于 select * from emp; 并且会触发一个 MapReduce 作业的执行,但是查询到数据却是空的。

      原因是 Pig 加载数据到表时,默认的列值分隔符是 tab(这点跟hive一样),需要指定分隔符。

    emp = load '/scott/emp.csv' using PigStorage(',') as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);
    dept = load '/scott/dept.csv' using PigStorage(',') as(deptno:int,dname:chararray,loc:chararray);
    

      再执行 dump emp; 查询,正常返回数据。

    2、查询某些列(投影)

    # 语法
    tempTable = foreach table generate col1,col2,col3;
    
    # 相当于 SQL
    select col1,col2,col3 from table;
    

      查询员工表的员工号、姓名、薪水。

    emp3 = foreach emp generate empno,ename,sal;
    


    3、排序

    # 语法
    tempTable = order table by col;
    
    # 相当于 SQL
    select * from table order by col;
    

      查询员工信息,按照月薪排序

    emp4 = order emp by sal;
    


    4、分组

    # 语法
    tempTable = group table by col;
    
    # 相当于 SQL
    from table group by col;
    
    # 获得投影
    foreach tempTable generate col,MAX(col2),MIN(col3),SUM(col4),...;
    
    # 两条 Pig Latin 合起来才相当于一条完整的 SQL
    select col,MAX(col2) from table group by col;
    

      求每个部门的最高工资

    # SQL
    select deptno,max(sal) from emp group by deptno;
    
    # Pig Latin
    emp51 = group emp by deptno;
    emp52 = foreach emp51 generate group,MAX(emp.sal);
    

      执行分组后,dump emp51; 得到的数据

    grunt> describe emp51;
    emp51: {group: int,emp: {(empno: int,ename: chararray,job: chararray,mgr: int,hiredate: chararray,sal: int,comm: int,deptno: int)}}
    grunt> dump emp51;
    (10,{(7934,MILLER,CLERK,7782,1982/1/23,1300,0,10),
       (7839,KING,PRESIDENT,-1,1981/11/17,5000,0,10),
       (7782,CLARK,MANAGER,7839,1981/6/9,2450,0,10)})
       
    (20,{(7876,ADAMS,CLERK,7788,1987/5/23,1100,0,20),
       (7788,SCOTT,ANALYST,7566,1987/4/19,3000,0,20),
       (7369,SMITH,CLERK,7902,1980/12/17,800,0,20),
       (7566,JONES,MANAGER,7839,1981/4/2,2975,0,20),
       (7902,FORD,ANALYST,7566,1981/12/3,3000,0,20)})
       
    (30,{(7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30),
       (7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30),
       (7698,BLAKE,MANAGER,7839,1981/5/1,2850,0,30),
       (7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30),
       (7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30),
       (7900,JAMES,CLERK,7698,1981/12/3,950,0,30)})
    

      最后 dump emp52; 结果为



    5、过滤

    # 语法
    filter table by col == value;
    
    # 相当于 SQL
    select * from table where col = value;
    

      查询 10 号部门的所有员工

    # SQL
    select * from emp where deptno = 10;
    
    # Pig Latin
    emp6 = filter emp by deptno==10;
    dump emp6;
    


    6、多表查询

    # 语法
    tempTable1 = join table1 by col, table2 by col;
    tempTable2 = foreach tempTable1 generate table1::col1,table2::col2
    
    # 等价 SQL
    select table1.col1, table2.col2 
    from table1,table2 
    where table1.col = table2.col;
    

      查询员工姓名和部门名称

    # SQL
    select ename,dname from emp,dept where emp.deptno = dept.deptno;
    
    # Pig Latin
    emp71 = join emp by deptno, dept by deptno;
    emp72 = foreach emp71 generate emp::ename,dept::dname;
    

      执行结果



    7、集合运算

    # 求和集
    tempTable = union table1,table2;
    
    # 求交集
    tempTable = intersect table1,table2;
    

      查询10号和20号部门的员工

    # SQL
    select * from emp where deptno = 10
        union
    select * from emp where deptno = 20;
    
    # Pig Latin
    emp10 = filter emp by deptno == 10;
    emp20 = filter emp by deptno == 20;
    emp10_20 = union emp10,emp20;
    

      执行结果




    六、自定义函数

      需要的依赖

    $PIG_HOME/pig-0.17.0-core-h2.jar
    $PIG_HOME/lib
    $PIG_HOME/lib/h2
    $HADOOP_HOME/share/hadoop/common
    $HADOOP_HOME/share/hadoop/common/lib
    $HADOOP_HOME/share/hadoop/mapreduce
    $HADOOP_HOME/share/hadoop/mapreduce/lib
    

    1、自定义运算函数

      自定义运算函数需要继承 org.apache.pig.EvalFunc 类。

      现在要实现一个需求:根据员工薪水判断薪水的级别,不大于1000是 Grade A,大于1000不大于3000是 Grade B,大于3000是 Grade C。

    /**
     * 根据员工薪水判断薪水的级别
     * 
     * 1、sal <= 1000, 返回Grade A
     * 2、sal > 1000 && sal <= 3000, 返回Grade B
     * 3、sal > 3000, 返回Grade C
     */
    public class CheckSalaryGrade extends EvalFunc<String> {
    
        /**
         * 调用
         * emp = load ***
         * emp2 = foreach emp generate empno,ename,sal,demo.CheckSalaryGrade(sal);
         */
        @Override
        public String exec(Tuple tuple) throws IOException {
            // 获取员工薪水
            int sal = (Integer) tuple.get(0);
            if (sal <= 1000)        return "Grade A";
            else if (sal <= 3000)   return "Grade B";
            else                    return "Grade C";       
        }
    }
    

      打包 jar,并在 pig shell 上注册

    grunt> register /root/temp/program/mypig.jar;
    grunt> emp2 = foreach emp generate empno,ename,sal,demo.CheckSalaryGrade(sal);
    grunt> dump emp2;
    

    2、自定义过滤函数

      自定义过滤函数需要继承 org.apache.pig.FilterFunc

      现在用自定义过滤函数实现一个简单的需求,过滤掉薪水低于3000的员工数据。

    // 如果员工薪水大于等于3000块钱,就选择出来
    public class IsSalaryTooHigh extends FilterFunc {
    
        /**
         * 调用
         * emp = load ***
         * emp1 = filter emp by demo.IsSalaryTooHigh(sal);
         */
        @Override
        public Boolean exec(Tuple tuple) throws IOException {
            // 获取当前员工的薪水
            int sal = (Integer) tuple.get(0);
            
            return sal >= 3000 ? true : false;
        }
    }
    

      打包注册,调用自定义过滤函数。

    grunt> register /root/temp/program/mypig.jar;
    grunt> emp1 = filter emp by demo.IsSalaryTooHigh(sal);
    grunt> dump emp1;
    


    3、自定义加载函数

      默认情况下,一行数据加载时会被拆解为一个 Tuple,但是某些情况下,希望有特殊的处理,这时就需要使用自定义加载函数来加载,自定义加载函数需要继承 org.apache.pig.LoadFunc

      现在实现一个简单的需求,将数据文件中的每个单词解析成一个 Tuple,一行数据可能有多个行,所以每行最终会被解析为一个 bag,代码如下:

    public class MyLoadFunc extends LoadFunc {
    
        // 定义变量来代表输入流
        private RecordReader reader;
        
        @Override
        public InputFormat getInputFormat() throws IOException {
            // 处理的数据类型是什么(输入的类型)
            return new TextInputFormat();
        }
    
        @Override
        public Tuple getNext() throws IOException {
            // 从输入流中读取一行,如何解析该行数据
            // 数据: I love Beijing
            Tuple result = null;
            try {
                // 判断是否读取了数据
                if (!this.reader.nextKeyValue()) {
                    // 没有数据
                    return result;                          
                }
                
                // 得到数据
                String data = this.reader.getCurrentValue().toString();
                // 分词
                String[] words = data.split(" ");
                // 构造返回的结果
                result = TupleFactory.getInstance().newTuple();
                
                // 每一个单词单独生成一个Tuple(s),再把这些tuple放入一个bag
                // 在这个bag放入result中
                
                // 创建表
                DataBag bag = BagFactory.getInstance().newDefaultBag();
                for(String w : words) {
                    //为每个单词生成一个tuple
                    Tuple aTuple = TupleFactory.getInstance().newTuple();
                    aTuple.append(w);
                    
                    //再把aTuple放入表
                    bag.add(aTuple);
                }
                
                // 最后,把bag表放入result
                result.append(bag);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return result;
        }
    
        @Override
        public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
            // 初始化加载程序
            this.reader = reader;       
        }
    
        @Override
        public void setLocation(String path, Job job) throws IOException {
            // HDFS的输入路径
            FileInputFormat.setInputPaths(job, new Path(path));
        }
    }
    

      继承 LoadFunc 抽象类,需要实现几个方法。首先,需要实现 prepareToRead() 方法,该方法会传递读取一行数据的迭代器 RecordReader 对象,该对象被自定义加载函数类内定义的成员接收;setLocation() 方法会设置 HDFS 的输出路径;getInputFormat() 方法设置了读取数据的格式,这里读取的是文本数据,所以设置为 TextInputFormat;真正加载数据时,每次读取数据都会调用 getNext() 方法,这里是自定义加载函数的核心逻辑,代码中会先去读取一个数据化并进行分词,对每个单词生成一个 Tuple 对象,最终一行数据对应的所有 Tuple 对象都会添加到 DataBag 对象中,DataBag 对象又被嵌套添加到返回的 Tuple 对象中。

      通过以下命令加载和使用自定义加载函数,并打印结果:

    grunt> register /root/temp/program/mypig.jar;
    grunt> record = load '/input/data.txt' using demo.MyLoadFunc();
    grunt> dump record;
    

    相关文章

      网友评论

        本文标题:数据分析引擎 —— Pig

        本文链接:https://www.haomeiwen.com/subject/gltpjrtx.html