For project reasons we need to run Flink on GCP, so GCS is the natural choice for the file system. There is surprisingly little material online: for well-known reasons Google Cloud is rarely used in mainland China, and the official Flink documentation covers this topic only briefly (link). In summary, the way to use GCS as the state backend is as follows:
- Support GCS through Flink's HDFS (Hadoop file system) support
- Create core-site.xml
Because this article uses a Flink standalone environment, which has no HDFS, the first step is to create a core-site.xml.
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>google.cloud.auth.service.account.enable</name>
<value>true</value>
</property>
<property>
<name>fs.gs.impl</name>
<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
</property>
<property>
<name>fs.AbstractFileSystem.gs.impl</name>
<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
</property>
<property>
<name>google.cloud.auth.service.account.json.keyfile</name>
<value>/data/flink-1.9.1/conf/gcs-service-account.json</value>
</property>
<property>
<name>fs.gs.project.id</name>
<value>XXX</value>
<description>
Required. Google Cloud Project ID with access to configured GCS buckets.
</description>
</property>
</configuration>
- Configure flink-conf.yaml so that Flink can find the core-site.xml file:
fs.hdfs.hadoopconf: /data/flink-1.9.1/conf/
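Besides pointing Flink at the Hadoop conf directory, the state backend itself can optionally be declared in flink-conf.yaml instead of in code. A sketch using this article's bucket name (these are standard Flink 1.9 option keys):

```yaml
# Declarative equivalent of the programmatic RocksDB setup in the next step
state.backend: rocksdb
state.checkpoints.dir: gs://flinkcheckpoint
state.backend.incremental: true
```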
- Use GCS as the state backend in the program:
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setFailOnCheckpointingErrors(false); // tolerate checkpoint failures
checkpointConfig.setCheckpointInterval(10000);        // checkpoint every 10 s
checkpointConfig.setMinPauseBetweenCheckpoints(5000); // at least 5 s between checkpoints
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
// The second constructor argument enables incremental checkpoints
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
        "gs://flinkcheckpoint", true);
env.setStateBackend((StateBackend) rocksDBStateBackend);
- Put the required jars on a classpath that Flink can load:
- gcs-connector-hadoop2-2.0.0.jar
- gcsio-2.0.0.jar
- google-api-client-1.30.1.jar
- google-api-client-jackson2-1.30.1.jar
- google-api-client-java6-1.30.1.jar
- google-api-services-storage-v1-rev20190624-1.30.1.jar
- google-extensions-0.4.jar
- google-http-client-1.30.1.jar
- google-http-client-jackson2-1.30.1.jar
- google-oauth-client-1.30.1.jar
- google-oauth-client-java6-1.30.1.jar
- flink-shaded-hadoop2-2.8.3-1.8.3.jar
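Placing the jars can be sketched as below; FLINK_HOME and the download directory are assumptions to adjust for your installation:

```shell
# Copy the GCS connector jars into Flink's lib/ directory, which is on the
# default classpath of every Flink process. Paths here are assumptions.
FLINK_HOME="${FLINK_HOME:-/data/flink-1.9.1}"
DOWNLOAD_DIR="${DOWNLOAD_DIR:-.}"
mkdir -p "$FLINK_HOME/lib"
for jar in gcs-connector-hadoop2-2.0.0.jar \
           gcsio-2.0.0.jar \
           flink-shaded-hadoop2-2.8.3-1.8.3.jar; do
  # Skip quietly if a jar has not been downloaded yet
  [ -f "$DOWNLOAD_DIR/$jar" ] && cp "$DOWNLOAD_DIR/$jar" "$FLINK_HOME/lib/" || true
done
```

Restart the Flink cluster after adding the jars so the classpath change takes effect.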
If the program fails with the error below, check the Flink logs; the usual cause is a missing jar. In particular, flink-shaded-hadoop2-2.8.3-1.8.3.jar is mandatory, so make sure it has been placed somewhere Flink can load it.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
- Create and download a JSON-format service account key for the relevant GCP project, place it at the path configured for google.cloud.auth.service.account.json.keyfile in step 1, and set fs.gs.project.id to that project's ID.
- Everything is ready: run the Flink job, then check whether checkpoint files appear under the GCS bucket (e.g. with gsutil ls gs://flinkcheckpoint/).