Impala Quick Start: Introduction to Impala, Impala shell

Author: xiaogp | Published 2020-11-11 16:48

Summary: Impala, JDBC, DBCP, impyla

Introduction to Impala

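Impala is a query engine from Cloudera, written in C++ and built on the MPP (massively parallel processing) model. It consists of several daemon processes running on a CDH cluster. It integrates with the Hive metastore and shares database and table metadata with Hive.
Impala has the following advantages: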

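  • Impala integrates automatically with the existing CDH components, so the same data can be shared by the various components in CDH
  • Supports SQL queries over HBase, HDFS, Kudu, and other storage systems
  • Impala typically returns results within seconds, or minutes at most
  • Supports file formats such as Parquet, text, RCFile, and HFile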

Using impala-shell

impala-shell -i cloudera01:21000 -l --auth_creds_ok_in_clear -u cdh01
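  • -i: host and port of any impalad server in the cluster
  • -l: authenticate with LDAP; impala-shell then prompts for the user's password. --auth_creds_ok_in_clear must be added because SSL is not in use, so the credentials travel in clear text
  • -u: user name
    Other options:
  • -B: drop the pretty-printed table formatting, which lowers the formatting overhead on large results. The default field separator is \t; combine it with --output_delimiter, e.g. --output_delimiter=, to choose a different separator
  • -o: write the query results to a file. For example, with -o "./load_data_test.csv", a CSV file appears in the local directory once the query has finished and the shell has exited
  • -f: run a SQL script file from the command line, for example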
[root@cloudera01 home]# cat query.sql 
use test_gp;
insert into student_info values('5', 345);
insert into student_info values('6', 345);
insert into student_info values('7', 345);
insert into student_info values('8', 345);
insert into student_info values('9', 345);

Run it from the command line:

[root@cloudera01 pgeng]# impala-shell -i cloudera01:21000 -l --auth_creds_ok_in_clear -u cdh001 -f query.sql
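
The options above can be combined to export a query result to a delimited file. The sketch below only assembles the argument list and prints it. The helper name `build_export_command` and its parameters are made up for illustration; the flags are impala-shell's own, including `-q`, which passes a query on the command line and is not covered in the list above. Because `-l` makes impala-shell prompt for a password, the command is meant to be run from an interactive terminal.

```python
import shlex


def build_export_command(impalad, user, query, out_file, delimiter=","):
    """Assemble an impala-shell argument list that writes delimited output.

    Hypothetical helper: only the flags themselves come from impala-shell.
    """
    return [
        "impala-shell",
        "-i", impalad,                      # any impalad host:port in the cluster
        "-l", "--auth_creds_ok_in_clear",   # LDAP without SSL, as in the example above
        "-u", user,
        "-B",                               # delimited output instead of a table
        "--output_delimiter=" + delimiter,
        "-o", out_file,                     # file that receives the result rows
        "-q", query,                        # run this query, then exit
    ]


cmd = build_export_command(
    "cloudera01:21000", "cdh01",
    "select * from test_gp.student_info", "./load_data_test.csv",
)
# Print a copy-pasteable command line; `cmd` could also be handed to subprocess.run.
print(" ".join(shlex.quote(part) for part in cmd))
```

Building the command as a list rather than a single string avoids quoting problems when the query itself contains spaces or quotes.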

Creating a database and a table

[cloudera01:21000] > create database test_gp;
Query: create database test_gp
Fetched 0 row(s) in 5.71s
[cloudera01:21000] > create table student_info(name String, age INTEGER);
Query: create table student_info(name String, age INTEGER)
Fetched 0 row(s) in 6.59s
[cloudera01:21000] > insert into student_info values('王帆', 15);
Query: insert into student_info values('王帆', 15)
Query submitted at: 2020-11-11 16:46:21 (Coordinator: http://cloudera01:25000)
Query progress can be monitored at: http://cloudera01:25000/query_plan?query_id=624958465fedb197:13ace98200000000
Modified 1 row(s) in 6.14s
[cloudera01:21000] > insert into student_info values('猪坚强', 16);
Query: insert into student_info values('猪坚强', 16)
Query submitted at: 2020-11-11 16:46:41 (Coordinator: http://cloudera01:25000)
Query progress can be monitored at: http://cloudera01:25000/query_plan?query_id=27405bb44af85efa:49d42b600000000
Modified 1 row(s) in 6.34s
[cloudera01:21000] > insert into student_info values('李想', 17);
Query: insert into student_info values('李想', 17)
Query submitted at: 2020-11-11 16:46:56 (Coordinator: http://cloudera01:25000)
Query progress can be monitored at: http://cloudera01:25000/query_plan?query_id=564d89653cdde3a2:8ed5550a00000000
[cloudera01:21000] > select * from student_info;
Query: select * from student_info
Query submitted at: 2020-11-11 16:47:11 (Coordinator: http://cloudera01:25000)
Query progress can be monitored at: http://cloudera01:25000/query_plan?query_id=1c4ba5816f72703e:490e41f000000000
+--------+-----+
| name   | age |
+--------+-----+
| 王帆   | 15  |
| 李想   | 17  |
| 猪坚强 | 16  |
+--------+-----+
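
Each single-row INSERT in the session above takes about six seconds, and for HDFS-backed tables every INSERT ... VALUES statement writes its own small data file. Impala also accepts several rows in one VALUES clause, so a small batch can go into a single statement. The sketch below builds such a statement; `sql_literal` and `build_multi_row_insert` are hypothetical helpers, and the backslash escaping of quotes is an assumption to verify on your cluster. For real data volumes, a bulk path such as INSERT ... SELECT is generally the better fit.

```python
def sql_literal(value):
    """Render a Python value as a SQL literal (strings and integers only)."""
    if isinstance(value, bool) or not isinstance(value, (int, str)):
        raise TypeError("unsupported value: %r" % (value,))
    if isinstance(value, int):
        return str(value)
    # Assumption: backslash-escape backslashes and single quotes.
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def build_multi_row_insert(table, rows):
    """Build one INSERT statement that carries every row in `rows`."""
    if not rows:
        raise ValueError("rows must not be empty")
    tuples = ", ".join(
        "(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows
    )
    return "insert into {} values {}".format(table, tuples)


stmt = build_multi_row_insert(
    "student_info", [("5", 345), ("6", 345), ("7", 345)]
)
print(stmt)
```

The resulting statement can be pasted into impala-shell or written to a script file for the `-f` option.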

Connecting over JDBC

To connect to Impala with hive-jdbc, add the following dependency to pom.xml:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0-cdh5.15.2</version>
</dependency>

The test below queries Impala and inserts rows into it. Set the driver, URL, username, and password first:

import java.sql.*;
import java.util.HashMap;
import java.util.Map;

public class ImpalaUtils {
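    private Connection conn = null;
    // volatile is required for the double-checked locking in getInstance() to be safe
    private static volatile ImpalaUtils instance = null;

    // private: instances are obtained only through getInstance()
    private ImpalaUtils() {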
        try {
            String driver = "org.apache.hive.jdbc.HiveDriver";
            String JDBCUrl = "jdbc:hive2://192.168.60.104:21050/test_gp";
            String username = "cdh01";
            String password = "******";
            Class.forName(driver);
            conn = DriverManager.getConnection(JDBCUrl, username, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static ImpalaUtils getInstance() {
        if (instance == null) {
            synchronized (ImpalaUtils.class) {
                if (instance == null) {
                    instance = new ImpalaUtils();
                }
            }
        }
        return instance;
    }

    public static int selectTest(String name) {
        int res = 0;
        // Bind the name as a parameter instead of formatting it into the SQL text
        String sql = "select * from test_gp.student_info where name = ?";
        // try-with-resources closes the statement and the result set
        try (PreparedStatement statement = ImpalaUtils.getInstance().conn.prepareStatement(sql)) {
            statement.setString(1, name);
            try (ResultSet result = statement.executeQuery()) {
                while (result.next()) {
                    res = result.getInt("age");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return res;
    }

    public static void insertTest(Map<String, Integer> map) {
        Connection conn = ImpalaUtils.getInstance().conn;
        String sql = "insert into test_gp.student_info (name, age) values(?,?)";
        // try-with-resources closes the statement; the shared connection stays open
        try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                preparedStatement.setString(1, entry.getKey());
                preparedStatement.setInt(2, entry.getValue());
                preparedStatement.execute();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int res = ImpalaUtils.selectTest("王帆");
        System.out.println(res);
        insertTest(new HashMap<String, Integer>() {{
            put("陈龙", 23);
            put("小王", 55);
            put("老陈", 12);
        }});
    }
}

Using a DBCP connection pool

Connect to Impala through a DBCP connection pool and query data:

import java.sql.{Connection, Statement}
import java.util.{HashMap, Properties}

import org.apache.commons.dbcp.BasicDataSource

object ImpalaScala {
  private var msqlComnsMap: java.util.Map[String,BasicDataSource] = new HashMap[String,BasicDataSource]

  def getConnPool(prop: Properties, name:String) = {
    if (!msqlComnsMap.containsKey(name)){
      this.synchronized {
        if (!msqlComnsMap.containsKey(name)){
          val tmpCon: BasicDataSource = new BasicDataSource()
          tmpCon.setDriverClassName(prop.getProperty("jdbc.driver"))
          tmpCon.setUrl(prop.getProperty("jdbc."+name+".url"))
          tmpCon.setUsername(prop.getProperty("jdbc."+name+".user"))
          tmpCon.setPassword(prop.getProperty("jdbc."+name+".passwd"))
          tmpCon.setMaxActive(4)
          tmpCon.setMaxIdle(4)
          tmpCon.setMinIdle(2)
          tmpCon.setInitialSize(4)
          tmpCon.setMaxWait(10000)
          tmpCon.setTestWhileIdle(true)
          tmpCon.setValidationQuery("select 1")
          tmpCon.setValidationQueryTimeout(10000)
          tmpCon.setTimeBetweenEvictionRunsMillis(10000)
          msqlComnsMap.put(name,tmpCon)
        }
      }
    }

    msqlComnsMap.get(name)
  }


  def main(args: Array[String]): Unit = {
    val prop = new Properties() {
      {
        put("jdbc.driver", "org.apache.hive.jdbc.HiveDriver")
        put("jdbc.impala.user", "cdh01")
        put("jdbc.impala.passwd", "******")
        put("jdbc.impala.url", "jdbc:hive2://192.168.60.104:21050/test_gp")
      }
    }
    val connPool: BasicDataSource = ImpalaScala.getConnPool(prop, "impala")

    if (connPool != null) {
      var conn: Connection = null
      var statement: Statement = null
      try {
        conn = connPool.getConnection
        statement = conn.createStatement
        val res = statement.executeQuery("select * from test_gp.student_info where name='王帆';")
        while (res.next) {
          System.out.println("age => " + res.getString("age"))
        }
      } catch {
        case e: Exception =>
          e.printStackTrace()
      } finally try {
        if (statement != null) statement.close()
        if (conn != null) conn.close()
      } catch {
        case e: Exception =>
          System.out.println()
          e.printStackTrace()
      }
    }
  }
}
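
To make the pool settings above more concrete: `setMaxActive(4)` caps how many connections can be handed out at once, and `setMaxWait(10000)` is how long, in milliseconds, a caller waits for a free connection before failing. The Python sketch below illustrates that borrow-and-return behaviour with a bounded queue. It is not how DBCP is implemented; `SimplePool`, `connect_fn`, and the fake connection in the usage are made up for illustration.

```python
import queue
from contextlib import contextmanager


class SimplePool:
    """Tiny bounded connection pool (illustrative only, not DBCP)."""

    def __init__(self, connect_fn, max_active=4, max_wait_s=10.0):
        self._connect_fn = connect_fn
        self._max_wait_s = max_wait_s
        # One queue entry per slot; None means "no connection created yet".
        self._slots = queue.Queue(maxsize=max_active)
        for _ in range(max_active):
            self._slots.put(None)

    @contextmanager
    def connection(self):
        try:
            conn = self._slots.get(timeout=self._max_wait_s)
        except queue.Empty:
            raise TimeoutError("no free connection within max_wait_s")
        if conn is None:
            try:
                conn = self._connect_fn()  # create lazily on first use
            except Exception:
                self._slots.put(None)      # give the empty slot back
                raise
        try:
            yield conn
        finally:
            self._slots.put(conn)          # return the connection for reuse


created = []


def fake_connect():
    """Stand-in for a real driver's connect function."""
    conn = object()
    created.append(conn)
    return conn


pool = SimplePool(fake_connect, max_active=1, max_wait_s=0.1)
with pool.connection() as first:
    pass
with pool.connection() as second:
    pass
print(len(created), first is second)
```

With `max_active=1`, the second borrow reuses the connection created by the first. A production pool would also validate connections before reuse, which is the job of `setValidationQuery` and `setTestWhileIdle` in the Scala code.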


Accessing Impala from Python

To work with Impala from Python, use the impyla library. An example follows:

root@ubuntu:~/ # pip install impyla
from impala.dbapi import connect
from impala.util import as_pandas

config = {
    "host": "192.20.3.55", 
    "port": 21050,
    "database": "test",
    "user": "cdh_dev", 
    "password": "123456", 
    "auth_mechanism": "PLAIN"
}

def get_impala_data(query):
    conn = None
    cursor = None
    data = []
    try:
        conn = connect(**config)
        cursor = conn.cursor()
        cursor.execute(query)
        data = as_pandas(cursor)  # cursor.fetchall() and similar methods also work
    except Exception as e:
        print("error", e.args)
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()
    return data  # a pandas DataFrame, or the empty list if the query failed

if __name__ == '__main__':
    res = get_impala_data("select * from sqoop_test")
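
impyla follows the Python DB-API 2.0, so the cleanup logic can be written once against any DB-API `connect` function. The sketch below is one way to do that with `contextlib.closing`. `fetch_rows` and `demo_connect` are hypothetical helpers, and the demo runs against the standard-library `sqlite3` module so that it works without a cluster. With impyla, one would pass `impala.dbapi.connect` and the `config` dictionary instead. Placeholder syntax differs between drivers, so check the driver's `paramstyle` before reusing the `?` shown here.

```python
import sqlite3
from contextlib import closing


def fetch_rows(connect_fn, connect_kwargs, query, params=None):
    """Run `query` and return all rows, always closing cursor and connection."""
    with closing(connect_fn(**connect_kwargs)) as conn:
        with closing(conn.cursor()) as cursor:
            if params is None:
                cursor.execute(query)
            else:
                cursor.execute(query, params)  # driver binds the values
            return cursor.fetchall()


def demo_connect(**kwargs):
    """Stand-in for impala.dbapi.connect: an in-memory SQLite database."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table student_info(name text, age integer)")
    conn.executemany(
        "insert into student_info values (?, ?)",
        [("王帆", 15), ("李想", 17)],
    )
    return conn


rows = fetch_rows(
    demo_connect, {},
    "select age from student_info where name = ?", ("王帆",),
)
print(rows)
```

Passing the values separately from the SQL text lets the driver handle quoting, which avoids building queries by string concatenation.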

    Article link: https://www.haomeiwen.com/subject/uzjovktx.html