美文网首页
Flink-1.12(三)kafka 简单应用开发

Flink-1.12(三)kafka 简单应用开发

作者: _大叔_ | 来源:发表于2021-04-21 11:24 被阅读0次

创建应用(项目)

你可以手动创建,也可以用maven命令创建,结构如下

|--project
  |--src
    |--main
      |--java
        |--com.example.demo
          |--Demo.java
      |--resources
        |--log4j.properties

在工作目录下使用如下命令

mvn archetype:generate 
    -DarchetypeGroupId=org.apache.flink 
    -DarchetypeArtifactId=flink-walkthrough-datastream-java 
    -DarchetypeVersion=1.12.0 
    -DgroupId=frauddetection 
    -DartifactId=frauddetection 
    -Dversion=0.1 
    -Dpackage=spendreport 
    -DinteractiveMode=false

引入依赖

引入的依赖版本最好和flink安装版本一致

    <dependencies>
        <!-- 必须要有 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <!-- java flink 必须要有 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.2</version>
        </dependency>

        <!-- 如果将程序作为 Maven 项目开发,则必须添加 flink-clients 模块的依赖 必须要有 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.12.2</version>
        </dependency>

        <!-- 解决 Failed to load class "org.slf4j.impl.StaticLoggerBinder". -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <!-- 指定jdk版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- 一般项目打包是不会含有依赖的,使用这个可以帮你把依赖带上,不带的话提交到job是无法运行的 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>

                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- 设置主入口 -->
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.demo.TestKafka</mainClass>
                                </transformer>
                            </transformers>
                            <!-- 自动将所有不使用的类全部排除掉,将 jar 最小化,导致不会引入所有依赖 -->
<!--                            <minimizeJar>true</minimizeJar>-->
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

插件 maven-shade-plugin http://www.iigrowing.cn/maven-shade-plugin_ru_men_zhi_nan.html

代码

package com.example.demo;

import lombok.SneakyThrows;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * @author big uncle
 * @date 2021/3/8 18:50
 * @module
 **/
public class TestKafka {

    @SneakyThrows
    public static void main(String[] args) {

        // StreamExecutionEnvironment用于设置你的执行环境。任务执行环境用于定义任务的属性,创建数据源以及最终启动任务的执行。
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        // 配置kafka信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.81.62:9092");
        properties.setProperty("group.id", "test");

        // 得到 kafka 实例
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("topic2", new SimpleStringSchema(),properties);
        // 尽可能从最早的记录开始
//        myConsumer.setStartFromEarliest();
        // 从最新的记录开始
        myConsumer.setStartFromLatest();
        // 从指定的时间开始(毫秒)
        // myConsumer.setStartFromTimestamp();
        // myConsumer.setStartFromGroupOffsets(); // 默认的方法

        // 添加数据源
        DataStream<String> stream = env.addSource(myConsumer).setParallelism(1);

        // 简单得打印以下信息
//        DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
        stream.print();
        //
        env.execute("print-kafka-info");

    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            collector.collect(new Tuple2<String, Integer>(s, 1));
        }
    }
}

以上是一个简单而完整flink整合kafka消费数据的例子,打包后提交到flink运行就行,会以无边界的形式持续运行。记得打包成功后,查看一下所打包的jar是否包含了所引用的依赖。

以上是我flink-kafka应用接收到的数据信息。

相关文章

  • Flink-1.12(三)kafka 简单应用开发

    创建应用(项目) 你可以手动创建,也可以用maven命令创建,结构如下 在工作目录下使用如下命令 引入依赖 引入的...

  • kafka编程应用Stream

    Kafka Streams简介 Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka...

  • Kafka 的这些原理你知道吗

    如果只是为了开发 Kafka 应用程序,或者只是在生产环境使用 Kafka,那么了解 Kafka 的内部工作原理不...

  • Spring Boot 集成 Kafka Stream

    Kafka 从0.10版本开始支持流处理,我们可以使用 Kafka Streams 来开发实时应用程序。本章介绍 ...

  • Spring boot 和 kafka的优雅集成

    在日常的开发工作中,kafka作为消息处理中间件,会经常在SpringBoot的应用中连接kafka,消费kakf...

  • 聊聊spring对kafka的集成方式

    序 本文主要简单梳理梳理java应用中生产/消费kafka消息的一些使用选择。 可用类库 kafka client...

  • kafka全面认知

    什么是Kafka[#---kafka] Kafka的应用场景[#kafka-----] Kafka的架构[#kaf...

  • java Kafka 简单应用实例

    1、安装zookeeper 下载zookeeper-3.4.9.tar; 解压tar -zxvf zookeepe...

  • Kafka部署, 简单应用(一)

    Apache项目, 分布式消息应用, 具有很高的扩展性, 高的吞吐量, 大数据中扮演着很重要的角色 下载地址: h...

  • kakfa 测试接入

    介绍kafka kafka 是一个常用的消息队列组件,广泛的应用在分布式场景中,且具有非常优秀的性能。这篇简单介绍...

网友评论

      本文标题:Flink-1.12(三)kafka 简单应用开发

      本文链接:https://www.haomeiwen.com/subject/bvebqltx.html