美文网首页我爱编程
大数据之HBase MapReduce的实例分析

大数据之HBase MapReduce的实例分析

作者: 栀子花_ef39 | 来源:发表于2018-07-31 15:43 被阅读121次

    跟Hadoop的无缝集成使得使用MapReduce对HBase的数据进行分布式计算非常方便,本文将以前面的blog示例,介绍HBase下MapReduce开发要点。很好理解本文前提是你对Hadoop MapReduce有一定的了解。

    HBase MapReduce核心类介绍

    首先一起来回顾下MapReduce的基本编程模型,

    可以看到最基本的是通过Mapper和Reducer来处理KV对,Mapper的输出经Shuffle及Sort后变为Reducer的输入。除了Mapper和Reducer外,另外两个重要的概念是InputFormat和OutputFormat,定义了Map-Reduce的输入和输出相关的东西。HBase通过对这些类的扩展(继承)来方便MapReduce任务来读写HTable中的数据。

    实例分析

    我们还是以最初的blog例子来进行示例分析,业务需求是这样:找到具有相同兴趣的人,我们简单定义为如果author之间article的tag相同,则认为两者有相同兴趣,将分析结果保存到HBase。除了上面介绍的blog表外,我们新增一张表tag_friend,RowKey为tag,Value为authors,大概就下面这样。

    我们省略了一些跟分析无关的Column数据,上面的数据按前面描述的业务需求经过MapReduce分析,应该得到下面的结果

    实际的运算过程分析如下

    代码实现

    有了上面的分析,代码实现就比较简单了。只需以下几步

    定义Mapper类继承TableMapper,map的输入输出KV跟上面的分析一致。public static class Mapper extends TableMapper {

    public Mapper() {}

    @Override

    public void map(ImmutableBytesWritable row, Result values,Context context) throws IOException {

    ImmutableBytesWritable value = null;

    String[] tags = null;

    for (KeyValue kv : values.list()) {

    if ("author".equals(Bytes.toString(kv.getFamily()))

    && "nickname".equals(Bytes.toString(kv.getQualifier()))) {

    value = new ImmutableBytesWritable(kv.getValue());

    }

    if ("article".equals(Bytes.toString(kv.getFamily()))

    && "tags".equals(Bytes.toString(kv.getQualifier()))) {

    tags = Bytes.toString(kv.getValue()).split(",");

    }

    }

    for (int i = 0; i < tags.length; i++) {

    ImmutableBytesWritable key = new ImmutableBytesWritable(

    Bytes.toBytes(tags[i].toLowerCase()));

    try {

    context.write(key,value);

    } catch (InterruptedException e) {

    throw new IOException(e);

    }

    }

    }

    }

    复制代码

    定义Reducer类继承TableReducer,reduce的输入输出KV跟上面分析的一致。public static class Reducer extends TableReducer {

    @Override

    public void reduce(ImmutableBytesWritable key,Iterable values,

    Context context) throws IOException, InterruptedException {

    String friends="";

    for (ImmutableBytesWritable val : values) {

    friends += (friends.length()>0?",":"")+Bytes.toString(val.get());

    }

    Put put = new Put(key.get());

    put.add(Bytes.toBytes("person"), Bytes.toBytes("nicknames"),

    Bytes.toBytes(friends));

    context.write(key, put);

    }

    }

    复制代码

    在提交作业时设置inputFormat为TableInputFormat,设置outputFormat为TableOutputFormat,可以借助TableMapReduceUtil类来简化编码。public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    conf = HBaseConfiguration.create(conf);

    Job job = new Job(conf, "HBase_FindFriend");

    job.setJarByClass(FindFriend.class);

    Scan scan = new Scan();

    scan.addColumn(Bytes.toBytes("author"),Bytes.toBytes("nickname"));

    scan.addColumn(Bytes.toBytes("article"),Bytes.toBytes("tags"));

    TableMapReduceUtil.initTableMapperJob("blog", scan,FindFriend.Mapper.class,

    ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

    TableMapReduceUtil.initTableReducerJob("tag_friend",FindFriend.Reducer.class, job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

    复制代码

    对大数据感兴趣的朋友可以加我的群  615997810  一起交流学习,还有免费资料可以领取 1,_推荐系统理论与实战项目 Part2

    2,推荐系统理论与实战 项目Part1

    3.实时交易监控系统项目(下)

    4,实时交易监控系统项目(上)

    5,用户行为分析系统项目

    6,分布式全文搜索引擎ElasticSearch Part2

    7,大数据批处理之HIVE详解

    8,ES公开课 part1

    9,spark_streaming_

    10,数据仓库搭建详解

    11,大数据任务调度

    12,流数据集成神器Kafka

    13,Spark 公开课

    14,海量日志收集利器:Flume

    15,Impala简介

    16,Hive简介

    17,MapReduce简介

    18海量数据高速存取数据库 HBase

    19,浅谈Hadoop管理器yarn原理

    小结

    本文通过实例分析演示了使用MapReduce分析HBase的数据,需要注意的这只是一种常规的方式(分析表中的数据存到另外的表中),实际上不局限于此,不过其他方式跟此类似。如果你进行到这里,你肯定想要马上运行它看看结果,希望大家多多关注哦。

    相关文章

      网友评论

        本文标题:大数据之HBase MapReduce的实例分析

        本文链接:https://www.haomeiwen.com/subject/ezrhvftx.html