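Step 1. Create an HBase table
In /usr/local/hadoop, start Hadoop: ./sbin/start-dfs.sh
In /usr/local/hbase, start HBase: ./bin/start-hbase.sh
In /usr/local/hbase, start the HBase shell: ./bin/hbase shell
HBase has no separate step for creating a database; you create tables directly. The table we need is named student and has a single column family, info.
In the create command, the table name 'student' comes first, followed by the column family name 'info'. The info column family will hold three columns: 'name', 'gender', and 'age'. Columns are not declared when the table is created; they come into existence when data is first put into them.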
hbase> create 'student','info'
# In real applications data is usually written programmatically; here we insert a few rows by hand
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
hbase> put 'student','2','info:name','Weiliang'
hbase> put 'student','2','info:gender','M'
hbase> put 'student','2','info:age','24'
# View all the data
hbase> scan 'student'
ROW COLUMN+CELL
1 column=info:age, timestamp=1533989802237, value=23
1 column=info:gender, timestamp=1533989791899, value=F
1 column=info:name, timestamp=1533989756823, value=Xueqian
2 column=info:age, timestamp=1533989854387, value=24
2 column=info:gender, timestamp=1533989840141, value=M
2 column=info:name, timestamp=1533989825528, value=Weiliang
2 row(s) in 0.0350 seconds
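To make the layout of the scan output concrete, the sketch below models the same two rows as a plain Python dictionary, keyed first by row key and then by family:qualifier. It only illustrates HBase's logical data model and is not a client API; the names student and get_cell are hypothetical.

```python
# Illustrative model only: an HBase table viewed as
# row key -> {"family:qualifier": value}.
student = {
    "1": {"info:name": "Xueqian", "info:gender": "F", "info:age": "23"},
    "2": {"info:name": "Weiliang", "info:gender": "M", "info:age": "24"},
}


def get_cell(table, row, column):
    """Return the value stored at (row, column), or None if the cell is absent."""
    return table.get(row, {}).get(column)


# HBase stores values as bytes, so the age put from the shell is the text "23",
# not an integer.
print(get_cell(student, "1", "info:age"))
# Columns come back sorted by name, as in the scan output.
print(sorted(student["2"]))
```

Timestamps are left out of this model; in the real table every cell also carries the timestamp shown in the scan output.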
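Step 2. Configure Spark
(1) Copy some of the jar files from HBase's lib directory into Spark:
All jar files whose names start with hbase, plus guava-12.0.1.jar, htrace-core-3.1.0-incubating.jar, and protobuf-java-2.5.0.jar. Open a terminal and run the following commands: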
cd /usr/local/spark/jars
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
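(2) Spark 2.0 does not ship the jar that converts HBase data into a form Python can read, so it has to be downloaded separately:
Download spark-examples*.jar (download link), then put the jar in /usr/local/spark/jars/hbase/: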
mkdir -p /usr/local/spark/jars/hbase/
mv ~/下载/spark-examples* /usr/local/spark/jars/hbase/
(3) Edit Spark's spark-env.sh file and add the export line shown below, which tells Spark where to find the HBase jar files:
cd /usr/local/spark/conf
vim spark-env.sh
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*
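Before moving on, it can help to confirm that the jar directory really contains everything Step 2 asked for. The following is a minimal sketch, assuming the paths used in this post; the helper name missing_jars and the pattern list are hypothetical and simply mirror the copy commands above.

```python
import fnmatch
import os

# Patterns taken from the copy and download steps above.
REQUIRED_PATTERNS = [
    "hbase*.jar",
    "guava-12.0.1.jar",
    "htrace-core-3.1.0-incubating.jar",
    "protobuf-java-2.5.0.jar",
    "spark-examples*.jar",
]


def missing_jars(jar_dir, patterns=REQUIRED_PATTERNS):
    """Return the patterns that match no file in jar_dir."""
    names = os.listdir(jar_dir) if os.path.isdir(jar_dir) else []
    return [p for p in patterns if not fnmatch.filter(names, p)]


# Path used throughout this post; adjust it if Spark is installed elsewhere.
print(missing_jars("/usr/local/spark/jars/hbase"))
```

An empty list means every pattern matched at least one file. A non-empty list names the jars that still need to be copied or downloaded.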
Step 3. Write a program to read and write HBase data
Reading data:
# Run in the pyspark shell, where sc (the SparkContext) is already defined
host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv, valueConverter=valueConv, conf=conf)
hbase_rdd.cache()  # mark for caching first so count() and collect() share one scan
count = hbase_rdd.count()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)
1 {"qualifier" : "age", "timestamp" : "1512549772307", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}
{"qualifier" : "gender", "timestamp" : "1512549765192", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}
{"qualifier" : "name", "timestamp" : "1512549757406", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}
2 {"qualifier" : "age", "timestamp" : "1512549829145", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "24"}
{"qualifier" : "gender", "timestamp" : "1512549790422", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}
{"qualifier" : "name", "timestamp" : "1512549780044", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}
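Each value in the output above is a single string containing one JSON object per cell, separated by newlines. A minimal sketch of turning such a string into a Python dictionary follows; the helper name parse_cells is hypothetical, and the newline-separated layout is assumed from the output shown rather than from the converter's documentation.

```python
import json


def parse_cells(value):
    """Split one converted HBase Result string into a {qualifier: value} dict."""
    cells = [json.loads(line) for line in value.split("\n") if line.strip()]
    return {cell["qualifier"]: cell["value"] for cell in cells}


# Sample value copied from the output for row 1 above.
sample = "\n".join([
    '{"qualifier" : "age", "timestamp" : "1512549772307", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}',
    '{"qualifier" : "gender", "timestamp" : "1512549765192", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}',
    '{"qualifier" : "name", "timestamp" : "1512549757406", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}',
])

print(parse_cells(sample))
```

In the pyspark shell the same function could be applied to the whole RDD with hbase_rdd.mapValues(parse_cells).collect(), which would give one dictionary per row key.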
Writing data:
host = 'localhost'
table = 'student'
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua']
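# Use the text before the first comma as the row key (x[0] would keep only the first character)
sc.parallelize(rawData).map(lambda x: (x.split(',')[0], x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)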
Back in the hbase shell, scan the student table again to confirm that the new rows have been added.
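The write example above passes each cell as a pair of a row key and a four-element list: row key, column family, qualifier, value. When the data starts out as Python dictionaries rather than comma-separated strings, records of that shape could be built as in the sketch below. The helper names to_put_record and rows_to_records are hypothetical, and the four-field layout is assumed from the rawData strings in this post.

```python
def to_put_record(row, family, qualifier, value):
    """Build a (key, [row, family, qualifier, value]) pair for one cell."""
    fields = [str(row), family, qualifier, str(value)]
    return (fields[0], fields)


def rows_to_records(rows, family="info"):
    """Expand {row_key: {qualifier: value}} into one record per cell."""
    return [
        to_put_record(row, family, qualifier, value)
        for row, columns in rows.items()
        for qualifier, value in columns.items()
    ]


# Same two rows as rawData in the write example.
new_rows = {
    "3": {"name": "Rongcheng"},
    "4": {"name": "Guanhua"},
}
records = rows_to_records(new_rows)
print(records)
```

In the pyspark shell, sc.parallelize(records).saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv) would then write these records using the same conf and converters as above. Building the key from the full row value also avoids any dependence on the row key being a single character.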