美文网首页玩转大数据
Apache Impala 介绍与使用指南

Apache Impala 介绍与使用指南

作者: 大数据技术架构 | 来源:发表于2020-02-09 13:19 被阅读0次

    一、引言

    最近在梳理大数据相关技术栈,查询引擎篇中重点介绍了Phoenix、Impala及Presto,一时想起自己开始使用Impala时的一个笔记。于是找到笔记拿出来分享,希望能够给Impala初学者或使用者一些参考。

    二、Impala介绍

    1、概述

    Impala是一个架构于hadoop之上的全新、开源MPP查询引擎,提供低延迟、高并发的以读为主的查询。

    Impala提升了查询性能,又保留了用户熟悉的操作。通过Impala,你可以使用SELECT、JOIN和聚集函数等语法,实时地查询储存在HDFS或HBase上的数据。除此之外,Impala使用Hive的元数据库、SQL语法,ODBC驱动及用户界面,提供一个友好、统一的平台进行批处理或实时查询。因此,Hive用户能够很方便的使用Impala。

    2、架构

    为了避免延迟,Impala摒弃了MapReduce引擎,借鉴MPP并行数据库的思想,使用一个专用的、分布式的查询引擎直接访问数据。依据查询的类型和配置,与Hive相比较,性能有数量级的提升。

    由架构图可以看出查询的执行过程:

    1. 由Client发送一个执行SQL到任意一台Impalad的Query Planner
    2. 由Query Planner 把SQL发向Query Coordinator
    3. 由Query Coordinator 来调度分配任务到Impalad的所有节点
    4. 各个Impalad节点的Query Executor 进行执行SQL工作
    5. 执行SQL结束以后,将结果返回给Query Coordinator
    6. 再由Query Coordinator 将结果返回给Client

    3、服务组件

    Impala由三个核心服务构成:Statestored、Catalogd、Impalad。

    • Statestored
      一个实例。Statestore daemon负责收集分布在集群中各个impalad进程的资源信息,各节点健康状况,同步节点信息,负责query的调度。
    • Catalogd
      一个实例。Catalog daemon负责接收来自Statestored的所有请求,把impala的metadata分发到各个impalad。
    • Impalad
      N个实例。Impala daemon运行在1个或多个节点上,与Statestored保持通信,负责接收客户端的请求并返回结果。

    Impala没有主节点,Statestored与Catalogd具有主节点的功能,可当作主节点,Impalad可理解为从节点。

    4、特点

    Impala提供对HDFS、HBase数据的高性能、低延迟的交互式SQL查询,基于Hive并使用内存计算,兼顾数据仓库、具有实时、批处理、多并发等优点。具体特点:

    • 基于内存进行计算,能够对PB级数据进行交互式实时查询、分析
    • 直接读取HDFS数据,完全抛弃MapReduce,省掉了作业启动的开销;借鉴MPP并行数据库的思想,省掉了shuffle、sort等开销;中间结果不落地,省掉了大量的IO开销。
    • C++编写,LLVM统一编译运行
    • 兼容HiveSQL
    • 具有数据仓库的特性,可对Hive数据直接做数据分析
    • 支持Data Local
    • 支持列式存储
    • 支持JDBC/ODBC远程访问

    基于这些特点,Impala是CDH平台首选的PB级大数据实时查询分析引擎。

    另外,Impala也有一些劣势,比如对内存依赖大、过于依赖Hive等,使用上也有其他的限制,具体限制参考官方文档。但这不足以影响Impala良好的实时查询分析特点。

    三、使用指南

    Impala使用Hive的元数据库,因此Impala中操作的库或表与Hive中的相同。SQL语句也基本遵循HiveQL的语法。

    1、Impala Shell

    演示使用impala-shell连接Impalad,并简单测试Impala的DDL操作,包括建删库、建删表等。这里是简单的操作,不多赘言。

    2、刷新元数据

    当Hive的元数据发生变化时,比如新建了表、新插入了数据,Impala需要更新元数据,通常可以使用-r参数,invalidate metadata,refresh tablename操作。其中refresh只适应于刷新Impala已识别的表;invalidate metadata会刷新所有表的缓存元数据,是一项比较耗资源的操作,建议使用invalidate metadata tablename;-r不建议在生产环境下使用。

    [hadoop101:21000] > select count(*) from customer;
    Query: select count(*) from customer
    +----------+
    | count(*) |
    +----------+
    | 0        |
    +----------+
    Fetched 1 row(s) in 0.60s
    [hadoop101:21000] > refresh customer;                         
    Query: refresh customer
    
    Fetched 0 row(s) in 0.21s
    [hadoop101:21000] > select count(*) from customer;
    Query: select count(*) from customer
    +----------+
    | count(*) |
    +----------+
    | 500000   |
    +----------+
    Fetched 1 row(s) in 0.86s
    

    3、统计信息

    Impala对于JOIN查询的性能优化,最简单的方式是通过执行COMPUTE STATS来收集涉及到的所有表的统计信息,让 Impala 基于每一个表的大小、每一个列不同值的个数等信息自动的优化查询。

    [hadoop101:21000] > show table stats customer;
    Query: show table stats customer
    +-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
    | #Rows | #Files | Size    | Bytes Cached | Cache Replication | Format | Incremental stats | Location                                                      |
    +-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
    | 0     | 1      | 63.78MB | NOT CACHED   | NOT CACHED        | TEXT   | false             | hdfs://hadoop101:8020/user/hive/warehouse/alltest.db/customer |
    +-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
    Fetched 1 row(s) in 0.05s
    [hadoop101:21000] > compute stats customer;
    Query: compute stats customer
    +------------------------------------------+
    | summary                                  |
    +------------------------------------------+
    | Updated 1 partition(s) and 18 column(s). |
    +------------------------------------------+
    Fetched 1 row(s) in 3.94s
    [hadoop101:21000] > show table stats customer;
    Query: show table stats customer
    +--------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
    | #Rows  | #Files | Size    | Bytes Cached | Cache Replication | Format | Incremental stats | Location                                                      |
    +--------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
    | 500000 | 1      | 63.78MB | NOT CACHED   | NOT CACHED        | TEXT   | false             | hdfs://hadoop101:8020/user/hive/warehouse/alltest.db/customer |
    +--------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
    Fetched 1 row(s) in 0.01s
    [hadoop101:21000] > show column stats customer;
    Query: show column stats customer
    +------------------------+--------+------------------+--------+----------+--------------------+
    | Column                 | Type   | #Distinct Values | #Nulls | Max Size | Avg Size           |
    +------------------------+--------+------------------+--------+----------+--------------------+
    | c_customer_sk          | BIGINT | 487663           | -1     | 8        | 8                  |
    | c_customer_id          | STRING | 498574           | -1     | 16       | 16                 |
    | c_current_cdemo_sk     | BIGINT | 421111           | -1     | 8        | 8                  |
    | c_current_hdemo_sk     | BIGINT | 6968             | -1     | 8        | 8                  |
    | c_current_addr_sk      | BIGINT | 207204           | -1     | 8        | 8                  |
    | c_first_shipto_date_sk | BIGINT | 3652             | -1     | 8        | 8                  |
    | c_first_sales_date_sk  | BIGINT | 3659             | -1     | 8        | 8                  |
    | c_salutation           | STRING | 7                | -1     | 4        | 3.130199909210205  |
    | c_first_name           | STRING | 4971             | -1     | 11       | 5.636499881744385  |
    | c_last_name            | STRING | 4963             | -1     | 13       | 5.912799835205078  |
    | c_preferred_cust_flag  | STRING | 3                | -1     | 1        | 0.9653999805450439 |
    | c_birth_day            | INT    | 31               | -1     | 4        | 4                  |
    | c_birth_month          | INT    | 12               | -1     | 4        | 4                  |
    | c_birth_year           | INT    | 67               | -1     | 4        | 4                  |
    | c_birth_country        | STRING | 208              | -1     | 20       | 8.381899833679199  |
    | c_login                | STRING | 1                | -1     | 0        | 0                  |
    | c_email_address        | STRING | 486557           | -1     | 47       | 26.5               |
    | c_last_review_date     | STRING | 380              | -1     | 7        | 6.752699851989746  |
    +------------------------+--------+------------------+--------+----------+--------------------+
    Fetched 18 row(s) in 0.02s
    

    4、执行计划

    通过在SQL语句前加入“EXPLAIN”关键字,可以返回该语句的执行计划。执行计划从底层显示Impala如何读取数据,如何在各节点之间协调工作,组合并传输中间结果,并获得最终结果集的全过程。根据执行计划,可以判断语句的执行效率,进行性能优化。

    [hadoop101:21000] > explain select count(*) from customer;                           
    Query: explain select count(*) from customer
    +---------------------------------------------------------------------+
    | Explain String                                                      |
    +---------------------------------------------------------------------+
    | Estimated Per-Host Requirements: Memory=154.00MB VCores=1           |
    | WARNING: The following tables have potentially corrupt table        |
    | statistics. Drop and re-compute statistics to resolve this problem. |
    | alltest.customer                                                    |
    |                                                                     |
    | 03:AGGREGATE [FINALIZE]                                             |
    | |  output: count:merge(*)                                           |
    | |                                                                   |
    | 02:EXCHANGE [UNPARTITIONED]                                         |
    | |                                                                   |
    | 01:AGGREGATE                                                        |
    | |  output: count(*)                                                 |
    | |                                                                   |
    | 00:SCAN HDFS [alltest.customer]                                     |
    |    partitions=1/1 files=1 size=63.78MB                              |
    +----------------------------------------------
    

    5、Hints
    Impala SQL支持查询提示,用于微调查询内部运作。因为缺少统计信息或其他因素导致效率低下的昂贵查询,使用提示作为临时的解决办法。所谓提示是指由方括号[]扩起的词。目前所有的提示都是用于控制连接查询的执行策略的(all the hints control the execution strategy for join queries)。

    在查询的 JOIN 关键词之后立刻紧跟以下结构之一:

    • [SHUFFLE]
      连接操作使用了“分割(partitioned) ”技术,使用哈希算法分割两个表对应的行,发送这些行的子集到其他节点进行处理(关键字 SHUFFLE 用于表示 “分割连接(partitioned join)”,因为这种连接与“分区表”无关)。因为当表和索引统计信息不可用时,默认使用另外的“广播(broadcast)”连接机制,你可以在广播连接不适用的情况使用此提示;通常分割连接对具有相似大小的大表之间的连接更有效。
    • [BROADCAST]
      使用“广播(broadcast)”技术,把右边(right-hand)表的完整内容发送到所有节点执行连接处理。这是表和索引统计信息不可用时的默认操作方式,所以你通常只需要在元数据失效导致 Impala 错误的选择分割连接操作时才使用此提示。通常当其中一个表比另外一个表小很多的时候更有效(把较小的表放在 JOIN 操作符的右侧)
      核对某一查询的 EXPLAIN 的输出信息,确定该程序所采用的连接机制。实际测试SQL:
    SELECT 
        CASE
            WHEN JIAZHI_FLAG = '**' OR JIAZHI_FLAG IS NULL THEN '4' 
            WHEN JIAZHI_FLAG = '01' THEN '1'
            WHEN JIAZHI_FLAG = '02' THEN '2'
            WHEN JIAZHI_FLAG = '03' THEN '3'
        END jiazhi ,
        COUNT(
            CASE
                WHEN WENDING_FLAG = '**'
                OR WENDING_FLAG IS NULL
                THEN 1
                ELSE NULL
            END
        ) weizhi
    FROM
        LABEL_INFO_Parquet a JOIN [shuffle](
            SELECT
                b.CLIENTCODE AS CC
            FROM
                CLIENT_INFO_Parquet b
            WHERE
                1 = 1
                AND(
                    b.ORGPATH LIKE('/root/HAB/%')
                    OR b.ORGPATH LIKE('/root/%')
                )
        ) c
            ON c.CC = a.ID
    GROUP BY
        CASE
            WHEN JIAZHI_FLAG = '**' OR JIAZHI_FLAG IS NULL THEN '4' 
            WHEN JIAZHI_FLAG = '01' THEN '1'
            WHEN JIAZHI_FLAG = '02' THEN '2'
            WHEN JIAZHI_FLAG = '03' THEN '3'
        END
    limit 10;
    

    6、设置parquet文件大小

    impala-shell中使用set PARQUET_FILE_SIZE设置parquet格式文件大小

    [hadoop101:21000] > set PARQUET_FILE_SIZE=134217728;
    PARQUET_FILE_SIZE set to 134217728
    [hadoop101:21000] > insert into table customer_par select * from customer;
    Query: insert into table customer_par select * from customer
    Inserted 500000 row(s) in 7.09s
    

    往期文章精选

    干货 | Elasticsearch 索引设计实战指南

    超越数据湖和数据仓库的新范式:LakeHouse

    HBase 集成 Phoenix 构建二级索引实践

    贝壳找房基于 Flink 的实时平台建设

    如果您喜欢这篇文章,点【在看】与转发都是一种鼓励,期待得到您的认可 ❥(^_-)

    相关文章

      网友评论

        本文标题:Apache Impala 介绍与使用指南

        本文链接:https://www.haomeiwen.com/subject/rdigxhtx.html