Reference: http://hbase.apache.org/book.html#mapreduce
Imports needed:
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
Three components
Create the Mapper class
Create the Reducer class
Create the Driver
Create the Mapper class
Inside the map method, the logic goes as follows:
- Get the rowkey: first obtain the rowkey used to build the Put object, via String rowkey = Bytes.toString(key.get()); here key is the map method's input parameter of type ImmutableBytesWritable, and when a MapReduce job reads an HBase table, this key is the rowkey of the current row.
- Create a Put object: Put put = new Put(key.get()); then iterate over the cells returned by value.rawCells() and assemble the Put with put.add(cell). If only specific columns should be copied, check that the column family matches and that the name and age columns exist before adding them (see the sketch after the code below).
- Finally call context.write(mapOutputKey, put).
Code snippet inside the class:
// Read data from the 'user' table; ImmutableBytesWritable: the key (rowkey), Put: the row's data
public static class ReadUserMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable row, Result value,
            Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
            throws IOException, InterruptedException {
        // convert the Result of the current row into a Put and emit it
        context.write(row, resultToPut(row, value));
    }

    // Equivalent to the shell command: put 'user','10001','info:address','shanghai'
    private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
        Put put = new Put(key.get());
        for (KeyValue kv : result.raw()) {
            put.add(kv);
        }
        return put;
    }
}
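The resultToPut above copies every cell of the row. If, as described earlier, only certain columns such as info:name and info:age should be carried over, a filtered variant could look like the following sketch (the info family and the name/age qualifiers are assumptions based on this example's table layout; it also needs org.apache.hadoop.hbase.util.Bytes):

private static Put resultToFilteredPut(ImmutableBytesWritable key, Result result) throws IOException {
    Put put = new Put(key.get());
    for (KeyValue kv : result.raw()) {
        // keep the cell only if it belongs to the assumed 'info' column family
        if (!"info".equals(Bytes.toString(kv.getFamily()))) {
            continue;
        }
        String qualifier = Bytes.toString(kv.getQualifier());
        // keep only the assumed 'name' and 'age' columns
        if ("name".equals(qualifier) || "age".equals(qualifier)) {
            put.add(kv);
        }
    }
    return put;
}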
Create the Reducer class
- Iterate over the Iterable<Put> values
- Write each Put with context.write(); TableOutputFormat ignores the key, so context.write(key, put) and context.write(null, put) behave the same
- The code inside the class is as follows:
public static class WriteBasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
            Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
        for (Put put : values) {
            context.write(key, put);
        }
    }
}
Create the Driver
- Import the required classes
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
- Create a Job
Job job = Job.getInstance(getConf(), this.getClass().getName());
- Set the class used to locate the job's jar
job.setJarByClass(this.getClass());
- Configure the Job
- Create a Scan object
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
The Scan here is a full-table scan. setCaching controls how many rows are fetched per RPC (the default of 1 is far too low for MapReduce jobs), and setCacheBlocks controls block caching, which should never be enabled for MapReduce jobs, so it is set to false.
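If only part of the table needs to be read, the same Scan object can also restrict the column family or columns before it is passed to the job, for example (the info family and name column are assumptions based on this example's table):

scan.addFamily(Bytes.toBytes("info"));                           // read only the 'info' family
// scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // or a single column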
- Set the input and the mapper
Use TableMapReduceUtil.initTableMapperJob() to set the input table, the Scan, and the mapper.
TableMapReduceUtil.initTableMapperJob(
"user", // input HBase table name
scan, // Scan instance to control CF and attribute selection
ReadUserMapper.class, // mapper
ImmutableBytesWritable.class, // mapper output key
Put.class, // mapper output value
job);
- Set the reducer and the output: use TableMapReduceUtil.initTableReducerJob() to set the output table, the reducer class, and the related job settings.
TableMapReduceUtil.initTableReducerJob(
"basic", // output table
WriteBasicReducer.class, // reducer class
job);
- Set the number of reduce tasks to 0, making this a map-only job
job.setNumReduceTasks(0);
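Note that with zero reduce tasks WriteBasicReducer is never actually invoked; the Put objects emitted by the mapper go straight to the output table. If that is the intent, the reducer class can also be left out entirely, as in this sketch:

// Map-only alternative: no reducer class is needed when reduce tasks are 0
TableMapReduceUtil.initTableReducerJob(
    "basic",   // output table
    null,      // no reducer; Puts from the mapper are written directly
    job);
job.setNumReduceTasks(0);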
- Submit the job
boolean b = job.waitForCompletion(true);
- The complete run() method looks like this:
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), this.getClass().getName());
    job.setJarByClass(this.getClass());

    Scan scan = new Scan();
    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false);  // don't set to true for MR jobs

    TableMapReduceUtil.initTableMapperJob(
        "user",                        // input HBase table name
        scan,                          // Scan instance to control CF and attribute selection
        ReadUserMapper.class,          // mapper
        ImmutableBytesWritable.class,  // mapper output key
        Put.class,                     // mapper output value
        job);

    TableMapReduceUtil.initTableReducerJob(
        "basic",                  // output table
        WriteBasicReducer.class,  // reducer class
        job);

    job.setNumReduceTasks(0);

    boolean b = job.waitForCompletion(true);
    return b ? 0 : 1;
}
Write the main method
- Obtain a Configuration object
Configuration configuration = HBaseConfiguration.create();
- Run the Job through ToolRunner
int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args);
- Exit with the job's status code
System.exit(status);
- The main method looks like this:
public static void main(String[] args) throws Exception {
    Configuration configuration = HBaseConfiguration.create();
    int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args);
    System.exit(status);
}
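Putting the pieces together, the driver class extends Configured and implements Tool, which is why the Configured and Tool imports listed at the top are needed. A minimal skeleton might look like this (the class name User2BasicMapReduce is taken from the main method above; the mapper, reducer, and run() bodies are the ones shown earlier):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Driver skeleton: Configured supplies getConf(), Tool supplies run(String[])
public class User2BasicMapReduce extends Configured implements Tool {

    // ReadUserMapper and WriteBasicReducer from the sections above go here as nested classes

    @Override
    public int run(String[] args) throws Exception {
        // job setup shown in the run() method above
        return 0;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args);
        System.exit(status);
    }
}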
Verification
- Export the project as a jar
- Upload the jar to a directory on the Linux system (for example with FileZilla)
- Run the jar with the following commands to verify that the MapReduce program works correctly:
export HBASE_HOME=/opt/sofewares/hbase/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/cdh5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar
- After the job completes successfully, check the data in the basic table; if the rows copied from the user table are present, the whole process is working correctly.
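A quick way to check is from the HBase shell (basic is the output table name used above):

$HBASE_HOME/bin/hbase shell
hbase> scan 'basic'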