<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.14.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
JAVA类 | 对应的数据模型 |
---|---|
HBaseConfiguration | HBase配置类 |
HBaseAdmin | HBase管理Admin类 |
Table | HBase Table操作类 |
Put | HBase添加操作数据模型 |
Get | HBase单个查询操作数据模型 |
Scan | HBase Scan检索操作数据模型 |
Result | HBase单个查询的结果模型 |
ResultScanner | HBase检索结果模型 |
1. 封装一个工具类
package com.zsk.hbase.api;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import java.io.IOException;
public class HBaseConn {
private static final HBaseConn INSTANCE = new HBaseConn();
private static Configuration configuration; //hbase配置
private static Connection connection; //hbase connection
//初始化一些基本的参数
private HBaseConn(){
try{
if (configuration==null){
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");
}
}catch (Exception e){
e.printStackTrace();
}
}
//创建连接对象
private Connection getConnection(){
if (connection==null || connection.isClosed()){
try{
connection = ConnectionFactory.createConnection(configuration);
}catch (Exception e){
e.printStackTrace();
}
}
return connection;
}
//返回连接
public static Connection getHBaseConn(){
return INSTANCE.getConnection();
}
//获取表
public static Table getTable(String tableName) throws IOException {
return INSTANCE.getConnection().getTable(TableName.valueOf(tableName));
}
//关闭连接
public static void closeConn(){
if (connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2. 具体操作类方法说明
package com.zsk.hbase.api;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Arrays;
import java.util.List;
public class HBaseUtil {
/**
* 创建表
* @param tableName 创建表的表名称
* @param cfs 列簇的集合
* @return
*/
public static boolean createTable(String tableName, String[] cfs) {
try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {
if (admin.tableExists(tableName)) {
return false;
}
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
Arrays.stream(cfs).forEach(cf -> {
HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);
columnDescriptor.setMaxVersions(1);
tableDescriptor.addFamily(columnDescriptor);
});
admin.createTable(tableDescriptor);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
/**
* 删除表
* @param tableName 表名称
* @return
*/
public static boolean deleteTable(String tableName){
try(HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConn().getAdmin()){
if (!admin.tableExists(tableName)){
return false;
}
admin.disableTable(tableName);
admin.deleteTable(tableName);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
/** * 插入数据
* @param tableName
* @param rowkey
* @param cfName
* @param qualifer
* @param data
* @return
*/
public static boolean putRow(String tableName,String rowkey,String cfName,String qualifer,String data){
try(Table table = HBaseConn.getTable(tableName)){
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes(cfName),Bytes.toBytes(qualifer),Bytes.toBytes(data));
table.put(put);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
/**
*批量出入数据
* @param tableName
* @param puts
* @return
*/
public static boolean putRows(String tableName, List<Put> puts){
try(Table table = HBaseConn.getTable(tableName)){
table.put(puts);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
/**
* 查询单条数据
* @param tableName
* @param rowkey
* @return
*/
public static Result getRow(String tableName,String rowkey){
try( Table table = HBaseConn.getTable(tableName)){
Get get = new Get(Bytes.toBytes(rowkey));
return table.get(get);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* 带有过滤器的插入数据
* @param tableName
* @param rowkey
* @param filterList
* @return
*/
public static Result getRow(String tableName, String rowkey, FilterList filterList){
try( Table table = HBaseConn.getTable(tableName)){
Get get = new Get(Bytes.toBytes(rowkey));
get.setFilter(filterList);
Result result = table.get(get);
System.out.println("rowkey == "+Bytes.toString(result.getRow()));
System.out.println("basic:name == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name"))));
System.out.println("basic:age == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age"))));
System.out.println("basic:sex == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("sex"))));
System.out.println("basic:salary == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("salary"))));
System.out.println("basic:job == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("job"))));
return result;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
*scan扫描数据,
* @param tableName
* @return
*/
public static ResultScanner getScanner(String tableName){
try( Table table = HBaseConn.getTable(tableName)){
Scan scan = new Scan();
scan.setCaching(1000);
ResultScanner results = table.getScanner(scan);
results.forEach(result -> {
System.out.println("rowkey == "+Bytes.toString(result.getRow()));
System.out.println("basic:name == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name"))));
System.out.println("basic:age == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age"))));
System.out.println("basic:sex == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("sex"))));
System.out.println("basic:salary == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("salary"))));
System.out.println("basic:job == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("job"))));
});
return results;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
*can 检索数据,控制startrow,stoprow 注意包括startrow 不包括stoprow,
* @param tableName
* @param startKey
* @param stopKey
* @return
*/
public static ResultScanner getScanner(String tableName,String startKey,String stopKey){
try( Table table = HBaseConn.getTable(tableName)){
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(startKey));
scan.setStopRow(Bytes.toBytes(stopKey));
scan.setCaching(1000);
ResultScanner results = table.getScanner(scan);
results.forEach(result -> {
System.out.println("rowkey == "+Bytes.toString(result.getRow()));
System.out.println("basic:name == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name"))));
System.out.println("basic:age == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age"))));
System.out.println("basic:sex == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("sex"))));
System.out.println("basic:salary == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("salary"))));
System.out.println("basic:job == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("job"))));
});
return results;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* scan 检索数据,控制startrow,stoprow 注意包括startrow 不包括stoprow,filterList对查询过滤
* @param tableName
* @param startKey
* @param stopKey
* @param filterList
* @return
*/
public static ResultScanner getScanner(String tableName,String startKey,String stopKey,FilterList filterList){
try( Table table = HBaseConn.getTable(tableName)){
Scan scan = new Scan();
scan.setFilter(filterList);
scan.setStartRow(Bytes.toBytes(startKey));
scan.setStopRow(Bytes.toBytes(stopKey));
scan.setCaching(1000);
ResultScanner results = table.getScanner(scan);
results.forEach(result -> {
System.out.println("rowkey == "+Bytes.toString(result.getRow()));
System.out.println("basic:name == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name"))));
System.out.println("basic:age == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age"))));
System.out.println("basic:sex == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("sex"))));
System.out.println("basic:salary == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("salary"))));
System.out.println("basic:job == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("job"))));
});
return results;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* 删除行
* @param tableName
* @param rowkey
* @return
*/
public static boolean deleteRow(String tableName,String rowkey){
try( Table table = HBaseConn.getTable(tableName)){
Delete delete = new Delete(Bytes.toBytes(rowkey));
table.delete(delete);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
/**
*删除列簇
* @param tableName
* @param cfName
* @return
*/
public static boolean deleteColumnFamily(String tableName,String cfName){
try(HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConn().getAdmin()){
admin.deleteColumn(tableName,cfName);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
/**
* 删除列
* @param tableName
* @param cfName
* @return
*/
public static boolean deleteQualifier(String tableName,String rowkey,String cfName,String qualiferName){
try(Table table = HBaseConn.getTable(tableName)){
Delete delete = new Delete(Bytes.toBytes(rowkey));
delete.addColumn(Bytes.toBytes(cfName),Bytes.toBytes(qualiferName));
table.delete(delete);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
}
网友评论