【Clickhouse】Writing to ClickHouse from Spark via ClickHouse-Native-JDBC

Author: PowerMe | Published 2020-04-30 11:15

    There are currently two JDBC drivers you can use to write to ClickHouse (a quick connection sketch follows this list):
    - The official JDBC driver (port 8123): implemented over HTTP; overall performance is unimpressive, and timeouts can occur.
    - housepower's ClickHouse-Native-JDBC (port 9000): implemented over the native TCP protocol; supports high-performance writes, with data organized by column and compressed.
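
    A minimal connectivity sketch for the native driver (the driver class and port are the ones mentioned above; the object name and the SELECT 1 probe are just for illustration):

    import java.sql.DriverManager

    object NativeDriverCheck {
      def main(args: Array[String]): Unit = {
        // The official HTTP driver would connect on port 8123, e.g.
        //   jdbc:clickhouse://localhost:8123/default
        // The native driver speaks ClickHouse's TCP protocol on port 9000:
        Class.forName("com.github.housepower.jdbc.ClickHouseDriver")
        val conn = DriverManager.getConnection("jdbc:clickhouse://localhost:9000")
        val rs = conn.createStatement().executeQuery("SELECT 1")
        rs.next()
        println(rs.getInt(1)) // prints 1 if the native connection works
        conn.close()
      }
    }

    Note that both drivers answer to the jdbc:clickhouse: URL prefix, so having both on the classpath can make DriverManager's choice ambiguous; keeping only one of them avoids that.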

    Notes from using ClickHouse-Native-JDBC:
    Spark version: 2.1.0
    ClickHouse version: 20.2.1.2183, single-node deployment
    ClickHouse-Native-JDBC version: 2.1-stable

    1. First, create a local table in ClickHouse (a quick verification sketch follows the DDL):
    CREATE TABLE IF NOT EXISTS jdbc_test_table
    (
        `name` String,
        `age` UInt8,
        `job` String
    )
    ENGINE = MergeTree()
    PARTITION BY age
    ORDER BY age
    SETTINGS index_granularity = 8192
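
    To confirm the table landed as expected, here is a small verification sketch that reuses the same native-driver connection pattern (the object name is illustrative; it assumes the table lives in the default database):

    import java.sql.DriverManager

    object DescribeTestTable {
      def main(args: Array[String]): Unit = {
        Class.forName("com.github.housepower.jdbc.ClickHouseDriver")
        val conn = DriverManager.getConnection("jdbc:clickhouse://localhost:9000")
        // system.columns is a standard ClickHouse system table
        val rs = conn.createStatement().executeQuery(
          "SELECT name, type FROM system.columns " +
            "WHERE database = 'default' AND table = 'jdbc_test_table'")
        while (rs.next()) {
          println(rs.getString(1) + ": " + rs.getString(2)) // e.g. name: String, age: UInt8, job: String
        }
        conn.close()
      }
    }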
    
    2. Write the Spark code:
    Add the ClickHouse-Native-JDBC dependency to the pom:
    <dependency>
        <groupId>com.github.housepower</groupId>
        <artifactId>clickhouse-native-jdbc</artifactId>
        <version>2.1-stable</version>
    </dependency>
    
    import java.sql.SQLFeatureNotSupportedException
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object SparkJDBCToClickhouse {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf =
          new SparkConf()
          .setAppName("SparkJDBCToClickhouse")
          .setMaster("local[1]")
    
        val spark: SparkSession =
          SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
        
        val filePath = "people.csv"
        val ckDriver = "com.github.housepower.jdbc.ClickHouseDriver"
        val ckUrl = "jdbc:clickhouse://localhost:9000"
        val table = "jdbc_test_table"
        // Read the people.csv test file (a sample-file sketch follows this listing)
        val peopleDFCsv = 
          spark.read
            .format("csv")
            .option("sep", ";")
            .option("inferSchema", "true")
            .option("header", "true")
            .load(filePath)
        peopleDFCsv.show()
    
        try {
          val pro = new java.util.Properties
          pro.put("driver", ckDriver)
          peopleDFCsv.write
            .mode(SaveMode.Append)
            // write in batches of 20000 rows, no transaction isolation,
            // single partition (i.e. a single JDBC connection)
            .option("batchsize", "20000")
            .option("isolationLevel", "NONE")
            .option("numPartitions", "1")
            .jdbc(ckUrl, table, pro)
        } catch {
          // Note: some metadata methods that Spark's JDBC data source relies on
          // (e.g. getPrecision, setQueryTimeout) are not implemented by the driver
          // and throw exceptions, but this does not stop the write
          case e: SQLFeatureNotSupportedException =>
            println("catch and ignore!")
        }
        spark.close()
      }
    }
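
    For reference, the job above expects a semicolon-separated people.csv with a header row whose columns line up with jdbc_test_table; this matches the layout of the people.csv bundled with Spark's examples. The sketch below (everything in it is illustrative) generates such a file:

    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}

    object MakePeopleCsv {
      def main(args: Array[String]): Unit = {
        // Header plus a couple of sample rows; age must fit in UInt8
        val rows = Seq(
          "name;age;job",
          "Jorge;30;Developer",
          "Bob;32;Developer")
        Files.write(Paths.get("people.csv"),
          rows.mkString("\n").getBytes(StandardCharsets.UTF_8))
      }
    }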
    

    I was stuck for a long time on the exception above. When Spark goes through the JDBC path, it first tries to fetch the target table's schema metadata and calls a number of ResultSetMetaData methods along the way; for example, resolveTable calls getSchema:

    def getSchema(
          resultSet: ResultSet,
          dialect: JdbcDialect,
          alwaysNullable: Boolean = false): StructType = {
        val rsmd = resultSet.getMetaData
        val ncols = rsmd.getColumnCount
        val fields = new Array[StructField](ncols)
        var i = 0
        while (i < ncols) {
          // The following metadata calls may not be supported by the driver
          val columnName = rsmd.getColumnLabel(i + 1)
          val dataType = rsmd.getColumnType(i + 1)
          val typeName = rsmd.getColumnTypeName(i + 1)
          val fieldSize = rsmd.getPrecision(i + 1)
          val fieldScale = rsmd.getScale(i + 1)
          val isSigned = {
            try {
              rsmd.isSigned(i + 1)
            } catch {
              // Workaround for HIVE-14684:
              case e: SQLException if
              e.getMessage == "Method not supported" &&
                rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
            }
          }
          val nullable = if (alwaysNullable) {
            true
          } else {
            rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
          }
          val metadata = new MetadataBuilder().putLong("scale", fieldScale)
          val columnType =
            dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
              getCatalystType(dataType, fieldSize, fieldScale, isSigned))
          fields(i) = StructField(columnName, columnType, nullable)
          i = i + 1
        }
        new StructType(fields)
      }
    

    Some of the methods called here are not supported by ClickHouse-Native-JDBC, for example:

        @Override
        public int getPrecision(int column) throws SQLException {
            throw new SQLFeatureNotSupportedException();
        }

        @Override
        public int getScale(int column) throws SQLException {
            throw new SQLFeatureNotSupportedException();
        }
    

    All of the methods that throw this exception can be found in the driver source; a minimal reproduction outside Spark is sketched below.
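
    The same behaviour can be reproduced with plain JDBC, without Spark (a sketch; it assumes the jdbc_test_table created earlier and the 2.1-stable driver):

    import java.sql.{DriverManager, SQLFeatureNotSupportedException}

    object MetadataRepro {
      def main(args: Array[String]): Unit = {
        Class.forName("com.github.housepower.jdbc.ClickHouseDriver")
        val conn = DriverManager.getConnection("jdbc:clickhouse://localhost:9000")
        // LIMIT 0 returns no rows but the result still carries the column header
        val rs = conn.createStatement().executeQuery("SELECT * FROM jdbc_test_table LIMIT 0")
        val rsmd = rs.getMetaData
        println(rsmd.getColumnCount) // basic metadata such as the column count is available
        try {
          rsmd.getPrecision(1)       // throws in 2.1-stable, as shown above
        } catch {
          case _: SQLFeatureNotSupportedException => println("getPrecision not supported")
        }
        conn.close()
      }
    }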

    One more thing
    With Spark 2.4.5 the write cannot proceed at all: the driver does not support the setQueryTimeout interface, so the job bails out during the JdbcRelationProvider stage (stack trace below, followed by a small sketch for checking this up front):

    java.sql.SQLFeatureNotSupportedException
      at com.github.housepower.jdbc.wrapper.SQLStatement.setQueryTimeout(SQLStatement.java:59)
      at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:862)
      at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
      at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
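
    Since the failure comes from statement.setQueryTimeout, a quick pre-flight check (a sketch; the object name is illustrative) makes the limitation visible without running a full Spark job:

    import java.sql.{DriverManager, SQLFeatureNotSupportedException}

    object QueryTimeoutCheck {
      def main(args: Array[String]): Unit = {
        Class.forName("com.github.housepower.jdbc.ClickHouseDriver")
        val conn = DriverManager.getConnection("jdbc:clickhouse://localhost:9000")
        val stmt = conn.createStatement()
        try {
          stmt.setQueryTimeout(0) // 0 means "no limit" in JDBC
          println("setQueryTimeout supported")
        } catch {
          case _: SQLFeatureNotSupportedException =>
            println("setQueryTimeout not supported; the Spark 2.4.x JDBC write path will fail here")
        }
        conn.close()
      }
    }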
    
