简介
apache Phoenix是什么东西,不用多说了,它支持jdbc连接,使用mlsql的jdbc插件就能直接load,像这样:
connect jdbc where
url="jdbc:phoenix:192.168.1.102;2181;/hbase"
and driver="org.apache.phoenix.jdbc.PhoenixDriver"
and `phoenix.schema.isNamespaceMappingEnabled`="true"
as p;
load jdbc.`p.TEST.TEST_TABLE` as tb1;
不过需要你将client的jar包放到spark的jars目录。
但是如果要往里写,我试了下jdbc好像不能直接支持,因为Phoenix都是upsert,如果要支持,可能需要模仿mysql的upsert实现一些自定义的statement,http://docs.mlsql.tech/en/guide/datasource/jdbc.html
但是我觉得依赖一个driver的jar包是比较麻烦的,后来看了下官网支持spark,因此这里将Phoenix整合进mlsql
遇到的小问题
mlsql用的是spark2.4.3,但是Phoenix比较老版本是4.8.0,跟上次搞hive一样,还要处理高版本spark匹配低版本数据源的问题。
phoenix-spark的官网文档在这里哈http://phoenix.apache.org/phoenix_spark.html,文档中比较有用的信息是phoenix从4.10才提供spark2.0的官网版本,相关的jar包https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-spark
我本来想直接用高版本的,经过实践错误提示不能用Phoenix高版本的客户端去连低版本的server
那就只能重写phoenix-spark模块了。
在没有版本兼容问题的情况下,只需要依赖2个东西,core包和spark包,那么有了兼容问题,只需要依赖core包,然后自己将spark包写到代码里。
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>4.14.0-HBase-1.2</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.14.0-HBase-1.2</version>
</dependency>
具体实现
自己凭空写没有那个必要,直接找个高于4.10版本的phoenix-spark参考下即可,打开一看,很少的几个类,直接抄过来,改一改即可。
主要改动是增加phoenix.schema.isNamespaceMappingEnabled配置为true
改动点:
1、ConfigurationUtil类的getOutputConfiguration方法返回的那个配置增加:
config.setBoolean("phoenix.schema.isNamespaceMappingEnabled", true)
,不然写的时候会报错
2、读的时候增加这个配置,在PhoenixRelation类中哦
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val hconf = new Configuration()
hconf.setBoolean("phoenix.schema.isNamespaceMappingEnabled", true)
new PhoenixRDD(
sqlContext.sparkContext,
tableName,
requiredColumns,
Some(buildFilter(filters)),
Some(zkUrl),
hconf,
dateAsTimestamp
).toDataFrame(sqlContext).rdd
}
// Required by BaseRelation, this will return the full schema for a table
override def schema: StructType = {
val hconf = new Configuration()
hconf.setBoolean("phoenix.schema.isNamespaceMappingEnabled", true)
new PhoenixRDD(
sqlContext.sparkContext,
tableName,
Seq(),
None,
Some(zkUrl),
hconf,
dateAsTimestamp
).toDataFrame(sqlContext).schema
}
其余的没什么要改的了,包名类名都没有变化,这时候,增加一个MLSQLPhoenix类,位置mlsql.core.datasource.impl
,参考MLSQLHbase即可,其中dbSplitter
我设置为override def dbSplitter: String = "."
override def fullFormat: String = "org.apache.phoenix.spark"
override def shortFormat: String = "phoenix"
phoenix要求的配置参数比较少,重点就2个,一个是table,table不要用冒号分隔namespace和表名,要用点,一个是zkUrl,格式参考192.168.1.1,192.168.1.2,192.168.1.3:2181
,如果znode默认不是/hbase,应该可以这样写格式参考192.168.3.122,192.168.3.123,192.168.3.124:2181/znode
,没试过,应该是跟hbase用户一样的
用法
由于mlsql设计理念忒屌,忒强大,用起来简单的一笔
你可以这样加载:
connect phoenix where
zkUrl="192.168.1.1,192.168.1.2,192.168.1.3:2181"
and namespace="testNamespace"
as p;
load phoenix.`p.testName` as t1;
你可以这样写,比如我创建了一张Phoenix的表,三个字段,ID,NAME,AGE,其中前两个是varchar,AGE为integer
set data='''
{"ID":"hello1","NAME":"name1","AGE":"16"}
{"ID":"hello2","NAME":"name2","AGE":"17"}
{"ID":"hello3","NAME":"name3","AGE":"18"}
''';
load jsonStr.`data` as t1;
connect phoenix where
zkUrl="192.168.1.1,192.168.1.2,192.168.1.3:2181"
and namespace="testNamespace"
as p;
select ID,NAME,cast(AGE AS int) AS AGE from t1 as t2;
save overwrite t2 as phoenix.`p.testName`;
注意点:写Phoenix不支持直接将varchar转成数值类型,所以要先cast,还有就是save时,只支持save overwrite,注意哦
是不是很简单。
网友评论