美文网首页
Flink1.18 streaming快速demo

Flink1.18 streaming快速demo

作者: 清蒸三文鱼_ | 来源:发表于2024-06-06 16:19 被阅读0次

运行脚本(Flink On yarn)

# Submit the demo job to YARN in detached mode.
# HADOOP_CONF_DIR / HADOOP_USER_NAME tell the Flink client which Hadoop
# configuration and which user to use when talking to the cluster.
export HADOOP_CONF_DIR=/data/aiops/hadoop/etc/hadoop
export HADOOP_USER_NAME=hdfs
# NOTE(review): this path points at a 1.13.6 installation; make sure the Flink
# version the job jar was built against matches the installation used here.
FLINK_HOME=/home/aiops/flink-1.13.6
# This is the official example, but it stops as soon as it finishes, which
# makes it inconvenient to observe a running job.
#WORDCOUNT_JAR=$FLINK_HOME/examples/streaming/WordCount.jar
#MAIN=org.apache.flink.streaming.examples.wordcount.WordCount
WORDCOUNT_JAR=$FLINK_HOME/flink_demo-1.0-SNAPSHOT.jar
MAIN=org.example.Main
# All Flink options must come BEFORE the jar path: everything after the jar is
# handed to the job's main() as program arguments. The former trailing
# "-t streaming \" was therefore never seen by the Flink CLI (and "streaming"
# is not a deployment target), and it left a dangling line continuation.
"$FLINK_HOME/bin/flink" run -d \
  -m yarn-cluster \
  -c "$MAIN" "$WORDCOUNT_JAR"

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Flink streaming demo job (org.example:flink_demo). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink_demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- Compile for Java 8 and read sources as UTF-8. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <!-- Every dependency below uses scope "provided": it is available at compile
         time but is not packaged with the job, so it has to be supplied by the
         runtime classpath (or explicitly enabled when running from an IDE). -->
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.18.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.18.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- Logging backend (Log4j 2 core plus its SLF4J binding). -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.23.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.23.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Assembly plugin configured with the predefined "jar-with-dependencies"
                 descriptor and org.example.Main as the manifest main class.
                 NOTE(review): no <executions> binding is declared, so presumably the
                 assembly has to be invoked explicitly (e.g. mvn assembly:single) -
                 confirm against the intended build command. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.example.Main</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

如果想用低版本的 Flink 运行(如 1.13.6), 需要相应修改依赖: 低版本中部分 artifactId 会多一个标识 Scala 版本的后缀(如 _2.11), 例如:

<!-- Replacement dependencies for Flink 1.13.6: the artifactId carries a
     Scala-version suffix (_2.11 here) that the 1.18.1 artifacts do not have. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>

代码

package org.example;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Minimal, never-ending Flink streaming job used as a quick demo.
 *
 * <p>An input format that never reports end-of-input produces one random string
 * every few seconds, and a sink writes each record to the log. Because the
 * input never ends, the job keeps running until it is cancelled, which makes it
 * easy to observe on the cluster.
 */
public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);

    /** Pause before each generated record, in seconds. */
    private static final long EMIT_INTERVAL_SECONDS = 5;

    /**
     * Builds the source-to-sink pipeline with parallelism 2 and submits it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2).createInput(new GenericInputFormat<Object>() {
            /** Never signals end-of-input, so the job runs until it is cancelled. */
            @Override
            public boolean reachedEnd() throws IOException {
                return false;
            }

            /**
             * Waits {@code EMIT_INTERVAL_SECONDS} seconds, then returns a new record
             * of the form {@code <random UUID>-<current epoch millis>}.
             *
             * @param o reuse object offered by the caller; not used
             * @return the freshly generated record
             * @throws IOException if the thread is interrupted while waiting
             */
            @Override
            public Object nextRecord(Object o) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(EMIT_INTERVAL_SECONDS);
                } catch (InterruptedException e) {
                    // Do not swallow the interrupt: restore the flag for the caller
                    // and stop producing instead of continuing as if nothing happened.
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting to emit the next record", e);
                }
                return UUID.randomUUID().toString() + "-" + System.currentTimeMillis();
            }
        }).addSink(new SinkFunction<Object>() {
            /** Logs every received record at INFO level. */
            @Override
            public void invoke(Object value, Context context) throws Exception {
                // Parameterized logging yields the same message without eager concatenation.
                logger.info("rec---{}", value);
            }
        });
        env.execute();
    }
}

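注意: pom 中的依赖范围是 provided, 这是为了避免与 hadoop classpath 或 flink/lib 下的 jar 冲突。因此在 IDE 中运行项目时, 需要在运行配置里勾选包含 provided 范围依赖的选项(例如 IDEA 的 "Add dependencies with 'provided' scope to classpath"), 否则会报找不到类的错误(ClassNotFoundException / NoClassDefFoundError)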

相关文章

网友评论

      本文标题:Flink1.18 streaming快速demo

      本文链接:https://www.haomeiwen.com/subject/ffnpqjtx.html