美文网首页
Flink编程<三> 坑

Flink编程<三> 坑

作者: 君剑 | 来源:发表于2018-01-05 09:54 被阅读1832次

1 注意import的StreamExecutionEnvironment

// java 的头是

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// scala的头是:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

如果不小心用了java的环境初始化context, 编译器并不能很好的提示, 会出现一堆看上去在java环境才能有的错误, 比如:

missing parameter type for expanded function

cannot resolve symbol with such signature

类似的很多找不到定义, 不明白类型等等错误, 所以写scala程序的第一步一定要确认StreamExecutionEnvironment是不是选的对

2 maven-shade-plugin, 打包失败, Error creating shaded jar: null: IllegalArgumentException

很显然是包依赖版本问题, shade打包是用append的方法. 我出这个错误主要是因为引入了ES的sink, 参照网上的代码, 需要同时引入依赖flink-connector-elasticsearch5_2.11和org.elasticsearch. 

引入org.elasticsearch主要是为了两个定义

importorg.elasticsearch.action.index.IndexRequest

importorg.elasticsearch.client.Requests

没有太好的解决版本, 试验了几个版本后, 发现这两个版本是匹配的

org.apache.flink flink-connector-elasticsearch5_2.11  flink 1.3.2

org.elasticsearch elasticsearch 5.5.0

sink参考代码如下:

val config =newjava.util.HashMap[String,String]

config.put("cluster.name","my-cluster-name")

// This instructs the sink to emit after every element, otherwise they would be buffered

// config.put("bulk.flush.max.actions", "1")

val transportAddresses =newjava.util.ArrayList[InetSocketAddress]

transportAddresses.add(newInetSocketAddress(InetAddress.getByName("127.0.0.1"),9300))

transportAddresses.add(newInetSocketAddress(InetAddress.getByName("10.2.3.1"),9300))

qosSliceStream.addSink(newElasticsearchSink(config,transportAddresses,newElasticsearchSinkFunction[String] {

def createIndexRequest(element:String): IndexRequest = {

val json =newjava.util.HashMap[String,String]

json.put("data",element)

Requests.indexRequest().index("my-index").`type`("my-type").source(json)

}

importorg.apache.flink.api.common.functions.RuntimeContext

importorg.apache.flink.streaming.connectors.elasticsearch.RequestIndexer

def process(element:String,ctx: RuntimeContext,indexer: RequestIndexer):Unit= {

indexer.add(createIndexRequest(element))

}

}))

3 scala: java list to scala list

问题出现在, 做flatMap, 想把一个二维的java数组展开用于流计算, 但是怎么写都不成, 按理说直接flatMap(x=>x)就可以了, 各种调试发现, log中一直有java.util.list字样, 后来怀疑这个List因为scala不认, 所以不能flatten. 

查了一下, 果然, 需要做的也很简单.

import scala.collection.JavaConverters._

.flatMap(x => x.asScala.toList)

4 TimerException{java.lang.NoSuchMethodError: scala.runtime.LongRef.create(J)Lscala/runtime/LongRef;}

这个是scala编译版本和运行时版本不一致导致的, 可以看到2.10和2.11对这块的定义明显不一致.

为了保证编译时的 scala 版本和运行时的 scala 版本一致, 最好使用项目依赖的 scala-library 运行程序.

最终问题定位为, 提交flink任务的版本为1.3.0, 其使用的scala版本为2.10, 解决方法就是升级到1.3.2, 对应版本为2.11. 就好了

开源项目的版本使用还是挺重要的, 很多项目都会任性的升级, 即使是顶级项目. 所以大公司才有有专人维护开源代码吧.

推荐一个工具, IDEA中有个把包依赖用图形表示的功能, 非常实用, 可以支持索引, 和方便的排除操作.

5 Flink Kafka Connector的特别之处

这一点其实单独写一篇也够了, 不过还是先简单记录下. 先说现象, 准备把几个flink的任务做迁移, 需要迁到一个不同的物理集群上, 担心稳定性和部署方面的事, 就没有停掉之前的任务, 在新集群起了一个相同group ID的任务, 料想kafka会统一分配patitions给到两个任务, 我们最终得到的数据量是不变的, 每条消息只被处理一次. 但是, 事实并不是这个样子的, 几个小时后, 后端的磁盘就报警了, 看了一下发现, 日志量double啦, 也就是说同一个group name的两个flink 任务, 并没有共享任务.

相关文章

网友评论

      本文标题:Flink编程<三> 坑

      本文链接:https://www.haomeiwen.com/subject/lidjnxtx.html