Using Phoenix with HBase

Author: 会飞的蜗牛66666 | Published 2018-11-23 13:50

We know HBase is a column-oriented, distributed database that stores its data as key-value pairs. HBase exposes an official API, and all kinds of data access through it are convenient enough, but HBase itself is a NoSQL database with no SQL support. Phoenix was created precisely to bring SQL querying to HBase.
Below we walk through using Phoenix together with HBase:
1. Create a table that maps an existing HBase table
create table IF NOT EXISTS "phoenixtest"("ROW" varchar primary key, "info"."name" varchar , "info"."age" varchar,"info"."addr" varchar);
The HBase row key (ROW) serves as the primary key.
The table name, column family, and column names must be wrapped in double quotes, because HBase is case-sensitive; without the quotes, Phoenix converts lowercase identifiers to uppercase when it creates the table.
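For example, only the quoting differs between these two statements; mytable is a hypothetical name used purely for illustration:

-- unquoted: Phoenix creates the table as MYTABLE
create table IF NOT EXISTS mytable ("ROW" varchar primary key, "info"."name" varchar);
-- quoted: the table keeps the exact name "mytable"
create table IF NOT EXISTS "mytable" ("ROW" varchar primary key, "info"."name" varchar);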
2. SQL query
select * from "phoenixtest";


[Figure: query result in the Phoenix shell]

3. Querying through the Java API
Maven dependencies (pom.xml):
<dependencies>

    <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core -->
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.7.0-HBase-1.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-server-client -->
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-server-client</artifactId>
        <version>4.7.0-HBase-1.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>1.1.2</version>
        <type>pom</type>
    </dependency>

</dependencies>

Write a Phoenix connection pool:
JdbcConnectionPool
import java.io.PrintWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.LinkedList;
import java.util.logging.Logger;

import javax.sql.DataSource;

/**
 * A simple (non-thread-safe) Phoenix connection pool.
 */
public class JdbcConnectionPool implements DataSource {

    private static PropertyUtil property = PropertyUtil.getInstance();
    private static org.apache.log4j.Logger logger =
            org.apache.log4j.Logger.getLogger(JdbcConnectionPool.class);
    private static String driver;
    private static String url;
    private static int jdbcConnectionInitSize; // minimum number of connections
    private static int max = 10;               // growth counter, capped at 50 batches below
    private static final LinkedList<Connection> list = new LinkedList<>();

    static {
        try {
            url = property.getString("url", "");
            driver = property.getString("driver", "");
            Class.forName(driver);
            jdbcConnectionInitSize = property.getInt("jdbcConnectionInitSize", 0);
            // Pre-create the minimum number of connections.
            for (int i = 0; i < jdbcConnectionInitSize; i++) {
                Connection conn = DriverManager.getConnection(url);
                System.out.println("connected to phoenix...");
                list.add(conn);
            }
        } catch (SQLException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Connection getConnection() throws SQLException {
        // If the pool is empty, grow it by another batch until the cap is hit.
        if (list.size() == 0 && max <= 50) {
            for (int i = 0; i < jdbcConnectionInitSize; i++) {
                Connection conn = DriverManager.getConnection(url);
                list.add(conn);
            }
            max++;
        }
        if (list.size() > 0) {
            final Connection conn = list.removeFirst();
            // Hand out a proxy whose close() returns the connection to the pool
            // instead of actually closing it.
            return (Connection) Proxy.newProxyInstance(
                    JdbcConnectionPool.class.getClassLoader(),
                    new Class[]{Connection.class},
                    new InvocationHandler() {
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args)
                                throws Throwable {
                            if (!"close".equalsIgnoreCase(method.getName())) {
                                return method.invoke(conn, args);
                            } else {
                                list.add(conn);
                                return null;
                            }
                        }
                    });
        } else {
            System.out.println("connect phoenix error.");
        }
        return null;
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return null;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return null;
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return false;
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return null;
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return 0;
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return null;
    }
}
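A minimal usage sketch of the pool, assuming the cloudConfig.properties shown later is on the classpath (PoolUsageExample is a name invented for this sketch):

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class PoolUsageExample {
    public static void main(String[] args) throws Exception {
        JdbcConnectionPool pool = new JdbcConnectionPool();
        Connection conn = pool.getConnection();
        try {
            Statement st = conn.createStatement();
            ResultSet rs = st.executeQuery("select * from \"phoenixtest\"");
            while (rs.next()) {
                System.out.println(rs.getString("name"));
            }
            rs.close();
            st.close();
        } finally {
            conn.close(); // returned to the pool by the proxy, not actually closed
        }
    }
}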

Write a connection utility:
JdbcConnection
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class JdbcConnection {

    private static JdbcConnectionPool pool = new JdbcConnectionPool();
    private static org.apache.log4j.Logger logger =
            org.apache.log4j.Logger.getLogger(JdbcConnection.class);

    /**
     * Borrow a connection from the pool.
     */
    public static Connection getConnection() throws SQLException {
        return pool.getConnection();
    }

    /**
     * Release the result set, statement, and connection, in that order.
     * Note that close() on a pooled connection returns it to the pool
     * (see the proxy in JdbcConnectionPool) rather than closing it.
     */
    public static void release(Connection conn, PreparedStatement st, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (st != null) {
            try {
                st.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

Load the configuration file:

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Loads cloudConfig.properties from the classpath
public class PropertyUtil extends Properties {

    private static final long serialVersionUID = 50440463580273222L;

    private static PropertyUtil instance = null;

    public static synchronized PropertyUtil getInstance() {
        if (instance == null) {
            instance = new PropertyUtil();
        }
        return instance;
    }

    public String getProperty(String key, String defaultValue) {
        String val = getProperty(key);
        return (val == null || val.isEmpty()) ? defaultValue : val;
    }

    public String getString(String name, String defaultValue) {
        return this.getProperty(name, defaultValue);
    }

    public int getInt(String name, int defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
    }

    public long getLong(String name, long defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Long.parseLong(val);
    }

    public float getFloat(String name, float defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
    }

    public double getDouble(String name, double defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
    }

    public byte getByte(String name, byte defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
    }

    public PropertyUtil() {
        try (InputStream in = this.getClass().getClassLoader()
                .getResourceAsStream("cloudConfig.properties")) {
            if (in == null) {
                throw new IOException("cloudConfig.properties not found on the classpath");
            }
            this.load(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Write a connection test:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixTest {

    public static void main(String[] args) {
        Connection conn = null;
        Statement statement = null;
        final PropertyUtil property = PropertyUtil.getInstance();
        final String url = property.getString("url", "");
        final String driver = property.getString("driver", "");
        final String sql = property.getString("sql", "");
        try {
            Class.forName(driver);
            conn = DriverManager.getConnection(url);
            statement = conn.createStatement();
            System.out.println("--------------------------");
            final ResultSet rs = statement.executeQuery(sql);
            while (rs.next()) {
                System.out.println(rs.getString("name"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the statement before the connection, and guard against nulls.
            try {
                if (statement != null) {
                    statement.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Configuration file
cloudConfig.properties:

# jdbc info

url=jdbc:phoenix:192.168.1.12:2181
driver=org.apache.phoenix.jdbc.PhoenixDriver
jdbcConnectionInitSize=10
sql=select * from "phoenixtest"

Note that running this as-is will fail to connect. Why?
Phoenix has to load HBase's configuration to find the parameters it needs, so we place hbase-site.xml on the classpath (in the conf directory under source):
hbase-site.xml
<configuration>

<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>

<property>
  <name>hbase.bulkload.staging.dir</name>
  <value>/apps/hbase/staging</value>
</property>

<property>
  <name>hbase.client.keyvalue.maxsize</name>
  <value>1048576</value>
</property>

<property>
  <name>hbase.client.retries.number</name>
  <value>35</value>
</property>

<property>
  <name>hbase.client.scanner.caching</name>
  <value>100</value>
</property>

<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>

<property>
  <name>hbase.coprocessor.master.classes</name>
  <value></value>
</property>

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint</value>
</property>

<property>
  <name>hbase.custom-extensions.root</name>
  <value>/hdp/ext/2.6/hbase</value>
</property>

<property>
  <name>hbase.defaults.for.version.skip</name>
  <value>true</value>
</property>

<property>
  <name>hbase.hregion.majorcompaction</name>
  <value>604800000</value>
</property>

<property>
  <name>hbase.hregion.majorcompaction.jitter</name>
  <value>0.50</value>
</property>

<property>
  <name>hbase.hregion.max.filesize</name>
  <value>10737418240</value>
</property>

<property>
  <name>hbase.hregion.memstore.block.multiplier</name>
  <value>4</value>
</property>

<property>
  <name>hbase.hregion.memstore.flush.size</name>
  <value>134217728</value>
</property>

<property>
  <name>hbase.hregion.memstore.mslab.enabled</name>
  <value>true</value>
</property>

<property>
  <name>hbase.hstore.blockingStoreFiles</name>
  <value>10</value>
</property>

<property>
  <name>hbase.hstore.compaction.max</name>
  <value>10</value>
</property>

<property>
  <name>hbase.hstore.compactionThreshold</name>
  <value>3</value>
</property>

<property>
  <name>hbase.local.dir</name>
  <value>${hbase.tmp.dir}/local</value>
</property>

<property>
  <name>hbase.master.info.bindAddress</name>
  <value>0.0.0.0</value>
</property>

<property>
  <name>hbase.master.info.port</name>
  <value>16010</value>
</property>

<property>
  <name>hbase.master.namespace.init.timeout</name>
  <value>2400000</value>
</property>

<property>
  <name>hbase.master.port</name>
  <value>16000</value>
</property>

<property>
  <name>hbase.master.ui.readonly</name>
  <value>false</value>
</property>

<property>
  <name>hbase.master.wait.on.regionservers.timeout</name>
  <value>30000</value>
</property>

<property>
  <name>hbase.region.server.rpc.scheduler.factory.class</name>
  <value>org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory</value>
</property>

<property>
  <name>hbase.regionserver.executor.openregion.threads</name>
  <value>20</value>
</property>

<property>
  <name>hbase.regionserver.global.memstore.size</name>
  <value>0.4</value>
</property>

<property>
  <name>hbase.regionserver.handler.count</name>
  <value>30</value>
</property>

<property>
  <name>hbase.regionserver.info.port</name>
  <value>16030</value>
</property>

<property>
  <name>hbase.regionserver.port</name>
  <value>16020</value>
</property>

<property>
  <name>hbase.regionserver.wal.codec</name>
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>

<property>
  <name>hbase.rootdir</name>
  <value>hdfs://node3:8020/apps/hbase/data</value>
</property>

<property>
  <name>hbase.rpc.protection</name>
  <value>authentication</value>
</property>

<property>
  <name>hbase.rpc.timeout</name>
  <value>90000</value>
</property>

<property>
  <name>hbase.security.authentication</name>
  <value>simple</value>
</property>

<property>
  <name>hbase.security.authorization</name>
  <value>false</value>
</property>

<property>
  <name>hbase.superuser</name>
  <value>hbase</value>
</property>

<property>
  <name>hbase.tmp.dir</name>
  <value>/tmp/hbase-${user.name}</value>
</property>

<property>
  <name>hbase.zookeeper.property.clientPort</name>
  <value>2181</value>
</property>

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>node1,node2,node3,node4,node6,node7,node8,node9,node10</value>
</property>

<property>
  <name>hbase.zookeeper.useMulti</name>
  <value>true</value>
</property>

<property>
  <name>hfile.block.cache.size</name>
  <value>0.4</value>
</property>

<property>
  <name>phoenix.functions.allowUserDefinedFunctions</name>
  <value>true</value>
</property>

<property>
  <name>phoenix.query.timeoutMs</name>
  <value>60000</value>
</property>

<property>
  <name>zookeeper.recovery.retry</name>
  <value>6</value>
</property>

<property>
  <name>zookeeper.session.timeout</name>
  <value>90000</value>
</property>

<property>
  <name>zookeeper.znode.parent</name>
  <value>/hbase-unsecure</value>
</property>

</configuration>
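As an aside (not used in this post, so treat it as an untested variant): the Phoenix thick driver also accepts the ZooKeeper quorum, client port, and znode parent directly in the JDBC URL, which for the cluster configured above would look like:

url=jdbc:phoenix:node1,node2,node3:2181:/hbase-unsecure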

The project directory structure:

[Figure: project directory structure]

select * from "ott_deviceinfo_buffer" where "userid"='13860507270';
Note: the literal 13860507270 must be wrapped in single quotes, and the userid column must be wrapped in double quotes.
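To avoid hand-quoting literal values, you can also bind parameters through JDBC. A minimal sketch reusing the JdbcConnection utility above (QuotingExample is a name invented for this sketch):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class QuotingExample {
    public static void main(String[] args) throws Exception {
        Connection conn = JdbcConnection.getConnection();
        // Column identifiers still need double quotes, but the bound value
        // no longer needs manual single quotes.
        PreparedStatement ps = conn.prepareStatement(
                "select * from \"ott_deviceinfo_buffer\" where \"userid\" = ?");
        ps.setString(1, "13860507270");
        ResultSet rs = ps.executeQuery();
        while (rs.next()) {
            System.out.println(rs.getString("deviceid"));
        }
        JdbcConnection.release(conn, ps, rs);
    }
}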

Create a secondary index
create index idex0 on "ott_deviceinfo_buffer" ("info"."deviceid");
Create a multi-column local index
create local index idex2 on "ott_deviceinfo_buffer" ("info"."terminalid","info"."terminalmode");
Use explain to check whether a query hits a secondary index
explain select * from "ott_deviceinfo_buffer" where "userid"='13860507270';


[Figure: explain plan output]

Create an index table asynchronously
CREATE INDEX async_index ON "ott_deviceinfo_buffer" (v) ASYNC
(Here v stands in for the column being indexed.) The ASYNC keyword on CREATE INDEX requests asynchronous index creation. Running the statement does not by itself sync the index table with the source table, and queries will not use the index at this point. The index data has to be populated with Phoenix's IndexTool, a MapReduce utility, invoked as follows:
${HBASE_HOME}/bin/hbase org.apache.phoenix.mapreduce.index.IndexTool
--schema MY_SCHEMA --data-table MY_TABLE --index-table ASYNC_IDX
--output-path ASYNC_IDX_HFILES
Only after the job finishes will queries start using the index table.
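Putting the asynchronous workflow together for the table used above (a sketch; the index name ASYNC_IDX0 and the placeholder value 'XXX' are illustrative):

CREATE INDEX ASYNC_IDX0 ON "ott_deviceinfo_buffer" ("info"."deviceid") ASYNC;
-- run the IndexTool MapReduce job shown above, then confirm the index is used
-- (selecting only the indexed column, which the index fully covers):
explain select "info"."deviceid" from "ott_deviceinfo_buffer" where "info"."deviceid" = 'XXX';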

