美文网首页
MLSQL读写Phoenix

MLSQL读写Phoenix

作者: hongshen | 来源:发表于2019-07-23 13:49 被阅读0次

    简介

    apache Phoenix是什么东西,不用多说了,它支持jdbc连接,使用mlsql的jdbc插件就能直接load,像这样:

    connect jdbc where
    url="jdbc:phoenix:192.168.1.102;2181;/hbase"
    and driver="org.apache.phoenix.jdbc.PhoenixDriver"
    and `phoenix.schema.isNamespaceMappingEnabled`="true"
    as p;
    
    load jdbc.`p.TEST.TEST_TABLE` as tb1;
    

    不过需要你将client的jar包放到spark的jars目录。

    但是如果要往里写,我试了下jdbc好像不能直接支持,因为Phoenix都是upsert,如果要支持,可能需要模仿mysql的upsert实现一些自定义的statement,http://docs.mlsql.tech/en/guide/datasource/jdbc.html
    但是我觉得依赖一个driver的jar包是比较麻烦的,后来看了下官网支持spark,因此这里将Phoenix整合进mlsql

    遇到的小问题

    mlsql用的是spark2.4.3,但是Phoenix比较老版本是4.8.0,跟上次搞hive一样,还要处理高版本spark匹配低版本数据源的问题。
    phoenix-spark的官网文档在这里哈http://phoenix.apache.org/phoenix_spark.html,文档中比较有用的信息是phoenix从4.10才提供spark2.0的官网版本,相关的jar包https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-spark
    我本来想直接用高版本的,经过实践错误提示不能用Phoenix高版本的客户端去连低版本的server
    那就只能重写phoenix-spark模块了。
    在没有版本兼容问题的情况下,只需要依赖2个东西,core包和spark包,那么有了兼容问题,只需要依赖core包,然后自己将spark包写到代码里。

    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-spark</artifactId>
        <version>4.14.0-HBase-1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.14.0-HBase-1.2</version>
    </dependency>
    

    具体实现

    自己凭空写没有那个必要,直接找个高于4.10版本的phoenix-spark参考下即可,打开一看,很少的几个类,直接抄过来,改一改即可。
    主要改动是增加phoenix.schema.isNamespaceMappingEnabled配置为true
    改动点:
    1、ConfigurationUtil类的getOutputConfiguration方法返回的那个配置增加:
    config.setBoolean("phoenix.schema.isNamespaceMappingEnabled", true),不然写的时候会报错
    2、读的时候增加这个配置,在PhoenixRelation类中哦

    override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        val hconf = new Configuration()
        hconf.setBoolean("phoenix.schema.isNamespaceMappingEnabled", true)
        new PhoenixRDD(
          sqlContext.sparkContext,
          tableName,
          requiredColumns,
          Some(buildFilter(filters)),
          Some(zkUrl),
          hconf,
          dateAsTimestamp
        ).toDataFrame(sqlContext).rdd
      }
    
      // Required by BaseRelation, this will return the full schema for a table
      override def schema: StructType = {
        val hconf = new Configuration()
        hconf.setBoolean("phoenix.schema.isNamespaceMappingEnabled", true)
        new PhoenixRDD(
          sqlContext.sparkContext,
          tableName,
          Seq(),
          None,
          Some(zkUrl),
          hconf,
          dateAsTimestamp
        ).toDataFrame(sqlContext).schema
      }
    

    其余的没什么要改的了,包名类名都没有变化,这时候,增加一个MLSQLPhoenix类,位置mlsql.core.datasource.impl,参考MLSQLHbase即可,其中dbSplitter我设置为override def dbSplitter: String = "."

      override def fullFormat: String = "org.apache.phoenix.spark"
      override def shortFormat: String = "phoenix"
    

    phoenix要求的配置参数比较少,重点就2个,一个是table,table不要用冒号分隔namespace和表名,要用点,一个是zkUrl,格式参考192.168.1.1,192.168.1.2,192.168.1.3:2181,如果znode默认不是/hbase,应该可以这样写格式参考192.168.3.122,192.168.3.123,192.168.3.124:2181/znode,没试过,应该是跟hbase用户一样的

    用法

    由于mlsql设计理念忒屌,忒强大,用起来简单的一笔
    你可以这样加载:

    connect phoenix where
    zkUrl="192.168.1.1,192.168.1.2,192.168.1.3:2181"
    and namespace="testNamespace"
    as p;
    load phoenix.`p.testName` as t1;
    

    你可以这样写,比如我创建了一张Phoenix的表,三个字段,ID,NAME,AGE,其中前两个是varchar,AGE为integer

    set data='''
    {"ID":"hello1","NAME":"name1","AGE":"16"}
    {"ID":"hello2","NAME":"name2","AGE":"17"}
    {"ID":"hello3","NAME":"name3","AGE":"18"}
    ''';
    load jsonStr.`data` as t1;
    connect phoenix where
    zkUrl="192.168.1.1,192.168.1.2,192.168.1.3:2181"
    and namespace="testNamespace"
    as p;
    select ID,NAME,cast(AGE AS int) AS AGE from t1 as t2;
    
    save overwrite t2 as phoenix.`p.testName`;
    

    注意点:写Phoenix不支持直接将varchar转成数值类型,所以要先cast,还有就是save时,只支持save overwrite,注意哦
    是不是很简单。

    相关文章

      网友评论

          本文标题:MLSQL读写Phoenix

          本文链接:https://www.haomeiwen.com/subject/ifpolctx.html