一、引言
最近在梳理大数据相关技术栈,查询引擎篇中重点介绍了Phoenix、Impala及Presto,一时想起自己开始使用Impala时的一个笔记。于是找到笔记拿出来分享,希望能够给Impala初学者或使用者一些参考。
二、Impala介绍
1、概述
Impala是一个架构于hadoop之上的全新、开源MPP查询引擎,提供低延迟、高并发的以读为主的查询。
Impala提升了查询性能,又保留了用户熟悉的操作。通过Impala,你可以使用SELECT、JOIN和聚集函数等语法,实时地查询储存在HDFS或HBase上的数据。除此之外,Impala使用Hive的元数据库、SQL语法,ODBC驱动及用户界面,提供一个友好、统一的平台进行批处理或实时查询。因此,Hive用户能够很方便的使用Impala。
2、架构
为了避免延迟,Impala摒弃了MapReduce引擎,借鉴MPP并行数据库的思想,使用一个专用的、分布式的查询引擎直接访问数据。依据查询的类型和配置,与Hive相比较,性能有数量级的提升。
![](https://img.haomeiwen.com/i2650272/49e702aa96361e8b.png)
由架构图可以看出查询的执行过程:
- 由Client发送一个执行SQL到任意一台Impalad的Query Planner
- 由Query Planner 把SQL发向Query Coordinator
- 由Query Coordinator 来调度分配任务到Impalad的所有节点
- 各个Impalad节点的Query Executor 进行执行SQL工作
- 执行SQL结束以后,将结果返回给Query Coordinator
- 再由Query Coordinator 将结果返回给Client
3、服务组件
Impala由三个核心服务构成:Statestored、Catalogd、Impalad。
-
Statestored
一个实例。Statestore daemon负责收集分布在集群中各个impalad进程的资源信息,各节点健康状况,同步节点信息,负责query的调度。 -
Catalogd
一个实例。Catalog daemon负责接收来自Statestored的所有请求,把impala的metadata分发到各个impalad。 -
Impalad
N个实例。Impala daemon运行在1个或多个节点上,与Statestored保持通信,负责接收客户端的请求并返回结果。
Impala没有主节点,Statestored与Catalogd具有主节点的功能,可当作主节点,Impalad可理解为从节点。
4、特点
Impala提供对HDFS、HBase数据的高性能、低延迟的交互式SQL查询,基于Hive并使用内存计算,兼顾数据仓库、具有实时、批处理、多并发等优点。具体特点:
- 基于内存进行计算,能够对PB级数据进行交互式实时查询、分析
- 直接读取HDFS数据,完全抛弃MapReduce,省掉了作业启动的开销;借鉴MPP并行数据库的思想,省掉了shuffle、sort等开销;中间结果不落地,省掉了大量的IO开销。
- C++编写,LLVM统一编译运行
- 兼容HiveSQL
- 具有数据仓库的特性,可对Hive数据直接做数据分析
- 支持Data Local
- 支持列式存储
- 支持JDBC/ODBC远程访问
基于这些特点,Impala是CDH平台首选的PB级大数据实时查询分析引擎。
另外,Impala也有一些劣势,比如对内存依赖大、过于依赖Hive等,使用上也有其他的限制,具体限制参考官方文档。但这不足以影响Impala良好的实时查询分析特点。
三、使用指南
Impala使用Hive的元数据库,因此Impala中操作的库或表与Hive中的相同。SQL语句也基本遵循HiveQL的语法。
1、Impala Shell
演示使用impala-shell连接Impalad,并简单测试Impala的DDL操作,包括建删库、建删表等。这里是简单的操作,不多赘言。
2、刷新元数据
当Hive的元数据发生变化时,比如新建了表、新插入了数据,Impala需要更新元数据,通常可以使用-r参数,invalidate metadata,refresh tablename操作。其中refresh只适应于刷新Impala已识别的表;invalidate metadata会刷新所有表的缓存元数据,是一项比较耗资源的操作,建议使用invalidate metadata tablename;-r不建议在生产环境下使用。
[hadoop101:21000] > select count(*) from customer;
Query: select count(*) from customer
+----------+
| count(*) |
+----------+
| 0 |
+----------+
Fetched 1 row(s) in 0.60s
[hadoop101:21000] > refresh customer;
Query: refresh customer
Fetched 0 row(s) in 0.21s
[hadoop101:21000] > select count(*) from customer;
Query: select count(*) from customer
+----------+
| count(*) |
+----------+
| 500000 |
+----------+
Fetched 1 row(s) in 0.86s
3、统计信息
Impala对于JOIN查询的性能优化,最简单的方式是通过执行COMPUTE STATS来收集涉及到的所有表的统计信息,让 Impala 基于每一个表的大小、每一个列不同值的个数等信息自动的优化查询。
[hadoop101:21000] > show table stats customer;
Query: show table stats customer
+-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
| #Rows | #Files | Size | Bytes Cached | Cache Replication | Format | Incremental stats | Location |
+-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
| 0 | 1 | 63.78MB | NOT CACHED | NOT CACHED | TEXT | false | hdfs://hadoop101:8020/user/hive/warehouse/alltest.db/customer |
+-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
Fetched 1 row(s) in 0.05s
[hadoop101:21000] > compute stats customer;
Query: compute stats customer
+------------------------------------------+
| summary |
+------------------------------------------+
| Updated 1 partition(s) and 18 column(s). |
+------------------------------------------+
Fetched 1 row(s) in 3.94s
[hadoop101:21000] > show table stats customer;
Query: show table stats customer
+--------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
| #Rows | #Files | Size | Bytes Cached | Cache Replication | Format | Incremental stats | Location |
+--------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
| 500000 | 1 | 63.78MB | NOT CACHED | NOT CACHED | TEXT | false | hdfs://hadoop101:8020/user/hive/warehouse/alltest.db/customer |
+--------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------------------+
Fetched 1 row(s) in 0.01s
[hadoop101:21000] > show column stats customer;
Query: show column stats customer
+------------------------+--------+------------------+--------+----------+--------------------+
| Column | Type | #Distinct Values | #Nulls | Max Size | Avg Size |
+------------------------+--------+------------------+--------+----------+--------------------+
| c_customer_sk | BIGINT | 487663 | -1 | 8 | 8 |
| c_customer_id | STRING | 498574 | -1 | 16 | 16 |
| c_current_cdemo_sk | BIGINT | 421111 | -1 | 8 | 8 |
| c_current_hdemo_sk | BIGINT | 6968 | -1 | 8 | 8 |
| c_current_addr_sk | BIGINT | 207204 | -1 | 8 | 8 |
| c_first_shipto_date_sk | BIGINT | 3652 | -1 | 8 | 8 |
| c_first_sales_date_sk | BIGINT | 3659 | -1 | 8 | 8 |
| c_salutation | STRING | 7 | -1 | 4 | 3.130199909210205 |
| c_first_name | STRING | 4971 | -1 | 11 | 5.636499881744385 |
| c_last_name | STRING | 4963 | -1 | 13 | 5.912799835205078 |
| c_preferred_cust_flag | STRING | 3 | -1 | 1 | 0.9653999805450439 |
| c_birth_day | INT | 31 | -1 | 4 | 4 |
| c_birth_month | INT | 12 | -1 | 4 | 4 |
| c_birth_year | INT | 67 | -1 | 4 | 4 |
| c_birth_country | STRING | 208 | -1 | 20 | 8.381899833679199 |
| c_login | STRING | 1 | -1 | 0 | 0 |
| c_email_address | STRING | 486557 | -1 | 47 | 26.5 |
| c_last_review_date | STRING | 380 | -1 | 7 | 6.752699851989746 |
+------------------------+--------+------------------+--------+----------+--------------------+
Fetched 18 row(s) in 0.02s
4、执行计划
通过在SQL语句前加入“EXPLAIN”关键字,可以返回该语句的执行计划。执行计划从底层显示Impala如何读取数据,如何在各节点之间协调工作,组合并传输中间结果,并获得最终结果集的全过程。根据执行计划,可以判断语句的执行效率,进行性能优化。
[hadoop101:21000] > explain select count(*) from customer;
Query: explain select count(*) from customer
+---------------------------------------------------------------------+
| Explain String |
+---------------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=154.00MB VCores=1 |
| WARNING: The following tables have potentially corrupt table |
| statistics. Drop and re-compute statistics to resolve this problem. |
| alltest.customer |
| |
| 03:AGGREGATE [FINALIZE] |
| | output: count:merge(*) |
| | |
| 02:EXCHANGE [UNPARTITIONED] |
| | |
| 01:AGGREGATE |
| | output: count(*) |
| | |
| 00:SCAN HDFS [alltest.customer] |
| partitions=1/1 files=1 size=63.78MB |
+----------------------------------------------
5、Hints
Impala SQL支持查询提示,用于微调查询内部运作。因为缺少统计信息或其他因素导致效率低下的昂贵查询,使用提示作为临时的解决办法。所谓提示是指由方括号[]扩起的词。目前所有的提示都是用于控制连接查询的执行策略的(all the hints control the execution strategy for join queries)。
在查询的 JOIN 关键词之后立刻紧跟以下结构之一:
- [SHUFFLE]
连接操作使用了“分割(partitioned) ”技术,使用哈希算法分割两个表对应的行,发送这些行的子集到其他节点进行处理(关键字 SHUFFLE 用于表示 “分割连接(partitioned join)”,因为这种连接与“分区表”无关)。因为当表和索引统计信息不可用时,默认使用另外的“广播(broadcast)”连接机制,你可以在广播连接不适用的情况使用此提示;通常分割连接对具有相似大小的大表之间的连接更有效。 - [BROADCAST]
使用“广播(broadcast)”技术,把右边(right-hand)表的完整内容发送到所有节点执行连接处理。这是表和索引统计信息不可用时的默认操作方式,所以你通常只需要在元数据失效导致 Impala 错误的选择分割连接操作时才使用此提示。通常当其中一个表比另外一个表小很多的时候更有效(把较小的表放在 JOIN 操作符的右侧)
核对某一查询的 EXPLAIN 的输出信息,确定该程序所采用的连接机制。实际测试SQL:
SELECT
CASE
WHEN JIAZHI_FLAG = '**' OR JIAZHI_FLAG IS NULL THEN '4'
WHEN JIAZHI_FLAG = '01' THEN '1'
WHEN JIAZHI_FLAG = '02' THEN '2'
WHEN JIAZHI_FLAG = '03' THEN '3'
END jiazhi ,
COUNT(
CASE
WHEN WENDING_FLAG = '**'
OR WENDING_FLAG IS NULL
THEN 1
ELSE NULL
END
) weizhi
FROM
LABEL_INFO_Parquet a JOIN [shuffle](
SELECT
b.CLIENTCODE AS CC
FROM
CLIENT_INFO_Parquet b
WHERE
1 = 1
AND(
b.ORGPATH LIKE('/root/HAB/%')
OR b.ORGPATH LIKE('/root/%')
)
) c
ON c.CC = a.ID
GROUP BY
CASE
WHEN JIAZHI_FLAG = '**' OR JIAZHI_FLAG IS NULL THEN '4'
WHEN JIAZHI_FLAG = '01' THEN '1'
WHEN JIAZHI_FLAG = '02' THEN '2'
WHEN JIAZHI_FLAG = '03' THEN '3'
END
limit 10;
6、设置parquet文件大小
impala-shell中使用set PARQUET_FILE_SIZE设置parquet格式文件大小
[hadoop101:21000] > set PARQUET_FILE_SIZE=134217728;
PARQUET_FILE_SIZE set to 134217728
[hadoop101:21000] > insert into table customer_par select * from customer;
Query: insert into table customer_par select * from customer
Inserted 500000 row(s) in 7.09s
往期文章精选
如果您喜欢这篇文章,点【在看】与转发都是一种鼓励,期待得到您的认可 ❥(^_-)
网友评论