Writing Scala Code Step by Step and Running It on Flink

Author: 万州客 | Published 2022-05-06 07:50

Since I don't really know Scala, this took me the better part of an afternoon plus an evening today. I'm writing it down so that next time I can go straight to learning the code, without being tripped up by the project-engineering side of things.

1. Running Flink as a single node

Use a docker-compose.yml, and be careful to pin the version: if the Scala versions don't line up, things break constantly. I ran into this over and over while juggling the system-level Scala, the Scala version IDEA compiles with, and the Scala version the Flink distribution was built against. The quick way out is to use Scala 2.11 everywhere: on the operating system, in the pom.xml dependencies, and in the Flink build. For example, the Docker image I use here is flink:scala_2.11-java8.
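A quick way to confirm what the OS-level toolchain actually is (standard Scala and Java CLI flags, nothing project-specific assumed):

scala -version    # should report 2.11.x
java -version     # should report 1.8.x, matching the *-java8 image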
Find a suitable tag for the image on Docker Hub.

(screenshot: Docker Hub tag list for the flink image)
version: "2.1"
services:
  jobmanager:
    image: flink:scala_2.11-java8
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./root/flink:/demo
  taskmanager:
    image: flink:scala_2.11-java8
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
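
With this file in place, bringing the cluster up is the usual Compose workflow (a minimal sketch; the service names and the 8081 port mapping come from the file above):

docker-compose up -d    # start jobmanager and taskmanager in the background
docker-compose ps       # both services should show as Up

The JobManager web UI is then reachable at http://localhost:8081.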

2. Project structure and compiling the code

Start IDEA and create the project. The project-creation settings:

(screenshots: IDEA new-project dialog settings)

The directory structure:

(screenshot: project directory tree)

pom.xml contents:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.bbk</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.8.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.8.1</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <!-- jars needed for the Flink-Kafka integration -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <!-- jars for the Flink-HBase integration -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop2</artifactId>
            <!-- no 1.8.1 release of this artifact yet -->
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0-cdh5.14.2</version>
        </dependency>
        <!-- jars needed for the Flink Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.8.1</version>
        </dependency>
        <!-- jars for the Flink-JSON integration -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.8.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- pin the JDK version -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- plugin needed to compile Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- plugin used to package the project into a fat jar -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
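
Given how much trouble mismatched Scala versions caused, it is worth sanity-checking that every Scala-suffixed Flink artifact the build pulls in really is a _2.11 one (a quick sketch using the standard Maven dependency plugin):

mvn dependency:tree -Dincludes=org.apache.flink
# the Scala-suffixed artifactIds in the output should all end in _2.11;
# anything resolving to _2.12 is a candidate for the version errors described above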

Demo.scala contents:

package org.bbk.flink

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

case class CountWord(word: String, count: Long)

object Demo {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read raw text lines from a socket; point this at the host running `nc -lk 9000`.
    val result: DataStream[String] = environment.socketTextStream("192.168.1.111", 9000)
    // Brings the implicit TypeInformation instances the Scala API needs into scope.
    import org.apache.flink.api.scala._
    val resultValue: DataStream[CountWord] = result
      .flatMap(x => x.split(" "))    // split each line into words
      .map(x => CountWord(x, 1))     // emit one CountWord per occurrence
      .keyBy("word")                 // key by the case-class field name
      .sum("count")                  // running per-key sum of the count field
    resultValue.print().setParallelism(1)
    environment.execute()
  }
}
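
Two things in Demo.scala are easy to trip over. The in-method import org.apache.flink.api.scala._ is not optional: it supplies the implicit TypeInformation that the Scala API methods require, and without it the flatMap/map calls will not compile. And keyBy("word") addresses the case-class field by name as a string; an equivalent, more type-safe form on the same 1.8.1 Scala API (a sketch, not what the original uses) is a key-selector function:

val resultValue: DataStream[CountWord] = result
  .flatMap(_.split(" "))
  .map(CountWord(_, 1))
  .keyBy(_.word)      // typed KeySelector instead of a field-name string
  .sum("count")       // the running sum still names the field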

Compile the code:

mvn clean scala:compile compile package

This produces a file named flink-1.0-SNAPSHOT-jar-with-dependencies.jar. Get that file into the container for later use; I won't go into the details of uploading it and copying it in.
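Still, for completeness, one way to do it (a sketch; the container name demo_jobmanager_1 is an assumption, check yours with docker ps):

# Option 1: the compose file above mounts ./root/flink into the jobmanager at /demo,
# so copying the jar into that host directory makes it appear there.
cp target/flink-1.0-SNAPSHOT-jar-with-dependencies.jar ./root/flink/

# Option 2: docker cp straight into the running container.
docker cp target/flink-1.0-SNAPSHOT-jar-with-dependencies.jar demo_jobmanager_1:/demo/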

3. Running the jar on the production-like Flink

Run an nc program in one terminal to simulate a live data feed:

[root@127 ~]# nc -lk 9000
ssss
df
sd
hjjh
ytyu
dddd
this
2222
this
this
2222
df
df
df
df
df

Submit the jar we just built to the Flink cluster:

root@7fc97df9c324:/demo# flink run --class org.bbk.flink.Demo flink-1.0-SNAPSHOT-jar-with-dependencies.jar
Job has been submitted with JobID 06022ffb1c4ef7b191b34186340c9754
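
While it runs, the standard Flink CLI (not shown in the original post) can list and stop the job from the same shell:

flink list                                      # shows the running job and its JobID
flink cancel 06022ffb1c4ef7b191b34186340c9754   # stops it when you are done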

Now look at Flink's output:

taskmanager_1  | 2022-05-05 13:00:12,233 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Socket Stream -> Flat Map -> Map (1/1)#0 (fe28c0c5280e26eede4948ef1f5048f1) switched from DEPLOYING to INITIALIZING.
jobmanager_1   | 2022-05-05 13:00:12,241 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1) (e2dc213af028a1599dff957fa128a489) switched from DEPLOYING to INITIALIZING.
jobmanager_1   | 2022-05-05 13:00:12,243 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Socket Stream -> Flat Map -> Map (1/1) (fe28c0c5280e26eede4948ef1f5048f1) switched from DEPLOYING to INITIALIZING.
taskmanager_1  | 2022-05-05 13:00:13,541 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Socket Stream -> Flat Map -> Map (1/1)#0 (fe28c0c5280e26eede4948ef1f5048f1) switched from INITIALIZING to RUNNING.
taskmanager_1  | 2022-05-05 13:00:13,548 INFO  org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction [] - Connecting to server socket 192.168.1.111:9000
jobmanager_1   | 2022-05-05 13:00:13,548 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Socket Stream -> Flat Map -> Map (1/1) (fe28c0c5280e26eede4948ef1f5048f1) switched from INITIALIZING to RUNNING.
taskmanager_1  | 2022-05-05 13:00:13,561 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
taskmanager_1  | 2022-05-05 13:00:13,569 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] - Initializing heap keyed state backend with stream factory.
taskmanager_1  | 2022-05-05 13:00:13,583 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1)#0 (e2dc213af028a1599dff957fa128a489) switched from INITIALIZING to RUNNING.
jobmanager_1   | 2022-05-05 13:00:13,587 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1) (e2dc213af028a1599dff957fa128a489) switched from INITIALIZING to RUNNING.
taskmanager_1  | CountWord(ytyu,1)
taskmanager_1  | CountWord(dddd,1)
taskmanager_1  | CountWord(this,1)
taskmanager_1  | CountWord(2222,1)
taskmanager_1  | CountWord(this,2)
taskmanager_1  | CountWord(this,3)
taskmanager_1  | CountWord(2222,2)
taskmanager_1  | CountWord(df,1)
taskmanager_1  | CountWord(df,2)
taskmanager_1  | CountWord(df,3)
taskmanager_1  | CountWord(df,4)
taskmanager_1  | CountWord(df,5)

Finally, take a look at the Flink web UI:

(screenshot: Flink web UI showing the running job)

4. To be continued
