Accessing HBase with Spark SQL


By 阿亚2011 | Published 2019-07-01 18:00

    Reference documentation: https://hbase.apache.org/book.html#_sparksql_dataframes

    Introduction

    The hbase-spark integration uses the DataSource API (SPARK-3247) introduced in Spark 1.2.0. It bridges the gap between the simple HBase key-value store and complex relational SQL queries, letting users run complex data analysis on HBase with Spark. An HBase DataFrame is a standard Spark DataFrame and can interact with any other data source such as Hive, ORC, Parquet or JSON. The hbase-spark integration applies key techniques such as partition pruning, column pruning, predicate pushdown and data locality.
    To use the hbase-spark integration connector, you define a catalog that maps the schema between the HBase table and the Spark table, prepare the data and populate the HBase table, and then load the HBase DataFrame. After that, you can query and access the records in the HBase table with SQL.
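
    To make the pruning and pushdown point concrete, here is a minimal sketch (not taken from the referenced documentation) of what it looks like from the DataFrame API. It assumes a SparkSession named sparkSession and a catalog string like the one defined in the full example later in this article.

    Dataset<Row> df = sparkSession.read()
            .format("org.apache.hadoop.hbase.spark")
            .option(HBaseTableCatalog.tableCatalog(), catalog)
            .load();

    // Only col0 and col4 are requested (column pruning); the filter on col4
    // can be pushed down to HBase as a scan filter (predicate pushdown).
    df.select("col0", "col4")
            .filter(df.col("col4").gt(100))
            .show();
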

    Building the hbase-spark library

    Using the hbase-spark integration requires the hbase-spark library.
    I could not find a published artifact for the latest version, so I downloaded the source from GitHub, built it myself, and installed it into my local Maven repository:

    git clone https://github.com/apache/hbase-connectors.git
    cd hbase-connectors/spark/hbase-spark
    mvn -Dspark.version=2.4.3 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install
    

    Then add the dependencies to the project's pom.xml:

            <dependency>
                <groupId>org.apache.hbase.connectors.spark</groupId>
                <artifactId>hbase-spark</artifactId>
                <version>1.0.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>2.1.4</version>
            </dependency>
    

    Resolving the HBase access error

    The following error occurred when running the code:

    Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
    Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/fs/HFileSystem
        at cn.com.sjfx.sparkappdemo.Application.main(Application.java:27)
        ... 6 more
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.fs.HFileSystem
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 7 more
    

    This happens because Spark cannot find the HBase libraries on its classpath. The HBase jars need to be added to Spark when building the image.
    Modify the Dockerfile and add the following:

    COPY /hbase-lib/* /spark/jars/
    

    Reading and writing HBase

    // Imports required by the example
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.spark.HBaseContext;
    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class Application {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("demo");
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            SparkSession sparkSession = SparkSession.builder()
                    .sparkContext(jsc.sc())
                    .getOrCreate();
    
            // Set the ZooKeeper quorum of the HBase cluster to access
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "192.168.1.22:15301,192.168.1.22:15302,192.168.1.22:15303");
            // The HBaseContext must be created here, because the write below relies on it
            HBaseContext hBaseContext = new HBaseContext(jsc.sc(), configuration, null);
    
            // Create a test RDD
            List<Integer> data = new ArrayList<>();
            for (int i = 0; i < 256; i++) {
                data.add(i);
            }
            JavaRDD<Integer> rdd = jsc.parallelize(data);
            JavaRDD<HBaseRecord> rdd1 = rdd.map(i -> new HBaseRecord(i, "extra"));
            rdd1.collect().forEach(System.out::println);
            // Create a DataFrame from the RDD
            Dataset<Row> df = sparkSession.createDataFrame(rdd1, HBaseRecord.class);
    
            // Define the catalog that maps the DataFrame columns to the HBase table
            String catalog = "{" +
                    "       \"table\":{\"namespace\":\"default\", \"name\":\"table1\"}," +
                    "       \"rowkey\":\"key\"," +
                    "       \"columns\":{" +
                    "         \"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                    "         \"col1\":{\"cf\":\"cf1\", \"col\":\"col1\", \"type\":\"boolean\"}," +
                    "         \"col2\":{\"cf\":\"cf2\", \"col\":\"col2\", \"type\":\"double\"}," +
                    "         \"col3\":{\"cf\":\"cf3\", \"col\":\"col3\", \"type\":\"float\"}," +
                    "         \"col4\":{\"cf\":\"cf4\", \"col\":\"col4\", \"type\":\"int\"}," +
                    "         \"col5\":{\"cf\":\"cf5\", \"col\":\"col5\", \"type\":\"bigint\"}," +
                    "         \"col6\":{\"cf\":\"cf6\", \"col\":\"col6\", \"type\":\"smallint\"}," +
                    "         \"col7\":{\"cf\":\"cf7\", \"col\":\"col7\", \"type\":\"string\"}," +
                    "         \"col8\":{\"cf\":\"cf8\", \"col\":\"col8\", \"type\":\"tinyint\"}" +
                    "       }" +
                    "     }";
            // Write the data to HBase
            df.write()
                    .format("org.apache.hadoop.hbase.spark")
                    .option(HBaseTableCatalog.tableCatalog(), catalog)
                    .option(HBaseTableCatalog.newTable(), "5")  // create the table with 5 regions
                    .mode(SaveMode.Overwrite)  // overwrite existing data
                    .save();
            // Read the data back
            Dataset<Row> df2 = sparkSession.read()
                    .format("org.apache.hadoop.hbase.spark")
                    .option(HBaseTableCatalog.tableCatalog(), catalog)
                    .load();
            System.out.println("read result: ");
            df2.show();
        }
    
        // The class must be serializable
        public static class HBaseRecord implements Serializable {
            private static final long serialVersionUID = 4331526295356820188L;
            // Fields must have getters/setters, even though they are public
            public String col0;
            public Boolean col1;
            public Double col2;
            public Float col3;
            public Integer col4;
            public Long col5;
            public Short col6;
            public String col7;
            public Byte col8;
    
            public String getCol0() {
                return col0;
            }
    
            public void setCol0(String col0) {
                this.col0 = col0;
            }
    
            public Boolean getCol1() {
                return col1;
            }
    
            public void setCol1(Boolean col1) {
                this.col1 = col1;
            }
    
            public Double getCol2() {
                return col2;
            }
    
            public void setCol2(Double col2) {
                this.col2 = col2;
            }
    
            public Float getCol3() {
                return col3;
            }
    
            public void setCol3(Float col3) {
                this.col3 = col3;
            }
    
            public Integer getCol4() {
                return col4;
            }
    
            public void setCol4(Integer col4) {
                this.col4 = col4;
            }
    
            public Long getCol5() {
                return col5;
            }
    
            public void setCol5(Long col5) {
                this.col5 = col5;
            }
    
            public Short getCol6() {
                return col6;
            }
    
            public void setCol6(Short col6) {
                this.col6 = col6;
            }
    
            public String getCol7() {
                return col7;
            }
    
            public void setCol7(String col7) {
                this.col7 = col7;
            }
    
            public Byte getCol8() {
                return col8;
            }
    
            public void setCol8(Byte col8) {
                this.col8 = col8;
            }
    
            public HBaseRecord(Integer i, String s) {
                col0 = String.format("row%03d", i);
                col1 = i % 2 == 0;
                col2 = Double.valueOf(i);
                col3 = Float.valueOf(i);
                col4 = i;
                col5 = Long.valueOf(i);
                col6 = i.shortValue();
                col7 = "String:" + s;
                col8 = i.byteValue();
            }
    
            @Override
            public String toString() {
                return "HBaseRecord{" +
                        "col0='" + col0 + '\'' +
                        ", col1=" + col1 +
                        ", col2=" + col2 +
                        ", col3=" + col3 +
                        ", col4=" + col4 +
                        ", col5=" + col5 +
                        ", col6=" + col6 +
                        ", col7='" + col7 + '\'' +
                        ", col8=" + col8 +
                        '}';
            }
        }
    }
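
    Once the DataFrame has been loaded, the records can also be queried with plain SQL by registering a temporary view. The snippet below is a small sketch that continues from df2 and sparkSession in the code above; the view name table1_view is just an illustration.

    // Register the HBase-backed DataFrame as a temporary view and query it with SQL;
    // df2 and sparkSession refer to the objects created in the example above.
    df2.createOrReplaceTempView("table1_view");
    Dataset<Row> result = sparkSession.sql(
            "SELECT col0, col4, col7 FROM table1_view WHERE col4 > 100 AND col1 = true");
    result.show();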
    
