1. A first look at the Flink programming model
graph LR
A[Environment] --> B[DataSource] --> C[Transformation] --> D[Sink]
E[Execution context] --> F[Data sources] --> G[Transformation operations] --> Q[Data output]
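Before looking at each piece, here is a minimal sketch of the whole pipeline (environment, source, transformation, sink) using the Flink Scala API; the object name and sample data are just placeholders:

import org.apache.flink.streaming.api.scala._

object ProgrammingModelDemo {
  def main(args: Array[String]): Unit = {
    // Environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // DataSource
    val source: DataStream[String] = env.fromElements("a b", "b c")
    // Transformation: a classic word count
    val counts = source
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
    // Sink
    counts.print()
    env.execute("programming model demo")
  }
}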
1.1 Environment
Environment creates the execution environment. getExecutionEnvironment returns the environment appropriate for the context in which the program runs; you can also request one explicitly:
1. Return a local environment: createLocalEnvironment
e.g.: val env = StreamExecutionEnvironment.createLocalEnvironment(1)
2. Return a remote cluster environment: createRemoteEnvironment
e.g.: val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123, "YOURPATH//wordcount.jar")
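In most jobs, though, getExecutionEnvironment is all you need: it returns a local environment when the program is started from the IDE and the cluster environment when the job is submitted to a cluster. A minimal sketch (the object name and job name are placeholders):

import org.apache.flink.streaming.api.scala._

object EnvDemo {
  def main(args: Array[String]): Unit = {
    // Local environment inside the IDE, cluster environment when submitted with `flink run`
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.fromElements("hello", "flink").print()
    env.execute("env demo")
  }
}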
2. Flink's DataSource supports the following data sources:
1. File source
2. Collection source
3. Kafka source
4. Custom source
2.1 File data source
Java code:
package com.wudl.core;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @ClassName : SourceFile
 * @Description : Read from a file
 * @Author : wudl
 * @Date: 2020-10-22 00:34
 */
public class SourceFile {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.readTextFile("D:\\ideaWorkSpace\\learning\\Flinklearning\\wudl-flink-java\\src\\main\\java\\com\\wudl\\core\\SourceFile.java");
        dataStream.print();
        env.execute();
    }
}
Scala version:
package com.wudl.flink.core

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * @ClassName : SourceFile
 * @Description : File data source
 * @Author : wudl
 * @Date: 2020-12-02 00:34
 */
object FlinkFlieSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism
    env.setParallelism(1)
    // Import the implicit conversions; doing it here avoids misleading IDEA code hints
    import org.apache.flink.streaming.api.scala._
    val dataStream: DataStream[String] = env.readTextFile("F:\\ideaWorkSpace2020\\jg\\Flinklearning\\flink-core\\src\\main\\scala\\com\\wudl\\flink\\core\\FlinkFlieSource.scala")
    dataStream.print()
    env.execute("read from file source")
  }
}
2.2 Collection data source
package com.wudl.core;

import com.wudl.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

/**
 * @ClassName : SourceConllection
 * @Description : Read from a collection
 * @Author : wudl
 * @Date: 2020-10-22 00:20
 */
public class SourceConllection {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<WaterSensor> ts = Arrays.asList(
                new WaterSensor("sensor_1", 15321312412L, 41),
                new WaterSensor("sensor_2", 15321763412L, 47),
                new WaterSensor("sensor_3", 15369732412L, 49)
        );
        DataStreamSource<WaterSensor> dataStream = env.fromCollection(ts);
        dataStream.print();
        env.execute();
    }
}
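The WaterSensor bean is not shown in the post. A minimal sketch of what it might look like, written as a Scala case class with the same three constructor parameters (the field names are assumptions; the original project presumably has an equivalent Java POJO):

package com.wudl.bean

// Hypothetical definition: fields inferred from the constructor calls above
// (sensor id, timestamp, water level).
case class WaterSensor(id: String, ts: Long, vc: Int)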
2.3 Kafka data source
package com.wudl.realproces

import java.util.Properties

import com.alibaba.fastjson.JSON
import com.wudl.realproces.bean.{ClickLog, Message}
import com.wudl.realproces.utils.GlobalConfigUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object App {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time as the stream time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Set the parallelism to 1
    env.setParallelism(1)
    // Quick sanity check
    env.fromCollection(List("hadoop", "hive")).print()

    /**
     * Enable checkpointing so that a long-running job can recover safely
     */
    // Trigger a checkpoint every 5 seconds
    env.enableCheckpointing(5000)
    // Use exactly-once checkpointing semantics
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Minimum pause between two checkpoints
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
    // Checkpoint timeout
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // Maximum number of concurrent checkpoints
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Retain externalized checkpoints when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Checkpoint storage location
    env.setStateBackend(new FsStateBackend("hdfs://node01.com:8020/fink-checkpoint/"))

    /**
     * *************************** Flink + Kafka integration *******************************
     */
    val properties = new Properties()
    // Kafka broker list
    properties.setProperty("bootstrap.servers", GlobalConfigUtil.bootstrapServers)
    // ZooKeeper connection string
    properties.setProperty("zookeeper.connect", GlobalConfigUtil.zookeeperConnect)
    // Kafka topic name
    properties.setProperty("input.topic", GlobalConfigUtil.inputTopic)
    // Consumer group id
    properties.setProperty("group.id", GlobalConfigUtil.groupId)
    // Automatically commit consumed offsets back to Kafka
    properties.setProperty("enable.auto.commit", GlobalConfigUtil.enableAutoCommit)
    // Interval (ms) between automatic offset commits
    properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.autoCommitIntervalMs)
    // Where to start reading when there is no committed offset
    properties.setProperty("auto.offset.reset", GlobalConfigUtil.autoOffsetReset)

    // Consumer: topic, deserializer, property set
    val consumer = new FlinkKafkaConsumer010[String](GlobalConfigUtil.inputTopic, new SimpleStringSchema(), properties)
    val kafkaStream: DataStream[String] = env.addSource(consumer)
    // kafkaStream.print()

    // Parse the JSON messages into Message objects
    val tunlpDataStream = kafkaStream.map {
      msgjson =>
        val jsonObject = JSON.parseObject(msgjson)
        val message = jsonObject.getString("message")
        val count = jsonObject.getLong("count")
        val timeStamp = jsonObject.getLong("timeStamp")
        Message(ClickLog(message), count, timeStamp)
    }
    tunlpDataStream.print()

    /**
     * ------------------------------- Watermark support --------------------
     */
    val watemarkDataStream = tunlpDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message] {
      // Current timestamp
      var currentTimeStamp = 0L
      // Maximum allowed delay
      var maxDelayTime = 2000L

      // Generate the current watermark
      override def getCurrentWatermark: Watermark = {
        new Watermark(currentTimeStamp - maxDelayTime)
      }

      // Extract the event timestamp
      override def extractTimestamp(element: Message, previousElementTimestamp: Long): Long = {
        currentTimeStamp = Math.max(element.timeStamp, previousElementTimestamp)
        currentTimeStamp
      }
    })

    // Data preprocessing would follow here
    env.execute()
  }
}
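The helper classes Message, ClickLog and GlobalConfigUtil are not shown in the post. A minimal sketch of what they might look like, with fields and property names inferred from how App uses them above (all names here are assumptions):

// file: com/wudl/realproces/bean/Beans.scala
package com.wudl.realproces.bean

// Hypothetical definitions inferred from Message(ClickLog(message), count, timeStamp)
case class ClickLog(message: String)
case class Message(clickLog: ClickLog, count: Long, timeStamp: Long)

// file: com/wudl/realproces/utils/GlobalConfigUtil.scala
package com.wudl.realproces.utils

import java.util.ResourceBundle

// Hypothetical sketch: reads the Kafka settings from application.properties on the classpath
object GlobalConfigUtil {
  private val bundle = ResourceBundle.getBundle("application")
  def bootstrapServers: String = bundle.getString("bootstrap.servers")
  def zookeeperConnect: String = bundle.getString("zookeeper.connect")
  def inputTopic: String = bundle.getString("input.topic")
  def groupId: String = bundle.getString("group.id")
  def enableAutoCommit: String = bundle.getString("enable.auto.commit")
  def autoCommitIntervalMs: String = bundle.getString("auto.commit.interval.ms")
  def autoOffsetReset: String = bundle.getString("auto.offset.reset")
}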
2.4 Custom Source
package com.wudl.flink.core

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.util.Random

/**
 * @ClassName : CustomerSource
 * @Description : Custom data source
 * @Author : wudl
 * @Date: 2020-12-02 11:23
 */
object CustomerSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    import org.apache.flink.streaming.api.scala._
    val stream: DataStream[StationLog] = env.addSource(new MyCustomerSource)
    stream.print()
    env.execute()
  }

  // A custom source implements the SourceFunction interface
  class MyCustomerSource extends SourceFunction[StationLog] {
    // Flag that marks whether the stream should keep running
    var flag = true

    /**
     * The main method: starts the source.
     * In most cases run contains a loop so that records are produced continuously.
     *
     * @param sourceContext
     * @throws Exception
     */
    override def run(sourceContext: SourceFunction.SourceContext[StationLog]): Unit = {
      val random = new Random()
      val types = Array("fail", "busy", "barring", "success")
      while (flag) { // keep producing data until the stream is cancelled
        1.to(5).map(i => {
          val callOut = "1860000%04d".format(random.nextInt(10000))
          val callIn = "1890000%04d".format(random.nextInt(10000))
          new StationLog("station_" + random.nextInt(10), callOut, callIn,
            types(random.nextInt(4)), System.currentTimeMillis(), 0)
        }).foreach(sourceContext.collect(_)) // emit the records
        Thread.sleep(2000) // sleep 2 seconds between batches
      }
    }

    // Terminate the stream
    override def cancel(): Unit = flag = false
  }
}
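The StationLog type used by this source is not shown in the post. A minimal sketch with fields inferred from the constructor call above (the field names are assumptions):

package com.wudl.flink.core

// Hypothetical definition: station id, caller, callee, call outcome, call time, duration
case class StationLog(stationId: String,
                      callOut: String,
                      callIn: String,
                      callType: String,
                      callTime: Long,
                      duration: Long)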
