美文网首页大数据精进之路我爱编程
hbase observer实现value自动累加求和

hbase observer实现value自动累加求和

作者: CarsonCao | 来源:发表于2017-04-28 18:45 被阅读601次

    hbase有两种Coprocessor:endpoint和observer。endpoint类似于存储过程,可以在hbase上实现一个类似于mapReduce的过程;observer实现起来比较简单,类似于触发器。具体架构和理论在这里就不再多说,直接上代码。下面的代码实现了一个数据自动累加的功能:相同key的数据,每插入一条,counts列就会在原有值的基础上自动累加本次写入的值(例如每次写入1,counts就自动加1)。

    package com.open01.hbase.coprocessor;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.DoNotRetryIOException;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.IOException;
    import java.util.List;
    
    /**
     * Created by caolch on 2017/4/7.
     *
     * <p>Region observer that turns every put to one configured column into a running total:
     * before the put is applied, the value currently stored in that column for the same row is
     * read and added to the value being written, and the sum is written instead.
     *
     * <p>The target column is supplied through the coprocessor arguments {@code family} and
     * {@code col}. Values are stored as decimal strings (e.g. {@code "42"}), not as binary ints.
     *
     * <p>NOTE(review): the read and the subsequent write are two separate steps, so two
     * concurrent puts to the same row can both read the same stored value and one increment
     * can be lost. No locking is added here; confirm whether callers need that guarantee.
     */
    public class PrePutSumObserver extends BaseRegionObserver {

        /** Column family of the counter column, taken from the {@code family} argument. */
        private byte[] family;
        /** Qualifier of the counter column, taken from the {@code col} argument. */
        private byte[] col;

        /**
         * Reads the counter column coordinates from the coprocessor configuration.
         *
         * @param e environment whose configuration carries the {@code family} and {@code col} arguments
         * @throws IOException if either argument is missing
         */
        @Override
        public void start(CoprocessorEnvironment e) throws IOException {
            Configuration conf = e.getConfiguration();
            family = Bytes.toBytes(requireSetting(conf, "family"));
            col = Bytes.toBytes(requireSetting(conf, "col"));
        }

        /** Delegates to the base class; this observer holds no resources of its own. */
        @Override
        public void stop(CoprocessorEnvironment e) throws IOException {
            super.stop(e);
        }

        /**
         * Adds the stored counter value to the incoming one and appends the sum to the put.
         * Puts that do not touch the counter column pass through unchanged.
         *
         * @param e          observer context giving access to the region being written
         * @param put        the mutation about to be applied; a cell holding the sum is added to it
         * @param edit       WAL edit for the mutation (not used)
         * @param durability durability of the mutation (not used)
         * @throws IOException if the region read fails, or (as {@link DoNotRetryIOException})
         *                     if the stored or incoming value is not a decimal integer
         */
        @Override
        public void prePut(ObserverContext<RegionCoprocessorEnvironment> e,
                           Put put, WALEdit edit, Durability durability) throws IOException {
            if (!put.has(family, col)) {
                return;
            }

            // Only the counter column is needed, so do not fetch the rest of the row.
            Get get = new Get(put.getRow());
            get.addColumn(family, col);
            Result stored = e.getEnvironment().getRegion().get(get);
            byte[] storedValue = stored.getValue(family, col);
            int oriCounts = storedValue == null ? 0 : parseCount(storedValue, "stored");

            // If the put carries several cells for the column, the last one is used as the increment.
            int incrCounts = 0;
            for (Cell cell : put.get(family, col)) {
                incrCounts = parseCount(CellUtil.cloneValue(cell), "incoming");
            }

            // NOTE(review): the client-supplied cell stays in the put next to this one; the result
            // relies on the cell added last taking precedence - confirm for the HBase version in use.
            put.addColumn(family, col, Bytes.toBytes(String.valueOf(oriCounts + incrCounts)));
        }

        /** Returns the configured value for {@code key}, failing with a clear message if absent. */
        private static String requireSetting(Configuration conf, String key) throws IOException {
            String value = conf.get(key);
            if (value == null) {
                throw new IOException("PrePutSumObserver requires the coprocessor argument '"
                        + key + "' (expected arguments: family=<cf>,col=<qualifier>)");
            }
            return value;
        }

        /**
         * Parses a cell value holding a decimal string. A malformed value is reported as a
         * {@link DoNotRetryIOException} rather than an unchecked exception, so that it is
         * surfaced to the caller as an I/O failure that should not be retried.
         */
        private static int parseCount(byte[] value, String origin) throws IOException {
            String text = Bytes.toString(value);
            try {
                return Integer.parseInt(text);
            } catch (NumberFormatException nfe) {
                throw new DoNotRetryIOException(
                        "Non-integer " + origin + " value in counter column: '" + text + "'", nfe);
            }
        }
    }
    
    

    pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build for the observer jar; the build produces hbase-observer-1.0-SNAPSHOT.jar. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>hbase-observer</groupId>
        <artifactId>hbase-observer</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <!-- Both HBase artifacts are pinned to the same version (1.1.9). -->
        <!-- NOTE(review): presumably this should match the version running on the cluster,
             and `provided` scope may be appropriate since the region server already ships
             these classes - confirm against the deployment. -->
        <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.1.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.1.9</version>
        </dependency>
        </dependencies>
    </project>
    

    打好包,把jar包上传到hdfs目录,然后在hbase shell中执行alter命令,修改要添加observer的table:

    #!/bin/bash
    
    # Attaches the PrePutSumObserver coprocessor to table 'observer_test' via the HBase shell.
    # The table is disabled before the alter and re-enabled afterwards.
    # The coprocessor attribute is '|'-separated:
    #   jar location on HDFS | observer class name | 1001 | comma-separated key=value arguments
    # (1001 is presumably the coprocessor priority - confirm against the HBase documentation.)
    # The arguments family=f1,col=counts select the column the observer accumulates into.
    hbase shell <<EOF
    disable 'observer_test'
    alter 'observer_test', METHOD => 'table_att', 'Coprocessor'=>'hdfs://open009:9000/user/root/observerJar/hbase-observer-1.0-SNAPSHOT.jar|com.open01.hbase.coprocessor.PrePutSumObserver|1001|family=f1,col=counts'
    enable 'observer_test'
    EOF
    
    

    相关文章

      网友评论

        本文标题:hbase observer实现value自动累加求和

        本文链接:https://www.haomeiwen.com/subject/qejazttx.html