1 HBase 概述
1.1 什么是 HBase?
HBase 是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用 HBase 技术可在廉价 PC Server 上搭建起大规模结构化存储集群。
HBase 的目标是存储并处理大型的数据,更具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。
1.2 HBase 的特点
1、海量存储
HBase适合存储 PB 级别的海量数据,在 PB 级别的数据以及采用廉价 PC 存储的情况下,能在几十到百毫秒内返回数据,这与HBase的易扩展性息息相关,正式因为HBase良好的扩展性,才为海量数据的存储提供了便利。
2、列式存储
这里的列式存储其实说的是列族(ColumnFamily)存储,HBase是根据列族来存储数据的,列族下面可以有非常多的列,列族在创建表的时候就必须指定。
3、易扩展
HBase 的扩展性主要体现在两个方面,一个是基于上层处理能力(RegionServer)的扩展,一个是基于存储的扩展(HDFS)。
通过横向添加 RegionServer 的机器,进行水平扩展,提升 HBase 上层的处理能力,提升 HBase 服务更多 Region 的能力。
4、高并发
由于目前大部分使用 HBase 的架构,都是采用的廉价 PC ,因此单个 IO 的延迟其实并不小,一般在几十到上百 ms 之间,这里说的高并发,主要是在并发的情况下,HBase 的单个 IO 延迟下降并不多。能获得高并发、低延迟的服务。
5、稀疏
稀疏主要是针对 HBase 列的灵活性,在列族中,你可以指定任意多的列,在列数据为空的情况下,是不会占用存储空间的。
1.3 架构
image1、Client
Client 包含了访问HBase的接口,另外 Client 还维护了对应的 cache 来加速 HBase 的访问
2、ZooKeeper
HBase通过 ZooKeeper 来做 Master 的高可用、RegionServer 的监控、元数据的入口以及集群配置的维护等工作,具体工作如下:
- 保证集群中只有 1 个 Master 在运行,如果 Master 异常,会通过竞争机制产生新的 Master 提供服务
- 监控 RegionServer 的状态,当 RegionServer 有异常的时候,通过回调的形式通知 Master RegionServer 上下线的信息
- 存储元数据的统一入口地址
3、 HMaster
该节点的主要职责如下:
- 为 RegionServer 分配 Region
- 维护整个集群的负载均衡
- 维护集群的元数据信息
- 发现失效的 Region ,并将失效的 Region 分配到正常的 RegionServer 上
- 当 RegionServer 失效的时候,协调对应 Hlog 的拆分
4、 HRegionServer
HRegionServer 直接对接用户的读写请求,是真正的干活的节点。它的功能概括如下:
- 管理 Master 为其分配的 Region
- 处理来自客户端的读写请求
- 负责和底层 HDFS 的交互,存储数据到 HDFS
- 负责 Region 变大以后的拆分
- 负责 StoreFile 的合并工作
5、 HDFS
HDFS 为 HBase 提供最终的底层数据存储服务,同时为 HBase 提供高可用( Hlog 存储在 HDFS )的支持,具体功能概括如下:
- 提供元数据和表数据的底层分布式存储服务
- 数据多副本,保证的高可靠和高可用性
1.4 HBase中的角色
1、 HMaster
- 监控 RegionServer
- 处理 RegionServer 故障转移
- 处理元数据的变更
- 处理 Region 的分配或转移
- 在空闲时间进行数据的负载均衡
- 通过 ZooKeeper 发布自己的位置给客户端
2、RegionServer
- 负责存储 HBase 的实际数据
- 处理分配给它的 Region
- 刷新缓存到 HDFS
- 维护 Hlog
- 执行压缩
- 负责处理 Region 分片
3、其他组件
- Write-Ahead logs :HBase 的修改记录,当对 HBase 读写数据时,数据并不是直接写入磁盘,而是将数据先写入内存,但是内存有断电丢失的特性,为了解决这个问题,数据会先写在一个叫做 Write-Ahead logs 的文件中,在写入到内存中,如果系统出现故障,可以通过这个文件重建数据
- Region :表的分片,HBase 会将表按照 RowKey 切分冲不同的 Region 存储在 RegionServer 中
- Store :HFile 存储在Store 中,一个 Store 对应 HBase 表中的一个列族
- MemStore:内存存储,位于内存中,用来保存当前的数据操作,所以当数据保存在 Write-Ahead logs 中之后,RegsionServer 会在内存中存储键值对
- HFile:这是在磁盘上保存原始数据的实际的物理文件,是实际的存储文件,StoreFile 是以 HFile 的形式存储在 HDFS 的
2 HBase安装部署
1、解压
[djm@hadoop102 software]$ tar -zxvf hbase-1.3.1-bin.tar.gz -C /opt/module
3、将目录修改为 hbase
[djm@hadoop102 software]$ sudo mv /opt/module/hbase-1.3.1 /opt/module/hbase
2、修改 hbase-env.sh
[djm@hadoop102 conf]$ vim hbase-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
export HBASE_MANAGES_ZK=false
# Configure PermSize. Only needed in JDK7. You can safely remove it for JDK8+
# export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
# export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
3、修改 hbase-site.xml
[djm@hadoop102 conf]$ vim hbase-site.xml
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop102:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- 0.98后的新变动,之前版本没有.port,默认端口为60000 -->
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/module/zookeeper-3.4.10/zkData</value>
</property>
</configuration>
4、修改 regionservers
[djm@hadoop102 conf]$ vim regionservers
hadoop102
hadoop103
hadoop104
5、软连接 hadoop 配置文件到 hbase
[djm@hadoop102 hadoop]$ ln -s core-site.xml /opt/module/hbase/conf/core-site.xml
[djm@hadoop102 hadoop]$ ln -s hdfs-site.xml /opt/module/hbase/conf/hdfs-site.xml
6、分发
[djm@hadoop102 hadoop]$ xsync /opt/module/hbase
7、启动和停止集群
[djm@hadoop102 hbase]$ bin/start-hbase.sh
[djm@hadoop102 hbase]$ bin/stop-hbase.sh
8、Web 页面
http://hadoop102:16010
3 HBase Shell
进入HBase客户端命令行
[djm@hadoop102 hbase]$ bin/hbase shell
查看帮助命令
hbase(main):001:0> help
查看当前数据有有哪些表
hbase(main):001:0> list
创建表
hbase(main):002:0> create 'student','info'
插入数据到表
hbase(main):003:0> put 'student','1001','info:sex','male'
hbase(main):004:0> put 'student','1001','info:age','18'
hbase(main):005:0> put 'student','1002','info:name','Janna'
hbase(main):006:0> put 'student','1002','info:sex','female'
hbase(main):007:0> put 'student','1002','info:age','20'
扫描查看表数据
hbase(main):008:0> scan 'student'
hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW => '1001'}
hbase(main):010:0> scan 'student',{STARTROW => '1001'}
查看表结构
hbase(main):011:0> describe 'student'
更新指定字段的数据
hbase(main):012:0> put 'student','1001','info:name','Nick'
hbase(main):013:0> put 'student','1001','info:age','100'
查看指定行或指定列族:列的数据
hbase(main):014:0> get 'student','1001'
hbase(main):015:0> get 'student','1001','info:name'
统计表数据行数
hbase(main):021:0> count 'student'
删除数据
hbase(main):016:0> deleteall 'student','1001'
hbase(main):017:0> delete 'student','1002','info:sex'
清空表数据
hbase(main):018:0> truncate 'student'
删除表
首先需要先让该表为disable状态:
hbase(main):019:0> disable 'student'
然后才能drop这个表:
hbase(main):020:0> drop 'student'
提示:如果直接drop表,会报错:ERROR: Table student is enabled. Disable it first.
变更表信息
hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}
4 HBase数据结构
4.1 RowKey
RowKey 是用来检索记录的主键,访问 table 中的行,只有三种方式:
- 通过单个 RowKey 访问(get)
- 通过 RowKey 的 range(正则 like)
- 全表扫描(scan)
RowKey 可以是任意字符串(最长不能超过 64KB),在 HBase 中,RowKey 被保存为字节数组,存储时,数据按照 RowKey 的字典序(byte order)排序存储。
4.2 Column Family
列族,表中的每个列,都归属于某个列族,列族是表的 Schema 的一部分(而列不是),必须在使用表之前定义,列名都以列族作为前缀。
4.3 Cell
由 {rowkey, column Family:columu,version} 唯一确定的单元,Cell 中的数据是没有类型的,全部是字节码形式存储。
4.4 Time Starmp
HBase 中通过 RowKey 和 Columns 确定的为一个存贮单元称为 Cell,每个 Cell 都保存着同一份数据的多个版本,版本通过时间戳来索引,时间戳的类型是 64 位整型,时间戳可以由 HBase(在数据写入时自动)赋值,此时时间戳是精确到毫秒的当前系统时间,时间戳也可以由客户显式赋值,如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳,每个 Cell 中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。
为了避免数据存在过多版本造成的的管理 (包括存贮和索引)负担,HBase 提供了两种数据版本回收方式,一是保存数据的最后 n个版本,二是保存最近一段时间内的版本(比如最近七天),用户可以针对每个列族进行设置。
4.5 NameSpace
image1、Table:表,所有的表都是命名空间的成员,即表必属于某个命名空间,如果没有指定,则在 default 中
2、RegionServer Group:一个命名空间包含了默认的 RegionServer Group。
3、Permission:权限,命名空间能够让我们来定义访问控制列表 ACL(Access Control List)
4、Quota:限额,可以强制一个命名空间可包含的 Region 的数量
5 HBase 原理
5.1 读
image1、Client 先访问 ZooKeeper ,从 meta 表读取 Region 的位置,然后读取 meta 表中的数据,meta 中又存储了用户表的Region 信息
2、根据 NameSpace、表名和 RowKey 在 meta 表中找到对应的 Region 信息
3、找到这个 Region 对应的 RegionServer
4、查找对应的 Region
5、先从 MemStore 找数据,如果没有,再到 BlockCache 里面读
6、BlockCache 还没有,再到 StoreFile 上读
7、如果是从 StoreFile 里面读取的数据,不是直接返回给客户端,而是先写入 BlockCache,再返回给客户端
5.2 写
image1、Client 向 HregionServer 发送写请求
2、HregionServer 将数据写到 HLog(write ahead log)
3、HregionServer 将数据写到内存(MemStore)
4、反馈写成功
5.3 Flush
1、当 MemStore 数据达到阈值(默认是128M,老版本是 64M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog 中的历史数据
2、并将数据存储到 HDFS 中
3、在 HLog 中做标记点
5.4 合并
1、当数据块达到 3 块,HMaster 触发合并操作,Region 将数据块加载到本地,进行合并
2、当合并的数据超过 256M,进行拆分,将拆分后的 Region 分配给不同的 HregionServer 管理
3、当 HregionServer 宕机后,将 HregionServer 上的 HLog 拆分,然后分配给不同的 HregionServer 加载,修改 meta
注意:HLog 会同步到 HDFS
6 Java 操作 HBase
引入依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
6.1 HBaseApi
HBaseUtil
package com.djm.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class HBaseUtil {
private static ThreadLocal<Connection> connHolder = new ThreadLocal<Connection>();
private HBaseUtil() {
}
public static Connection makeHBaseConnection() throws IOException {
Connection conn = connHolder.get();
if (conn == null) {
Configuration conf = HBaseConfiguration.create();
conn = ConnectionFactory.createConnection(conf);
connHolder.set(conn);
}
return conn;
}
/**
* 生成分区键
* @param regionCount
* @return
*/
public static byte[][] genRegionKeys(int regionCount) {
byte[][] bs = new byte[regionCount - 1][];
for (int i = 0; i < regionCount - 1; i++) {
bs[i] = Bytes.toBytes(i + "|");
}
return bs;
}
/**
* 生成分区号
*
* @param rowKey
* @param regionCount
* @return
*/
public static String genRegionNum(String rowKey, int regionCount) {
int regionNum;
int hash = rowKey.hashCode();
if (regionCount > 0 && (regionCount & (regionCount - 1)) == 0) {
regionNum = hash & (regionCount - 1);
} else {
regionNum = hash % (regionCount - 1);
}
return regionNum + "_" + rowKey;
}
public static void close() throws IOException {
Connection conn = connHolder.get();
if (conn != null) {
conn.close();
connHolder.remove();
}
}
}
HBaseApi
package com.djm.hbase;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class HBaseApi {
private static Connection conn = null;
private static Admin admin = null;
static {
try {
conn = HBaseUtil.makeHBaseConnection();
admin = conn.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
private static boolean isTableExist(String tableName) throws IOException {
return admin.tableExists(TableName.valueOf(tableName));
}
public static void creteTable(String tableName, String... cloumnFamily) throws IOException {
if (!isTableExist(tableName)) {
HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
descriptor.addCoprocessor("com.djm.hbase.InsertStudentCoprocesser");
for (String family : cloumnFamily) {
descriptor.addFamily(new HColumnDescriptor(family));
}
admin.createTable(descriptor);
} else {
System.out.println("-> can't create");
}
}
public static void dropTable(String tableName) throws IOException {
if (isTableExist(tableName)) {
TableName table = TableName.valueOf(tableName);
admin.disableTable(table);
admin.deleteTable(table);
} else {
System.out.println("-> can't remove");
}
}
public static void insertRowData(String tableName, String rowKey, String family, String column, String value) throws IOException {
HTable hTable = new HTable(TableName.valueOf(tableName), conn);
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
hTable.put(put);
hTable.close();
}
public static void deleteMultiRow(String tableName, String... rows) throws IOException {
HTable hTable = new HTable(TableName.valueOf(tableName), conn);
List<Delete> deleteList = new ArrayList<Delete>();
for (String row : rows) {
deleteList.add(new Delete(Bytes.toBytes(row)));
}
hTable.delete(deleteList);
hTable.close();
}
public static void getRow(String tableName, String rowKey) throws IOException {
HTable table = new HTable(TableName.valueOf(tableName), conn);
Get get = new Get(Bytes.toBytes(rowKey));
Result result = table.get(get);
for (Cell cell : result.rawCells()) {
System.out.print("行键:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
System.out.print("列族" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
System.out.print("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
System.out.print("值:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
System.out.print("时间戳:" + cell.getTimestamp());
}
}
public static void getAllRows(String tableName) throws IOException {
HTable hTable = new HTable(TableName.valueOf(tableName), conn);
Scan scan = new Scan();
ResultScanner resultScanner = hTable.getScanner(scan);
for (Result result : resultScanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.print("行键:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
System.out.print("列族" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
System.out.print("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
System.out.print("值:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\n");
}
}
}
public static void searchData(String tableName, String family) throws IOException {
Table table = conn.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
BinaryComparator binaryComparator = new BinaryComparator(Bytes.toBytes("1001"));
RegexStringComparator regexStringComparator = new RegexStringComparator("^\\d{4}$");
Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, binaryComparator);
// MUST_PASS_ALL and
// MUST_PASS_ONE or
FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.print("行键:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
System.out.print("列族" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
System.out.print("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
System.out.print("值:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\n");
}
}
}
}
6.2 MapReduce
首先配置 HBase、Hadooop 的环境变量
然后配置 hadoop-env.sh,添加如下信息:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
6.2.1 将 HDFS 中的数据写入到 HBase 表中
ReadFileMapper
package com.djm.mr1.mapper;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ReadFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException {
String rowKey = "";
String[] values = line.toString().split(",");
rowKey = values[0];
byte[] bs = Bytes.toBytes(rowKey);
Put put = new Put(bs);
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(values[1]));
context.write(new ImmutableBytesWritable(bs), put);
}
}
InsertDataReduce
package com.djm.mr1.reduce;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
public class InsertDataReduce extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
for (Put put : values) {
context.write(NullWritable.get(), put);
}
}
}
File2HBaseTool
package com.djm.mr1.tool;
import com.djm.mr1.mapper.ReadFileMapper;
import com.djm.mr1.reduce.InsertDataReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
public class File2HBaseTool implements Tool {
public int run(String[] args) throws Exception {
Job job = Job.getInstance();
job.setJarByClass(File2HBaseTool.class);
// format
Path path = new Path("hdfs://hadoop102:9000/data/student.csv");
FileInputFormat.addInputPath(job, path);
// mapper
job.setMapperClass(ReadFileMapper.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
// reduce
TableMapReduceUtil.initTableReducerJob("user", InsertDataReduce.class, job);
// 执行
boolean wait = job.waitForCompletion(true);
return wait ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
}
public void setConf(Configuration conf) {
}
public Configuration getConf() {
return null;
}
}
File2HBaseApplication
package com.djm.mr1;
import com.djm.mr1.tool.File2HBaseTool;
import org.apache.hadoop.util.ToolRunner;
public class File2HBaseApplication {
public static void main(String[] args) throws Exception {
ToolRunner.run(new File2HBaseTool(), args);
}
}
6.2.1 将 HBase 表中的数据写入到 MySQL 表中
CacheData
package com.djm.hbase.bean;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CacheData implements WritableComparable<CacheData> {
private String name;
private int count;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public int compareTo(CacheData o) {
return name.compareTo(o.name);
}
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(count);
}
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
count = in.readInt();
}
}
ScanHBaseMapper
package com.djm.hbase.mapper;
import com.djm.hbase.bean.CacheData;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import java.io.IOException;
public class ScanHBaseMapper extends TableMapper<Text, CacheData> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
for (Cell cell : value.rawCells()) {
String name = Bytes.toString(CellUtil.cloneValue(cell));
CacheData data = new CacheData();
data.setName(name);
data.setCount(1);
context.write(new Text(name), data);
}
}
}
HBase2MysqlReducer
package com.djm.hbase.reducer;
import com.djm.hbase.bean.CacheData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class HBase2MysqlReducer extends Reducer<Text, CacheData, Text, CacheData> {
@Override
protected void reduce(Text key, Iterable<CacheData> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (CacheData data : values) {
sum = sum + data.getCount();
}
CacheData sumData = new CacheData();
sumData.setName(key.toString());
sumData.setCount(sum);
context.write(key, sumData);
}
}
MysqlOutputFormat
package com.djm.hbase.output;
import com.djm.hbase.bean.CacheData;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class MysqlOutputFormat extends OutputFormat<Text, CacheData> {
class MysqlRecordWriter extends RecordWriter<Text, CacheData> {
private static final String MYSQL_DRIVE_CLASS = "com.mysql.jdbc.Driver";
private static final String MYSQL_URL = "jdbc:mysql://hadoop102:3306/company?useUnicode=true&characterEncoding=UTF-8";
private static final String MYSQL_USERNAME = "root";
private static final String MYSQL_PASSWORD = "123456";
private Connection connection;
public MysqlRecordWriter() {
try {
Class.forName(MYSQL_DRIVE_CLASS);
connection = DriverManager.getConnection(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
}
public void write(Text key, CacheData data) throws IOException, InterruptedException {
String sql = "insert into statresult (name, sumcnt) values(?, ?)";
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setObject(1, key.toString());
preparedStatement.setObject(2, data.getCount());
preparedStatement.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
public RecordWriter<Text, CacheData> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
return new MysqlRecordWriter();
}
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
}
private FileOutputCommitter committer = null;
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new FileOutputCommitter(output, context);
}
return committer;
}
public static Path getOutputPath(JobContext job) {
String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
return name == null ? null : new Path(name);
}
}
HBaseMysqlTool
package com.djm.hbase.tool;
import com.djm.hbase.output.MysqlOutputFormat;
import com.djm.hbase.bean.CacheData;
import com.djm.hbase.mapper.ScanHBaseMapper;
import com.djm.hbase.reducer.HBase2MysqlReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;
public class HBaseMysqlTool implements Tool {
public int run(String[] args) throws Exception {
Job job = Job.getInstance();
job.setJarByClass(HBaseMysqlTool.class);
TableMapReduceUtil.initTableMapperJob("student", new Scan(), ScanHBaseMapper.class, Text.class, CacheData.class, job);
job.setReducerClass(HBase2MysqlReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CacheData.class);
job.setOutputFormatClass(MysqlOutputFormat.class);
boolean wait = job.waitForCompletion(true);
return wait ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
}
public void setConf(Configuration conf) {
}
public Configuration getConf() {
return null;
}
}
HBase2MysqlApplication
package com.djm.hbase;
import com.djm.hbase.tool.HBaseMysqlTool;
import org.apache.hadoop.util.ToolRunner;
public class HBase2MysqlApplication {
public static void main(String[] args) throws Exception {
ToolRunner.run(new HBaseMysqlTool(), args);
}
}
6.2.3 协处理器
package com.djm.hbase;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import java.io.IOException;
/**
* 协处理器
*/
public class InsertStudentCoprocesser extends BaseRegionObserver {
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
Table table = e.getEnvironment().getTable(TableName.valueOf("student"));
table.put(put);
table.close();
}
}
然后将表删除,重新创建表,在创建表时添加上协处理器即可。
6.3 集成 Hive
6.3.1 HBase & Hive
Hive
-
Hive 的本质其实就相当于将 HDFS 中已经存储的文件在 MySQL 中做了一个双射关系,以方便使用 HQL 去管理查询
-
Hive 适用于离线的数据分析和清洗,延迟较高
-
Hive 存储的数据依旧在 DataNode 上,编写的 HQL 语句终将是转换为 MapReduce 执行
HBase
- 是一种面向列存储的非关系型数据库
- 适用于单表非关系型数据的存储,不适合做关联查询,类似 JOIN 等操作
- 数据持久化存储的体现形式是 HFile,存放于 DataNode 中,被 ResionServer 以 Region 的形式进行管理
- 面对大量的企业数据,HBase 可以直线单表大量数据的存储,同时提供了高效的数据访问速度
6.3.2 HBase 与 Hive 集成使用
1、重新编译 hive-hbase-handler-1.2.2.jar
2、拷贝 jar 包
export HBASE_HOME=/opt/module/hbase
export HIVE_HOME=/opt/module/hive
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
编辑 hive-site.xml,增加如下信息:
<property>
<name>hive.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop104</value>
<description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
<description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase 表
1、在 Hive 中创建表同时关联 HBase
CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
2、在 Hive 中创建临时中间表,用于 load 文件中的数据
CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';
3、向 Hive 中间表中 load 数据
hive> load data local inpath '/home/admin/softwares/data/emp.txt' into tab
4、通过 insert 命令将中间表中的数据导入到 Hive 关联 HBase 的那张表中
hive> select * from hive_hbase_emp_table;
在 HBase 中已经存储了某一张表 hbase_emp_table,然后在 Hive 中创建一个外部表来关联 HBase 中的hbase_emp_table 这张表,使之可以借助 Hive 来分析 HBase 这张表中的数据
1、在 Hive 中创建外部表
CREATE EXTERNAL TABLE relevance_hbase_emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
网友评论