创建应用(项目)
你可以手动创建,也可以用maven命令创建,结构如下
|--project
|--src
|--main
|--java
|--com.example.demo
|--Demo.java
|--resources
|--log4j.properties
在工作目录下使用如下命令
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-walkthrough-datastream-java
-DarchetypeVersion=1.12.0
-DgroupId=frauddetection
-DartifactId=frauddetection
-Dversion=0.1
-Dpackage=spendreport
-DinteractiveMode=false
引入依赖
引入的依赖版本最好和flink安装版本一致
<dependencies>
<!-- 必须要有 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<!-- java flink 必须要有 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.2</version>
</dependency>
<!-- 如果将程序作为 Maven 项目开发,则必须添加 flink-clients 模块的依赖 必须要有 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.12.2</version>
</dependency>
<!-- 解决 Failed to load class "org.slf4j.impl.StaticLoggerBinder". -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 指定jdk版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- 一般项目打包是不会含有依赖的,使用这个可以帮你把依赖带上,不带的话提交到job是无法运行的 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- 设置主入口 -->
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.demo.TestKafka</mainClass>
</transformer>
</transformers>
<!-- 自动将所有不使用的类全部排除掉,将 jar 最小化,导致不会引入所有依赖 -->
<!-- <minimizeJar>true</minimizeJar>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
插件 maven-shade-plugin http://www.iigrowing.cn/maven-shade-plugin_ru_men_zhi_nan.html
代码
package com.example.demo;
import lombok.SneakyThrows;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
/**
* @author big uncle
* @date 2021/3/8 18:50
* @module
**/
public class TestKafka {
@SneakyThrows
public static void main(String[] args) {
// StreamExecutionEnvironment用于设置你的执行环境。任务执行环境用于定义任务的属性,创建数据源以及最终启动任务的执行。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置kafka信息
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.81.62:9092");
properties.setProperty("group.id", "test");
// 得到 kafka 实例
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("topic2", new SimpleStringSchema(),properties);
// 尽可能从最早的记录开始
// myConsumer.setStartFromEarliest();
// 从最新的记录开始
myConsumer.setStartFromLatest();
// 从指定的时间开始(毫秒)
// myConsumer.setStartFromTimestamp();
// myConsumer.setStartFromGroupOffsets(); // 默认的方法
// 添加数据源
DataStream<String> stream = env.addSource(myConsumer).setParallelism(1);
// 简单得打印以下信息
// DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
stream.print();
//
env.execute("print-kafka-info");
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
collector.collect(new Tuple2<String, Integer>(s, 1));
}
}
}
以上是一个简单而完整flink整合kafka消费数据的例子,打包后提交到flink运行就行,会以无边界的形式持续运行。记得打包成功后,查看一下所打包的jar是否包含了所引用的依赖。



以上是我flink-kafka应用接收到的数据信息。
网友评论