2-Quick Start

Author: 无暇的风笛 | Published: 2020-07-18 12:30

1. Adding the dependencies

The examples are written in Java. The main parts of the pom.xml are:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- Compile scope lets the examples run directly from the IDE. When packaging a job JAR
             for a Flink cluster, switch these to provided so they are not bundled into the JAR. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Lombok generates the getters, setters and toString() of the Word POJO; it is only needed at compile time -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.14</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

2. Development steps

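  1. Obtain the execution environment (batch or streaming).
  2. Load data through a source.
  3. Apply operators (transformations) to the data.
  4. Output the results through a sink.
  5. Call execute() to run the job.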
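A point behind step 5 is that steps 2 to 4 only describe the job; no data is read until execution is triggered. As a rough analogy only (plain java.util.stream, not the Flink API), a Java Stream pipeline behaves the same way: intermediate operations are declared lazily and the terminal operation pulls the data through. The class name PipelineStepsSketch is hypothetical, and the log list exists only to show the order in which things happen.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Plain-Java analogy of the five steps (NOT the Flink API): intermediate
 * operations are only declared, and the terminal operation runs them,
 * much as a Flink job only runs once execution is triggered.
 */
final class PipelineStepsSketch {

    /** Counts words in 'lines'; 'log' records the order in which things happen. */
    static Map<String, Long> run(List<String> lines, List<String> log) {
        // Step 1 (environment) has no direct counterpart; the JDK stream machinery plays that role.
        // Step 2, source: a stream over in-memory lines.
        Stream<String> source = lines.stream();
        // Step 3, operators: declared here, but no line has been read yet.
        Stream<String> words = source
                .peek(line -> log.add("read: " + line))
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                .filter(word -> !word.isEmpty());
        log.add("pipeline defined");
        // Steps 4 and 5, sink + execute: the terminal collect() pulls the data through.
        return words.collect(Collectors.groupingBy(
                Function.identity(), TreeMap::new, Collectors.counting()));
    }
}
```

The first log entry is "pipeline defined", before any "read: ..." entry, which mirrors why a Flink program that never calls execute() (or another triggering sink) produces no output.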

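3. Stream processing demo

This demo reads text from a socket as a simulated input stream and counts how many times each word appears.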

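    package streaming;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    import lombok.Data;

    /**
     * @author pxl
     */
    public class SocketWordCount {

        public static void main(String[] args) throws Exception {
            String hostname = "localhost";
            String delimiter = "\n";
            int port = 9000;

            // Obtain the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Connect to the socket and read the incoming lines, split on the delimiter
            DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
            // Split each line into words and emit a Word(word, 1) for each one
            SingleOutputStreamOperator<Word> stream = text.flatMap(new FlatMapFunction<String, Word>() {
                @Override
                public void flatMap(String s, Collector<Word> out) {
                    // "\\s+" treats a run of whitespace as one separator;
                    // the empty check skips the empty token produced by leading whitespace
                    for (String word : s.split("\\s+")) {
                        if (!word.isEmpty()) {
                            out.collect(new Word(word, 1));
                        }
                    }
                }
            });
            // Partition by the "word" field and keep a running sum of the "count" field
            SingleOutputStreamOperator<Word> sum = stream.keyBy("word").sum("count");
            // Print the results with a single sink instance
            sum.print().setParallelism(1);
            // Nothing runs until execute() is called
            env.execute("SocketWordCount");
        }

        // Flink treats Word as a POJO: public static class, public no-argument constructor,
        // and getters/setters (generated by Lombok @Data), which keyBy("word") and sum("count") rely on
        @Data
        public static class Word {

            private String word;

            private int count;

            public Word() {
            }

            public Word(String word, int count) {
                this.word = word;
                this.count = count;
            }
        }
    }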
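Splitting on the single-character pattern "\\s" yields an empty token wherever two whitespace characters are adjacent (or a line starts with one), and each empty token would be counted as a word. The hypothetical SplitDemo class below (plain Java, no Flink dependency) compares that with splitting on "\\s+" and dropping empty tokens:

```java
import java.util.Arrays;

/** Compares two whitespace-splitting strategies on the same input. */
final class SplitDemo {

    /** Splits on a single whitespace character: adjacent separators yield empty tokens. */
    static String[] splitSingle(String line) {
        return line.split("\\s");
    }

    /** Splits on runs of whitespace and drops the empty token a leading separator produces. */
    static String[] splitRuns(String line) {
        return Arrays.stream(line.split("\\s+"))
                .filter(token -> !token.isEmpty())
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        String input = " hello  world";
        System.out.println(Arrays.toString(splitSingle(input))); // [, hello, , world]
        System.out.println(Arrays.toString(splitRuns(input)));   // [hello, world]
    }
}
```

Note that "\\s+" alone still leaves one empty token when the line starts with whitespace, which is why the empty check is kept.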

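First start a socket server on port 9000 in a terminal (for example with netcat: nc -lk 9000) and type some input: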


[Figure 2.1: console input]

Then start the program. The output is as follows:


[Figure 2.2: real-time stream output]
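keyBy("word").sum("count") is a rolling aggregation: for every incoming Word, Flink adds its count to that key's running total and emits the updated record, so a word appears again in the output each time it arrives, with a growing count, rather than once with a final total. The sketch below imitates that behaviour in plain Java; RollingWordCount and onLine are hypothetical names, and a HashMap stands in for Flink's per-key state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch (not Flink code) of keyBy("word").sum("count") on an unbounded stream. */
final class RollingWordCount {

    // Running total per word; a stand-in for the keyed state Flink keeps for each key.
    private final Map<String, Integer> totals = new HashMap<>();

    /** Processes one input line and returns the updated "word,count" records it would emit. */
    List<String> onLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            if (word.isEmpty()) {
                continue; // leading whitespace yields an empty first token
            }
            int updated = totals.merge(word, 1, Integer::sum);
            emitted.add(word + "," + updated);
        }
        return emitted;
    }
}
```

Feeding "hello flink" and then "hello world" emits hello,1 and flink,1 for the first line and hello,2 and world,1 for the second. A real Flink job keeps this state per key inside each parallel operator instance and makes it fault tolerant through checkpointing, none of which is modelled here.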

4. Batch processing demo

This demo reads a .txt file, splits it into words, and counts the frequency of each word.

    package batch;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    /**
     * @author pxl
     */
    public class FileWordCount {

        public static void main(String[] args) throws Exception {
            // Input file: the first program argument, falling back to the author's local path
            String filePath = args.length > 0 ? args[0] : "/Users/xiaolong/tmp/poem.txt";
            // Obtain the batch execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Source: read the file line by line
            DataSource<String> file = env.readTextFile(filePath);
            DataSet<Tuple2<String, Integer>> data = file.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> out) {
                    // "\\W+" splits on any run of characters outside [A-Za-z0-9_]
                    for (String word : s.split("\\W+")) {
                        // A line that starts with a separator yields an empty first token
                        if (!word.isEmpty()) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                }
            })
                    // Group by the word (field 0) and sum the counts (field 1)
                    .groupBy(0).sum(1);

            // In the DataSet API, print() is a sink that also triggers execution,
            // so no separate env.execute() call is needed here
            data.print();
        }
    }

Output:


[Figure 2.3: batch output]
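Unlike the streaming demo, the batch job sees the whole (finite) input, so groupBy(0).sum(1) produces exactly one total per word. The plain-Java sketch below (hypothetical BatchWordCountSketch, no Flink involved) computes the same result with the same "\\W+" tokenizer. It also shows two properties worth knowing: words are compared case-sensitively, so "To" and "to" are counted separately, and with Java's default regex settings \W matches every non-ASCII character, so a Chinese text would produce no words at all.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java sketch (not Flink code) of what FileWordCount computes:
 * over a finite input, grouping by word and summing yields one total per word.
 */
final class BatchWordCountSketch {

    static Map<String, Integer> count(List<String> lines) {
        // TreeMap only to get a deterministic, sorted printout
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            // Same tokenizer as the demo: split on runs of characters outside [A-Za-z0-9_]
            for (String word : line.split("\\W+")) {
                if (!word.isEmpty()) {
                    totals.merge(word, 1, Integer::sum);
                }
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("To be, or not to be", "that is the question")));
        // {To=1, be=2, is=1, not=1, or=1, question=1, that=1, the=1, to=1}
    }
}
```

Lower-casing each word, or splitting with a Unicode-aware pattern, would change these results; both are choices left open by the original demo.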

Article link: https://www.haomeiwen.com/subject/ivixkktx.html