美文网首页大数据Spark学习笔记
6.Spark学习(Python版本):读写HBase数据库

6.Spark学习(Python版本):读写HBase数据库

作者: 马淑 | 来源:发表于2018-08-11 20:08 被阅读44次
    Step1. 创建一个HBase表
    /usr/local/hadoop目录下启动hadoop:./sbin/start-dfs.sh
    /usr/local/hbase目录下启动hbase:./bin/start-hbase.sh
    /usr/local/hbase目录下启动hbase shell:./bin/hbase shell

    在HBase数据库中,不需要创建数据库,只要直接创建表就可以。我们需要创建的表长下图这个样子:



    create命令中,命令后面首先跟上表名称’student’,然后,再跟上列族名称’info’,这个列族’info’中包含三个列’name’,’gender’,’age’。

    hbase> create 'student','info'
    
    #在实际应用中,一般都是利用编程操作数据
    hbase> put 'student','1','info:name','Xueqian'
    hbase> put 'student','1','info:gender','F'
    hbase> put 'student','1','info:age','23'
    hbase> put 'student','2','info:name','Weiliang'
    hbase> put 'student','2','info:gender','M'
    hbase> put 'student','2','info:age','24'
    
    #查看全部数据
    hbase> scan 'student'
    ROW                   COLUMN+CELL                                               
     1                    column=info:age, timestamp=1533989802237, value=23        
     1                    column=info:gender, timestamp=1533989791899, value=F      
     1                    column=info:name, timestamp=1533989756823, value=Xueqian  
     2                    column=info:age, timestamp=1533989854387, value=24        
     2                    column=info:gender, timestamp=1533989840141, value=M      
     2                    column=info:name, timestamp=1533989825528, value=Weiliang 
    2 row(s) in 0.0350 seconds
    
    
    Step2.配置Spark
    (1)把HBase的lib目录下的一些jar文件需要拷贝到Spark中:

    所有hbase开头的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar,可以打开一个终端按照以下命令来操作:

    cd /usr/local/spark/jars
    mkdir hbase
    cd hbase
    cp /usr/local/hbase/lib/hbase*.jar ./
    cp /usr/local/hbase/lib/guava-12.0.1.jar ./
    cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
    cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
    
    (2)在Spark 2.0版本上缺少相关把hbase的数据转换python可读取的jar包,需要我们另行下载:

    spark-examples*.jar下载地址,下载好后将这个jar包放在/usr/local/spark/jars/hbase/

    mkdir -p /usr/local/spark/jars/hbase/
    mv ~/下载/spark-examples* 
    
    (3)设置Spark的spark-env.sh文件,告诉Spark可以在哪个路径下找到HBase相关的jar文件:
    cd /usr/local/spark/conf
    vim spark-env.sh
    

    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*

    Step3. 编写程序读取HBase数据

    读数据:

    host = 'localhost'
    table = 'student'
    conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
    hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
    count = hbase_rdd.count()
    hbase_rdd.cache()
    hbase_rdd.collect()
     
    1 {"qualifier" : "age", "timestamp" : "1512549772307", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}
    {"qualifier" : "gender", "timestamp" : "1512549765192", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}
    {"qualifier" : "name", "timestamp" : "1512549757406", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}
    2 {"qualifier" : "age", "timestamp" : "1512549829145", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "24"}
    {"qualifier" : "gender", "timestamp" : "1512549790422", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}
    {"qualifier" : "name", "timestamp" : "1512549780044", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}               
    

    写数据:

    host = 'localhost'
    table = 'student'
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    conf = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
     
    rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua']
    sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
    

    在>hbase中再次查看student数据库看数据已经添加到表里。


    相关文章

      网友评论

        本文标题:6.Spark学习(Python版本):读写HBase数据库

        本文链接:https://www.haomeiwen.com/subject/pvidbftx.html