本节介绍一下如何使用HBase上的数据源开发一个MapReduce程序:这里以WordCount程序为例。
0.搭建本地开发环境
(0)将HBase的主机名加入本地Hosts
编辑C:\Windows\System32\drivers\etc\hosts文件,加入下面的内容:
192.168.126.110 bigdata
(1)下载依赖的HBase Jar包
使用WinSCP工具下载$HBASE_HOME/lib目录下的所有Jar包至本地目录如E:/hbaselibs中。
(2)下载依赖的MapReduce Jar包
使用WinSCP将$HADOOP_HOME/share/hadoop/common/lib和$HADOOP_HOME/share/hadoop/mapreduce/lib目录下的所有Jar包下载至本地目录如E:/mapreducelibs中。
(3)新建HBaseWordCount工程
打开Eclipse IDE,依次选择”File”->”New”->”Java Project”,工程名字填写HBaseWordCount,”Finsh”。
(4)给工程添加依赖包
在HBaseWordCount工程上右键单击,依次选择”New”->”Folder”,文件夹名字填写”libs”,”Finish”。将E:/hbaselibs和E:/mapreducelibs目录中的所有Jar包复制、粘贴到工程下面的libs文件夹中。展开libs文件夹,选中所有Jar包,右键,依次选择”Build Path”->”Add to Build Path”即可。
(5)创建Demo Package
在HBaseWordCount工程下面的src文件上右键,依次选择”New”->”Package”,Package名字填写”Demo”,”Finish”。
(6)新建三个Java Class
右键Demo,依次选择”New”->”Class”,类名填写”WordCountMapper”,”Finish”。
右键Demo,依次选择”New”->”Class”,类名填写”WordCountReducer”,”Finish”。
右键Demo,依次选择”New”->”Class”,类名填写”WordCountMain”,”Finish”。
这三个类的代码如下所示。
1.编写Mapper程序
HBase WordCount的Mapper程序WordCountMapper.java的内容如下:
package Demo;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
//TableMapper<K2, V2>:输入的是表中的记录,没有<K1,V1>
public class WordCountMapper extends TableMapper<Text, LongWritable> {
@Override
//输入的就是表中的一条记录
protected void map(ImmutableBytesWritable key, //key : 记录的rowkey
Result value, //value: 输入的记录
Context context) //MapReduce上下文
throws IOException, InterruptedException {
//取出指定列下的数据
String str = Bytes.toString(value.getValue(Bytes.toBytes("Content"), Bytes.toBytes("line")));
//按空格分词
String[] words = str.split(" ");
//输出(word, 1)
for(String w: words){
context.write(new Text(w), new LongWritable(1));
}
}
}
2.编写Reducer程序
HBase WordCount的Reducer程序WordCountReducer.java的内容如下:
package Demo;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
//TableReducer<K3,V3,Reduce的输出:一条记录>
public class WordCountReducer extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {
@Override
//输入的就是表中的一条记录
protected void reduce(Text k3, Iterable<LongWritable> v3, Context context)
throws IOException, InterruptedException {
// 求和
long total = 0;
for(LongWritable l:v3){
total = total + l.get();
}
//构造Put对象: 使用单词k3作为rowkey
Put put = new Put(Bytes.toBytes(k3.toString()));
//加载数据
put.addColumn(Bytes.toBytes("Content"), Bytes.toBytes("result"), Bytes.toBytes(String.valueOf(total)));
//输出到HBase
context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())), //插入数据的时候,rowkey是多少
put);//数据
}
}
3.编写主程序
HBase WordCount的主程序WordCountMain.java的内容如下:
package Demo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class WordCountMain {
public static void main(String[] args) throws Exception {
//创建配置信息
Configuration conf = new Configuration();
//指定ZooKeeper的地址
conf.set("hbase.zookeeper.quorum", "192.168.126.110");
//创建Job
Job job = Job.getInstance(conf);
//设置主类
job.setJarByClass(WordCountMain.class);
//定义扫描器
Scan scan = new Scan();
//指定要读取的列
scan.addColumn(Bytes.toBytes("Content"), Bytes.toBytes("line"));
//指定任务的Mapper
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("tblArticle"), //输入表
scan, //扫描器
WordCountMapper.class, //Mapper类
Text.class, //K2类型
LongWritable.class, //V2类型
job ); //Job
//指定任务的Reducer
TableMapReduceUtil.initTableReducerJob("tblResult", //输出表
WordCountReducer.class, //Reducer类
job); //Job
//执行Job
job.waitForCompletion(true);
}
}
4.将程序打包并上传
(1)在Demo上右键,依次选择Export,JAR file,Next,选择到处位置:E:\HBaseWordCount.jar,Next,Next,指定Main Class:Demo.WordCountMain,Finish。
(2)使用WinSCP将E:\HBaseWordCount.jar上传至服务器bigdata的/root/input目录下。
5.准备HBase上的数据源
(1)创建输入表tblArticle
[root@master ~]# hbase shell
hbase(main):001:0> create 'tblArticle','Content'
hbase(main):002:0> put 'tblArticle','1','Content':line','I love Beijing'
hbase(main):003:0> put 'tblArticle','2','Content':line','I love China'
hbase(main):004:0> put 'tblArticle','3','Content':line','Beijing is the capital of China'
hbase(main):005:0> scan 'tblArticle'
ROW COLUMN+CELL
1 column=Content:line, timestamp=1534172325722, value=I love Beijing
2 column=Content:line, timestamp=1534172332586, value=I love China
3 column=Content:line, timestamp=1534172338304, value=Beijing is the capital of China
(2)创建输出表tblResult
hbase(main):006:0> create 'tblResult','Content'
hbase(main):007:0> scan 'tblResult'
ROW COLUMN+CELL
0 row(s)
Took 0.5658 seconds
6.执行HBase WordCount程序
进入/root/input目录下,执行HBaseWordCount.jar程序:
[root@bigdata ~]# cd /root/input
[root@bigdata input]# hadoop jar HBaseWordCount.jar
查看结果:
hbase(main):011:0> scan 'tblResult'
ROW COLUMN+CELL
Beijing column=Content:result, timestamp=1534173299148, value=2
China column=Content:result, timestamp=1534173299148, value=2
I column=Content:result, timestamp=1534173299148, value=2
capital column=Content:result, timestamp=1534173299148, value=1
is column=Content:result, timestamp=1534173299148, value=1
love column=Content:result, timestamp=1534173299148, value=2
of column=Content:result, timestamp=1534173299148, value=1
the column=Content:result, timestamp=1534173299148, value=1
8 row(s)
Took 0.0196 seconds
网友评论