This approach packages the application JAR into a Flink image and runs it with Flink on K8s in Standalone mode.
1. Write the Dockerfile
FROM flink:1.12-scala_2.11-java8
COPY ./target/xxx.jar /opt/flink/usrlib/xxx.jar
The Dockerfile pulls the official Flink image and then copies the local JAR into the image's usrlib directory.
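The image then has to be built and pushed to a registry the cluster can reach. The following is a minimal sketch, not the author's script: it assumes the image reference hub/online/xxx-flink:flink_on_k8s used in the manifests later on, and the DRY_RUN switch and the run helper are hypothetical names added here so the commands are only printed by default.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Image reference taken from the manifests in this article; the registry
# path is a placeholder and would be replaced with a real one.
IMAGE="${IMAGE:-hub/online/xxx-flink:flink_on_k8s}"
# Hypothetical safety switch: DRY_RUN=1 (the default) only prints commands.
DRY_RUN="${DRY_RUN:-1}"

# Print the command in dry-run mode, otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "+ $*"
  else
    "$@"
  fi
}

build_and_push() {
  # Build from the directory that holds the Dockerfile and ./target/xxx.jar.
  run docker build -t "$IMAGE" .
  # Push so that the cluster nodes can pull the image.
  run docker push "$IMAGE"
}

build_and_push
```

Running it with DRY_RUN=0 executes the two docker commands instead of printing them.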
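2. Write the K8s resource manifests
The Flink on Kubernetes architecture is shown in the figure above. A Flink job runs on Kubernetes in the following steps:
(1) After the resource manifests are submitted to the Kubernetes cluster, the Master and Worker containers are started.
(2) The Master Container starts the Flink Master Process, which contains the Flink-Container ResourceManager, the JobManager and the Program Runner.
(3) The Worker Container starts a TaskManager, which registers with the ResourceManager responsible for resource management. Once registration completes, the JobManager assigns the actual tasks to the Worker Container, and the container executes them.
(4) Note that the Master Container and the Worker Container are started from the same image and differ only in their startup arguments: the two manifests below reference the same image.
See the attached flink-jobmanager.yaml and flink-taskmanager.yaml for the full manifests. Update the image address and the job's main class (--job-classname) to match your environment.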
flink-jobmanager.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: xxx-flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: hub/online/xxx-flink:flink_on_k8s
          imagePullPolicy: IfNotPresent
          env:
          args: ["standalone-job", "--job-classname", "com.xxx.App"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
flink-taskmanager.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: xxx-flink-taskmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: hub/online/xxx-flink:flink_on_k8s
          imagePullPolicy: IfNotPresent
          env:
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
      securityContext:
        runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
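Step (4) above relies on both manifests pointing at the same image. A quick way to check this before applying them might look like the sketch below. The function names images_in and same_image are hypothetical, and the check is a plain-text match on "image:" lines rather than real YAML parsing.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Print the distinct image references found in the given manifest files.
# Note: this is a plain-text match on "image:" lines, not a YAML parser.
images_in() {
  grep -hE '^[[:space:]]*image:' "$@" | awk '{print $2}' | sort -u
}

# Succeed only if the given manifests reference exactly one distinct image.
same_image() {
  [ "$(images_in "$@" | wc -l | tr -d ' ')" = "1" ]
}

# Run the check only when both manifests are in the current directory.
if [ -f flink-jobmanager.yaml ] && [ -f flink-taskmanager.yaml ]; then
  if same_image flink-jobmanager.yaml flink-taskmanager.yaml; then
    echo "OK: both manifests use $(images_in flink-jobmanager.yaml flink-taskmanager.yaml)"
  else
    echo "Mismatch, images found:" >&2
    images_in flink-jobmanager.yaml flink-taskmanager.yaml >&2
  fi
fi
```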
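3. Define the ConfigMap
The Flink configuration (flink-conf.yaml) and the logging configuration (log4j-console.properties) are defined as a ConfigMap in flink-config-configmap.yaml, which is how they are passed to and read by the containers. Because both manifests mount this ConfigMap, it must be created (kubectl apply -f flink-config-configmap.yaml) before the JobManager and TaskManager pods start. If the image's default configuration is sufficient this step can be skipped, but then the flink-config volume and volumeMounts must also be removed from both manifests, and the TaskManagers still need some other way to learn the JobManager address.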
flink-config-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: xxx-flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 1
  log4j-console.properties: |+
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
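4. Start the JobManager
Starting the JobManager takes two steps.
First, the JobManager is described by a Kubernetes Job (kind: Job in flink-jobmanager.yaml), which keeps one container running the JobManager. Its pod carries labels, here app: flink and component: jobmanager, so that it can be selected later.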
kubectl apply -f flink-jobmanager.yaml
Second, define a Service for the JobManager. It exposes the JobManager through a service name and ports, and selects the matching pods by label.
kubectl apply -f flink-jobmanager-service.yaml
flink-jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: xxx-flink-jobmanager  # must equal jobmanager.rpc.address in flink-config
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
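TaskManagers reach the JobManager through the host name configured in jobmanager.rpc.address, and inside the cluster that name resolves through the Service name, so the two have to be identical. The sketch below compares them before anything is applied. The helper names are hypothetical, and the files are read with simple awk text matching rather than a YAML parser, so it assumes the layout shown in this article.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Value of jobmanager.rpc.address inside the ConfigMap file (text match only).
rpc_address() {
  awk '$1 == "jobmanager.rpc.address:" {print $2; exit}' "$1"
}

# First "name:" that follows the top-level "metadata:" key of a manifest.
metadata_name() {
  awk '/^metadata:/ {m = 1; next} m && $1 == "name:" {print $2; exit}' "$1"
}

# Compare the configured RPC address ($1: ConfigMap) with the Service name ($2).
check_rpc_address() {
  local addr svc
  addr="$(rpc_address "$1")"
  svc="$(metadata_name "$2")"
  if [ "$addr" = "$svc" ]; then
    echo "OK: jobmanager.rpc.address matches Service $svc"
  else
    echo "Mismatch: jobmanager.rpc.address=$addr but the Service is named $svc"
    return 1
  fi
}

# Run the check only when both files are in the current directory.
if [ -f flink-config-configmap.yaml ] && [ -f flink-jobmanager-service.yaml ]; then
  check_rpc_address flink-config-configmap.yaml flink-jobmanager-service.yaml || true
fi
```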
5. Start the TaskManager
The TaskManager is described by a Deployment, which keeps n replicas of the container running a TaskManager. It also needs labels, here app: flink and component: taskmanager.
kubectl apply -f flink-taskmanager.yaml
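Steps 3 to 5 can be collected into one script so that the resources are always created in the same order. This is only a sketch under the assumption that all four files from this article sit in the current directory. DRY_RUN and run are hypothetical helpers, and by default the script prints the kubectl commands instead of executing them.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical safety switch: DRY_RUN=1 (the default) only prints commands.
DRY_RUN="${DRY_RUN:-1}"

# Print the command in dry-run mode, otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "+ $*"
  else
    "$@"
  fi
}

deploy_all() {
  # ConfigMap first: both pod specs mount it as a volume.
  run kubectl apply -f flink-config-configmap.yaml
  # Service next, so the JobManager host name resolves for the TaskManagers.
  run kubectl apply -f flink-jobmanager-service.yaml
  run kubectl apply -f flink-jobmanager.yaml
  run kubectl apply -f flink-taskmanager.yaml
  # List the pods created from the manifests (all carry the label app=flink).
  run kubectl get pods -l app=flink
}

deploy_all
```

With DRY_RUN=0 the same script applies the manifests and then lists the Flink pods.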
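6. Updating the JAR
Rebuild the image with the new JAR and push it again. Then change the image path in flink-jobmanager.yaml and flink-taskmanager.yaml and re-apply both files. Because the pod template of an existing Job is immutable, the JobManager Job has to be deleted before it is applied again:
kubectl delete -f flink-jobmanager.yaml --ignore-not-found
kubectl apply -f flink-jobmanager.yaml
kubectl apply -f flink-taskmanager.yaml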
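Editing the image path in two files by hand is easy to get wrong, so a small helper along these lines could rewrite both at once. It is a sketch, not part of the original procedure: set_image and NEW_IMAGE are hypothetical names, and the substitution is plain text replacement of every "image:" line, with a .bak copy kept next to each file.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Rewrite every "image:" line in the given manifests to the new reference.
# Text substitution only (not a YAML parser); a .bak copy of each file is kept.
set_image() {
  local new_image="$1"
  shift
  sed -i.bak -E "s|^([[:space:]]*image:).*|\\1 ${new_image}|" "$@"
}

# NEW_IMAGE is a hypothetical variable holding the freshly pushed image tag.
if [ -n "${NEW_IMAGE:-}" ] && [ -f flink-jobmanager.yaml ] && [ -f flink-taskmanager.yaml ]; then
  set_image "$NEW_IMAGE" flink-jobmanager.yaml flink-taskmanager.yaml
fi
```

Lines such as imagePullPolicy are left untouched because the pattern requires a colon directly after "image".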