Flink version 1.8
Description
I ran into a problem when submitting a Flink job to YARN:
------------------------------------------------------------
The program finished with the following exception:
The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
com.dounine.scala.flink.App$.main(App.scala:43)
Code
package com.dounine.scala.flink

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.dounine.scala.flink.entity.Log
import com.dounine.scala.flink.hbase.CustomTableInputFormat
import com.dounine.scala.flink.utils.HadoopKrb
import com.dounine.scala.flink.utils.HbaseUtil._
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.hadoopcompatibility.HadoopInputs
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.mapreduce.Job

object App {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Kerberos login, then configure the HBase table and scan range.
    val conf = HadoopKrb.login()
    conf.set(TableInputFormat.INPUT_TABLE, "logTable")
    conf.set(TableInputFormat.SCAN_ROW_START, "181111000000")
    conf.set(TableInputFormat.SCAN_ROW_STOP, "181111010000")

    // Wrap the Hadoop InputFormat so Flink can read from HBase.
    val inputFormat = HadoopInputs.createHadoopInput(
      new CustomTableInputFormat,
      classOf[ImmutableBytesWritable],
      classOf[Result],
      Job.getInstance(conf)
    )

    // This is the call that fails on the cluster (App.scala:43): the Java
    // ExecutionEnvironment cannot determine the produced type automatically.
    val logDataStream = env.createInput(inputFormat)
      .map(new MapFunction[Tuple2[ImmutableBytesWritable, Result], Log]() {
        @throws[Exception]
        override def map(value: Tuple2[ImmutableBytesWritable, Result]): Log = {
          // Read qualifiers from the "ext" column family of the HBase row.
          val v = (qualifier: String) => getValue(value.f1, "ext", qualifier)
          new Log(
            v("time"),
            v("appKey"),
            v("channelCode"),
            v("scene"),
            v("type"),
            v("userId")
          )
        }
      })

    val table = tableEnv.fromDataSet(logDataStream, "appKey,ccode,scene,type,userId,time as tt")
    tableEnv.registerTable("log", table)

    val tt = tableEnv.sqlQuery("select MIN(tt) from log")
    // tableEnv.toDataSet(tt, classOf[Row]).print()
    tableEnv.toDataSet(tt, classOf[Row]).writeAsText(
      s"""hdfs://storm5/tmp/flink/${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy_MM_dd'T'HH_mm_ss"))}"""
    )

    env.execute()
  }
}
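The post does not include CustomTableInputFormat, HadoopKrb, HbaseUtil, or the Log entity. Below is a minimal sketch of hypothetical stand-ins (my assumptions, not the author's code), just enough for the example above to compile:

// Hypothetical stand-ins (assumptions, not the author's classes).
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

// A pass-through subclass of HBase's mapreduce TableInputFormat.
class CustomTableInputFormat extends org.apache.hadoop.hbase.mapreduce.TableInputFormat

object HadoopKrb {
  // The real class presumably performs the Kerberos login; here we only
  // return a base HBase configuration.
  def login(): Configuration = HBaseConfiguration.create()
}

object HbaseUtil {
  // Reads one cell from the given column family/qualifier as a String.
  def getValue(result: Result, family: String, qualifier: String): String =
    Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))
}

// The Log entity, sketched as a simple case class ("type" is a Scala
// keyword, hence the backticks).
case class Log(time: String, appKey: String, channelCode: String,
               scene: String, `type`: String, userId: String)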
The job runs locally; it only fails when submitted to the cluster.
I later found the answer in the official Flink documentation:
In Scala, Flink uses macros that run at compile time and capture all generic type information while it is still available.
Solution
Change the failing call as follows. The wildcard import is already present in the code above; it is what brings the createTypeInformation macro into scope, and the two new lines pass the resulting TypeInformation to createInput explicitly:
import org.apache.flink.streaming.api.scala._

val tupleInfo = createTypeInformation[Tuple2[ImmutableBytesWritable, Result]]
val logDataStream = env.createInput(inputFormat, tupleInfo)
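Alternatively (my sketch, not from the original post): if the job uses the Scala DataSet API end to end, createInput on org.apache.flink.api.scala.ExecutionEnvironment takes the TypeInformation as an implicit parameter, so the macro supplies it without an explicit call:

// A minimal sketch, assuming the same inputFormat as above: the Scala
// ExecutionEnvironment resolves the TypeInformation implicitly through the
// createTypeInformation macro brought in by the wildcard import.
import org.apache.flink.api.scala._

val scalaEnv = ExecutionEnvironment.getExecutionEnvironment
val logDataSet = scalaEnv.createInput(inputFormat) // TypeInformation supplied implicitly

Either way the effect is the same: the macro runs at compile time, when the full generic types are still visible, so the type no longer has to be reconstructed reflectively at submission time.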
Flink 1.8-SNAPSHOT
As an aside: if you clone and build the current 1.8-SNAPSHOT, the resulting distribution cannot run this job as-is; the required dependency jars have to be added to its lib directory.
git clone https://github.com/apache/flink
mvn clean install -DskipTests
Here are the jars a working lib directory ended up containing:
flink-dist_2.11-1.8-SNAPSHOT.jar
flink-hadoop-compatibility_2.11-1.8-SNAPSHOT.jar
flink-python_2.11-1.8-SNAPSHOT.jar
flink-shaded-hadoop2-uber-1.8-SNAPSHOT.jar
javax.ws.rs-api-2.0.1.jar
jersey-client-1.9.jar
jersey-client-2.25.1.jar
jersey-common-2.25.1.jar
jersey-common-2.27.jar
jersey-container-servlet-core-2.25.1.jar
jersey-core-1.19.4.jar
jersey-core-1.9.jar
jersey-guava-2.25.1.jar
jersey-guice-1.9.jar
jersey-json-1.9.jar
jersey-media-jaxb-2.25.1.jar
jersey-server-1.9.jar
jersey-server-2.25.1.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar