Flink On Yarn 泛型异常

作者: 大猪大猪 | 来源:发表于2018-11-17 13:50 被阅读1次

Flink 版本 1.8

描述

将Flink任务提交到Yarn的时候发现一个问题

------------------------------------------------------------
 The program finished with the following exception:

The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
    org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
    com.dounine.scala.flink.App$.main(App.scala:43)

代码

package com.dounine.scala.flink

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.dounine.scala.flink.entity.Log
import com.dounine.scala.flink.hbase.CustomTableInputFormat
import com.dounine.scala.flink.utils.HadoopKrb
import com.dounine.scala.flink.utils.HbaseUtil._
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.operators.{DataSource, MapOperator}
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.hadoopcompatibility.HadoopInputs
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.mapreduce.Job

// Entry point: reads HBase rows through a Hadoop InputFormat, maps each row to
// a Log record, registers the result as a Table, and writes a SQL aggregate to HDFS.
object App {

  def main(args: Array[String]): Unit = {

    // Batch (Java) ExecutionEnvironment; the Table environment wraps it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)


    // Kerberos login yields the HBase/Hadoop configuration; restrict the scan
    // to table "logTable" and a one-hour row-key range (181111000000..181111010000).
    val conf = HadoopKrb.login()
    conf.set(TableInputFormat.INPUT_TABLE, "logTable")
    conf.set(TableInputFormat.SCAN_ROW_START, "181111000000")
    conf.set(TableInputFormat.SCAN_ROW_STOP, "181111010000")

    // Wrap the HBase TableInputFormat as a Flink input format producing
    // Tuple2[ImmutableBytesWritable, Result] (row key, row contents) pairs.
    val inputFormat = HadoopInputs.createHadoopInput(
      new CustomTableInputFormat,
      classOf[ImmutableBytesWritable],
      classOf[Result],
      Job.getInstance(conf)
    )

    // NOTE(review): this overload relies on automatic type extraction, which is
    // exactly what fails on Yarn with "The type returned by the input format could
    // not be automatically determined" (see the exception at the top of the article).
    // The fix shown later in the article passes an explicit TypeInformation:
    //   env.createInput(inputFormat, createTypeInformation[Tuple2[ImmutableBytesWritable, Result]])
    val logDataStream = env.createInput(inputFormat)
      .map(new MapFunction[Tuple2[ImmutableBytesWritable, Result], Log]() {
        @throws[Exception]
        override def map(value: Tuple2[ImmutableBytesWritable, Result]): Log = {
          // Helper: read one qualifier from the "ext" column family of this row.
          val v = (qualifier: String) => getValue(value.f1, "ext", qualifier)
          new Log(
            v("time"),
            v("appKey"),
            v("channelCode"),
            v("scene"),
            v("type"),
            v("userId")
          )
        }
      })

    // Expose the DataSet as a Table; "time" is aliased to "tt"
    // (presumably "ccode" maps the channelCode field — verify against Log's field order).
    val table = tableEnv.fromDataSet(logDataStream, "appKey,ccode,scene,type,userId,time as tt")

    tableEnv.registerTable("log", table)

    // Aggregate query over the registered table.
    val tt = tableEnv.sqlQuery("select MIN(tt) from log")

//    tableEnv.toDataSet(tt, classOf[Row]).print()

    // Write the single-row aggregate to a timestamped HDFS path.
    tableEnv.toDataSet(tt,classOf[Row]).writeAsText(s"""hdfs://storm5/tmp/flink/${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy_MM_dd'T'HH_mm_ss"))}""")
//

    env.execute
  }

}

本地是可以运行的,线上无法提交

后面在官方找到答案 传送门
在Scala中,Flink使用在编译时运行的宏,在泛型类型信息仍然可用(尚未被擦除)时将其捕获。

解决方案

将上面的 createInput 调用替换为如下代码

import org.apache.flink.streaming.api.scala._
val tupleInfo = createTypeInformation[Tuple2[ImmutableBytesWritable, Result]]

val logDataStream = env.createInput(inputFormat,tupleInfo)

Flink 1.8-SNAPSHOT

顺便提一下,克隆最新的1.8-SNAPSHOT版本编译后是不能直接运行的,需要在lib目录中添加相应的依赖包

git clone https://github.com/apache/flink
mvn clean install -DskipTests

下面是可运行的lib目录依赖的包

flink-dist_2.11-1.8-SNAPSHOT.jar                  jersey-common-2.25.1.jar                  jersey-json-1.9.jar
flink-hadoop-compatibility_2.11-1.8-SNAPSHOT.jar  jersey-common-2.27.jar                    jersey-media-jaxb-2.25.1.jar
flink-python_2.11-1.8-SNAPSHOT.jar                jersey-container-servlet-core-2.25.1.jar  jersey-server-1.9.jar
flink-shaded-hadoop2-uber-1.8-SNAPSHOT.jar        jersey-core-1.19.4.jar                    jersey-server-2.25.1.jar
javax.ws.rs-api-2.0.1.jar                         jersey-core-1.9.jar                       log4j-1.2.17.jar
jersey-client-1.9.jar                             jersey-guava-2.25.1.jar                   slf4j-log4j12-1.7.15.jar
jersey-client-2.25.1.jar                          jersey-guice-1.9.jar

依赖包下载传送门

相关文章

网友评论

    本文标题:Flink On Yarn 泛型异常

    本文链接:https://www.haomeiwen.com/subject/hrgbfqtx.html