美文网首页
05-flink简单maven工程搭建

05-flink简单maven工程搭建

作者: yayooo | 来源:发表于2021-06-10 10:41 被阅读0次

使用IDEA作为开发工具
一、wordCount
1.1、创建maven项目
1.2、pom文件

<!-- Version properties shared by all dependency declarations below. -->
<properties>
    <flink.version>1.12.0</flink.version>
    <java.version>1.8</java.version>
    <!-- Scala binary version suffix for Flink artifacts that depend on Akka. -->
    <scala.binary.version>2.11</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <!-- Core DataSet (batch) API. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- DataStream (streaming) API; artifact id carries the Scala suffix. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Needed to submit/run jobs from the IDE (LocalExecutor etc.). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Optional: serves the Flink web UI when running locally. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Logging: SLF4J API bound to log4j 1.x; log4j2 API calls are routed
         back into SLF4J via log4j-to-slf4j. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Builds a fat "jar-with-dependencies" during the package phase so
             the job can be submitted to a cluster with all deps bundled. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • flink-streaming-java_2.11 : 2.11为scala版本,因为flink-runtime中用到了akka,akka是scala写的。
  • flink-java:1.12.0

1.3 log4j.properties

# Root logger: WARN level, attached to the "console" appender only.
log4j.rootLogger=WARN,console
log4j.additivity.org.apache=true

# Console appender, writing to stderr.
log4j.appender.console=org.apache.log4j.ConsoleAppender
# FIX: the original value "Test" is not a valid log4j level. log4j's
# Level.toLevel silently falls back to DEBUG for unrecognized names, so
# DEBUG keeps the effective behavior while making the configuration valid.
log4j.appender.console.Threshold=DEBUG
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

# Daily rolling file appender. NOTE(review): it is defined but never attached
# to any logger (rootLogger references only "console"), so it produces no
# output unless some logger is configured to use it.
log4j.appender.debug=org.apache.log4j.DailyRollingFileAppender
log4j.appender.debug.layout=org.apache.log4j.PatternLayout
log4j.appender.debug.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %p [%t] %C.%M(%L) | %m%n
log4j.appender.debug.File=D:\\code\\dfjx\\cctv\\realtime\\RealTime\\test\\debug.txt
log4j.appender.debug.DatePattern='-'yyyy-MM-dd
log4j.appender.debug.Threshold=DEBUG

1.4 批处理wordCount程序

package com.dfjx.flinkTuorial.wordCount;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {

    /**
     * Classic batch word count built on Flink's legacy DataSet API.
     *
     * <p>The same input file is deliberately read three times into variables
     * declared with three different static types purely to illustrate the
     * class hierarchy: {@code DataSource} extends {@code Operator}, which in
     * turn extends {@code DataSet}.
     */
    public static void main(String[] args) throws Exception {
        final String inputPath =
                "E:\\IdeaProjects\\flinkTutorial\\src\\main\\java\\com\\dfjx\\flinkTuorial\\wordCount\\input\\hello.txt";

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Identical sources, typed against three levels of the hierarchy.
        DataSource<String> source = env.readTextFile(inputPath);
        Operator<String, ?> operator = env.readTextFile(inputPath);
        DataSet<String> dataSet = env.readTextFile(inputPath);

        AggregateOperator<Tuple2<String, Integer>> sourceCounts =
                source.flatMap(new Tokenizer()).groupBy(0).sum(1);
        sourceCounts.print("dataSourceSum: ");

        AggregateOperator<Tuple2<String, Integer>> operatorCounts =
                operator.flatMap(new Tokenizer()).groupBy(0).sum(1);
        operatorCounts.print("dataOperatorSum: ");

        AggregateOperator<Tuple2<String, Integer>> dataSetCounts =
                dataSet.flatMap(new Tokenizer()).groupBy(0).sum(1);
        dataSetCounts.print("dataSetSum: ");

        // print(String sinkIdentifier) registers a lazy sink (unlike the
        // eager no-arg print()), so an explicit execute() is required.
        env.execute();
    }

    /** Splits a line on single spaces and emits one (word, 1) pair per token. */
    private static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

批处理基于 DataSet 进行转换的,可以去看看源码,较为简单。

1.5 流式wordCount

package com.dfjx.flinkTuorial.wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {

    /**
     * Streaming word count over a bounded text file using the DataStream API.
     *
     * <p>Hierarchy note: {@code SingleOutputStreamOperator} and
     * {@code KeyedStream} both extend {@code DataStream}.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lines = env.readTextFile(
                "E:\\IdeaProjects\\flinkTutorial\\src\\main\\java\\com\\dfjx\\flinkTuorial\\wordCount\\input\\hello.txt");

        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = lines
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split(" ")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0) // positional key: field 0 of the tuple (the word)
                .sum(1);

        counts.print("dataStreamSourceSum: ");
        env.execute();
    }
}

使用java8特性写

package com.dfjx2.wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * author : bigxiao
 * date   : 2021/8/20 0020
 * Desc   :
 * modified by :
 * version :
 */
public class StreamWordCount2 {

    /**
     * Streaming word count written with Java 8 lambdas instead of anonymous
     * inner classes.
     *
     * <p>Lambdas lose their generic type information to erasure, so the
     * element type of each lambda-produced stream must be restated explicitly
     * via {@code returns(...)}.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //DataSource<String> dataSource = batchEnv.readTextFile("E:\\IdeaProjects\\flinkTutorial\\src\\main\\resources\\hello.txt");
        DataStreamSource<String> lines =
                env.readTextFile("E:\\IdeaProjects\\flinkTutorial\\src\\main\\resources\\hello.txt");

        // Split each line into words, then pair every word with a count of 1.
        SingleOutputStreamOperator<Tuple2<String, Long>> pairs = lines
                .flatMap((String line, Collector<String> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(word);
                    }
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Key by the word itself (field f0), then keep a running sum per key.
        KeyedStream<Tuple2<String, Long>, String> byWord = pairs.keyBy(pair -> pair.f0);
        SingleOutputStreamOperator<Tuple2<String, Long>> counts = byWord.sum(1);

        counts.print();

        // Nothing runs until the job graph is submitted.
        env.execute();
    }
}

流式处理是基于DataStream 来继续计算转换的。

1.6 DataSet 和 DataStream的比较


DataSet DataStream

1.7 并行度/分区概念引入
红色部分的1,2,3,4表示什么?
我的wordCount程序是在本地跑的,本地电脑CPU为4核,默认并行度为4,即最多可以有4个子任务(线程)并行执行——这就是并行度的概念。


wordCount打印结果

1.8 从Socket读取数据输入的wordCount
使用Socket模拟无界流数据。

package com.dfjx.flinkTuorial.wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ResourceBundle;

public class SocketStreamWordCount {
    /** Host to connect to; loaded from socket.properties on the classpath. */
    static String hostname;
    /** Port to connect to; loaded from socket.properties on the classpath. */
    static int port;

    static {
        // ResourceBundle.getBundle("socket") resolves socket.properties from
        // the classpath and throws MissingResourceException if it is absent.
        ResourceBundle bundle = ResourceBundle.getBundle("socket");
        hostname = bundle.getString("hostname");
        // parseInt returns a primitive int directly; Integer.valueOf would
        // box and then immediately auto-unbox into the int field.
        port = Integer.parseInt(bundle.getString("port"));
    }

    /**
     * Unbounded streaming word count: reads whitespace-separated words from a
     * socket (a stand-in for a true unbounded stream), keys by word and keeps
     * a running count per word. Runs until the socket closes or the job is
     * cancelled.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream(hostname, port);
        SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreamSourceSum =
                dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                // positional keyBy is deprecated since Flink 1.12; the lambda
                // form keyBy(t -> t.f0) is the modern equivalent.
                }).keyBy(0).sum(1);

        dataStreamSourceSum.print();
        env.execute();
    }
}

相关文章

网友评论

      本文标题:05-flink简单maven工程搭建

      本文链接:https://www.haomeiwen.com/subject/nkearltx.html