Preface
In big-data development you are constantly reading from and writing to a variety of databases, and errors caused by field types that don't match what the database expects come up all the time. Spark's built-in dialect objects all live in the org.apache.spark.sql.jdbc package; through these dialects, Spark configures how Catalyst/Scala data types map to the corresponding database column types.
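For instance, you can ask Spark which dialect it will pick for a given JDBC URL. A minimal sketch (the URL is illustrative; JdbcDialects.get is a developer API):

import org.apache.spark.sql.jdbc.JdbcDialects

// Dialects are resolved purely by URL prefix, via each dialect's canHandle
val dialect = JdbcDialects.get("jdbc:sqlserver://localhost;databaseName=test")
println(dialect.getClass.getName)  // the built-in MsSqlServerDialect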
Background
The legacy business system runs on SQL Server, and writing Boolean data back to it raised an exception (the BIT type does not allow a width specifier).
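A minimal reproduction sketch (the table name, URL and credentials are made up for illustration):

import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bool-write-repro").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, true), (2, false)).toDF("id", "enabled")

val props = new Properties()
props.setProperty("user", "sa")
props.setProperty("password", "secret")

// In Spark 2.1.0 this fails: the CREATE TABLE statement Spark generates
// declares the Boolean column as BIT(1), and SQL Server rejects a width on BIT.
df.write.jdbc("jdbc:sqlserver://localhost;databaseName=test", "flags", props)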
Problem analysis
This is clearly a type-mapping mistake: the Scala Boolean type gets translated into SQL Server's BIT and is additionally given a width, which causes the error.
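You can confirm this from a driver shell. The built-in SQL Server dialect declines BooleanType, so Spark falls back to its generic JDBC mapping, and it is that fallback which emits BIT with a width (a sketch, assuming Spark 2.1.0 behavior, where the generic mapping in JdbcUtils.getCommonJDBCType is "BIT(1)"):

import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.BooleanType

val dialect = JdbcDialects.get("jdbc:sqlserver://localhost;databaseName=test")
// Prints None: the dialect has no mapping for BooleanType, so the generic
// fallback ("BIT(1)") is used when building the CREATE TABLE DDL.
println(dialect.getJDBCType(BooleanType))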
Reading the source shows that Spark gives the Boolean type no special handling (at least in version 2.1.0). The source is as follows:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc

import org.apache.spark.sql.types._

private object MsSqlServerDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (typeName.contains("datetimeoffset")) {
      // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
      Option(StringType)
    } else {
      None
    }
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
    case _ => None
  }

  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
Solution
The source shows that Spark already special-cases the timestamp type, so we can use that as a ready-made template: create a class of our own and override getJDBCType.
Since MsSqlServerDialect is private, the only option is to extend JdbcDialect directly. The concrete implementation looks like this:
package cn.harsons.bd.common

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types._

/**
 * Custom SQL Server dialect that maps Boolean columns to TINYINT,
 * avoiding the invalid BIT(1) DDL that Spark generates by default.
 *
 * @author liyabin
 * @date 2020/3/24
 */
object SQLServerJdbcDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (typeName.contains("datetimeoffset")) {
      // String is recommended by Microsoft SQL Server for datetimeoffset types in non-MS clients
      Option(StringType)
    } else {
      None
    }
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
    // Map Boolean to TINYINT (no width) instead of falling back to BIT(1)
    case BooleanType => Some(JdbcType("TINYINT", java.sql.Types.BOOLEAN))
    case _ => None
  }

  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
The above only defines the dialect object; how do we make the dialect take effect?
Calling JdbcDialects.registerDialect(SQLServerJdbcDialect) is enough to activate it. And don't worry about whether Spark will still read the default MsSqlServerDialect even though both dialects share the same canHandle condition: configuration generally beats convention, and Spark gives custom registered dialects priority over the built-in ones.
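Putting it together (reusing the hypothetical df, URL and props from the reproduction above): register the dialect once per application, before any JDBC writes, and the Boolean column is then created as TINYINT:

import org.apache.spark.sql.jdbc.JdbcDialects
import cn.harsons.bd.common.SQLServerJdbcDialect

// Register once, e.g. right after building the SparkSession
JdbcDialects.registerDialect(SQLServerJdbcDialect)

// The same write that failed before now succeeds; the generated DDL
// declares the column as TINYINT instead of BIT(1).
df.write.jdbc("jdbc:sqlserver://localhost;databaseName=test", "flags", props)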