美文网首页
HBase从入门到精通9:基于HBase的MapReduce实战

HBase从入门到精通9:基于HBase的MapReduce实战

作者: 金字塔下的小蜗牛 | 来源:发表于2020-04-02 09:49 被阅读0次

本节介绍一下如何使用HBase上的数据源开发一个MapReduce程序:这里以WordCount程序为例。

0.搭建本地开发环境

(0)将HBase的主机名加入本地Hosts

编辑C:\Windows\System32\drivers\etc\hosts文件,加入下面的内容:

192.168.126.110 bigdata

(1)下载依赖的HBase Jar包

使用WinSCP工具下载$HBASE_HOME/lib目录下的所有Jar包至本地目录如E:/hbaselibs中。

(2)下载依赖的MapReduce Jar包

使用WinSCP将$HADOOP_HOME/share/hadoop/common/lib和$HADOOP_HOME/share/hadoop/mapreduce/lib目录下的所有Jar包下载至本地目录如E:/mapreducelibs中。

(3)新建HBaseWordCount工程

打开Eclipse IDE,依次选择”File”->”New”->”Java Project”,工程名字填写HBaseWordCount,”Finsh”。

(4)给工程添加依赖包

在HBaseWordCount工程上右键单击,依次选择”New”->”Folder”,文件夹名字填写”libs”,”Finish”。将E:/hbaselibs和E:/mapreducelibs目录中的所有Jar包复制、粘贴到工程下面的libs文件夹中。展开libs文件夹,选中所有Jar包,右键,依次选择”Build Path”->”Add to Build Path”即可。

(5)创建Demo Package

在HBaseWordCount工程下面的src文件上右键,依次选择”New”->”Package”,Package名字填写”Demo”,”Finish”。

(6)新建三个Java Class

右键Demo,依次选择”New”->”Class”,类名填写”WordCountMapper”,”Finish”。
右键Demo,依次选择”New”->”Class”,类名填写”WordCountReducer”,”Finish”。
右键Demo,依次选择”New”->”Class”,类名填写”WordCountMain”,”Finish”。

这三个类的代码如下所示。

1.编写Mapper程序

HBase WordCount的Mapper程序WordCountMapper.java的内容如下:

package Demo; 
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

//TableMapper<K2, V2>:输入的是表中的记录,没有<K1,V1>
public class WordCountMapper extends TableMapper<Text, LongWritable> {
    @Override
    //输入的就是表中的一条记录
    protected void map(ImmutableBytesWritable key, //key : 记录的rowkey
                       Result value,               //value: 输入的记录
                   Context context)            //MapReduce上下文
        throws IOException, InterruptedException {
    //取出指定列下的数据
    String str = Bytes.toString(value.getValue(Bytes.toBytes("Content"), Bytes.toBytes("line")));
    //按空格分词
    String[] words = str.split(" ");
    //输出(word, 1)
    for(String w: words){
        context.write(new Text(w), new LongWritable(1));
    }
    }
}

2.编写Reducer程序

HBase WordCount的Reducer程序WordCountReducer.java的内容如下:

package Demo;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

//TableReducer<K3,V3,Reduce的输出:一条记录>
public class WordCountReducer extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {
    @Override
    //输入的就是表中的一条记录
    protected void reduce(Text k3, Iterable<LongWritable> v3, Context context)
        throws IOException, InterruptedException {
    // 求和
    long total = 0;
    for(LongWritable l:v3){
        total = total + l.get();
    }
    //构造Put对象: 使用单词k3作为rowkey
    Put put = new Put(Bytes.toBytes(k3.toString()));
    //加载数据
    put.addColumn(Bytes.toBytes("Content"), Bytes.toBytes("result"), Bytes.toBytes(String.valueOf(total)));
    //输出到HBase
    context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())),  //插入数据的时候,rowkey是多少
              put);//数据
    }
}

3.编写主程序

HBase WordCount的主程序WordCountMain.java的内容如下:

package Demo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class WordCountMain {
    public static void main(String[] args) throws Exception {
        //创建配置信息
    Configuration conf = new Configuration();
    //指定ZooKeeper的地址
    conf.set("hbase.zookeeper.quorum", "192.168.126.110");
    //创建Job
    Job job = Job.getInstance(conf);
    //设置主类
    job.setJarByClass(WordCountMain.class);
    //定义扫描器
    Scan scan = new Scan();
    //指定要读取的列
    scan.addColumn(Bytes.toBytes("Content"), Bytes.toBytes("line"));
    //指定任务的Mapper
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("tblArticle"),  //输入表
                                      scan,                         //扫描器
                                      WordCountMapper.class,        //Mapper类
                                      Text.class,                   //K2类型
                                      LongWritable.class,           //V2类型
                                      job );                        //Job
    //指定任务的Reducer
    TableMapReduceUtil.initTableReducerJob("tblResult",                 //输出表
                                       WordCountReducer.class,      //Reducer类
                                       job);                        //Job
    //执行Job
    job.waitForCompletion(true);
    }
}

4.将程序打包并上传

(1)在Demo上右键,依次选择Export,JAR file,Next,选择到处位置:E:\HBaseWordCount.jar,Next,Next,指定Main Class:Demo.WordCountMain,Finish。

(2)使用WinSCP将E:\HBaseWordCount.jar上传至服务器bigdata的/root/input目录下。

5.准备HBase上的数据源

(1)创建输入表tblArticle

[root@master ~]# hbase shell
hbase(main):001:0> create 'tblArticle','Content'
hbase(main):002:0> put 'tblArticle','1','Content':line','I love Beijing'
hbase(main):003:0> put 'tblArticle','2','Content':line','I love China'
hbase(main):004:0> put 'tblArticle','3','Content':line','Beijing is the capital of China'
hbase(main):005:0> scan 'tblArticle'
ROW COLUMN+CELL
1 column=Content:line, timestamp=1534172325722, value=I love Beijing
2 column=Content:line, timestamp=1534172332586, value=I love China
3 column=Content:line, timestamp=1534172338304, value=Beijing is the capital of China

(2)创建输出表tblResult

hbase(main):006:0> create 'tblResult','Content'
hbase(main):007:0> scan 'tblResult'
ROW COLUMN+CELL
0 row(s)
Took 0.5658 seconds

6.执行HBase WordCount程序

进入/root/input目录下,执行HBaseWordCount.jar程序:

[root@bigdata ~]# cd /root/input
[root@bigdata input]# hadoop jar HBaseWordCount.jar

查看结果:

hbase(main):011:0> scan 'tblResult'
ROW COLUMN+CELL
Beijing column=Content:result, timestamp=1534173299148, value=2
China column=Content:result, timestamp=1534173299148, value=2
I column=Content:result, timestamp=1534173299148, value=2
capital column=Content:result, timestamp=1534173299148, value=1
is column=Content:result, timestamp=1534173299148, value=1
love column=Content:result, timestamp=1534173299148, value=2
of column=Content:result, timestamp=1534173299148, value=1
the column=Content:result, timestamp=1534173299148, value=1
8 row(s)
Took 0.0196 seconds

相关文章

网友评论

      本文标题:HBase从入门到精通9:基于HBase的MapReduce实战

      本文链接:https://www.haomeiwen.com/subject/ssocdhtx.html