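This article uses Flink and Redis to clean and filter a data stream, aggregate it, and persist the result. Along the way it implements a custom source and a custom sink, and uses event-time windows and watermarks.
Preparation
I. Installing Kafka with Docker
1. Pull the Docker images (if pulling directly is slow, configure a registry mirror inside China first)
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
2. Run the two images: zookeeper and kafka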
2.1 Start zookeeper
docker run -d --name zookeeper --publish 2181:2181 \
  --volume /etc/localtime:/etc/localtime \
  wurstmeister/zookeeper
2.2 Start kafka
docker run -d --name kafka --publish 9092:9092 \
  --link zookeeper \
  --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  --env KAFKA_ADVERTISED_HOST_NAME=localhost \
  --env KAFKA_ADVERTISED_PORT=9092 \
  --volume /etc/localtime:/etc/localtime \
  wurstmeister/kafka
3. Verify that the containers started correctly
3.1 Run docker ps and find the CONTAINER ID of the kafka container.
3.2 Run docker exec -it <CONTAINER ID> /bin/bash to open a shell inside the kafka container.
3.3 Change to the Kafka installation directory, /opt/kafka_2.11-0.10.1.0 (the version suffix depends on the image), and create a topic named test:
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
List the current topics:
bin/kafka-topics.sh --list --zookeeper zookeeper:2181
Start a console producer on the test topic and type a few test messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Start a console consumer on the same topic; it should print the messages sent by the producer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
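II. Installing Redis with Docker
1. Pull the redis image
docker pull registry.docker-cn.com/library/redis
2. Start a redis container
docker run -d -p 6379:6379 --name myredis registry.docker-cn.com/library/redis
3. Run docker ps to list the running containers
4. Connect to the container by running redis-cli inside it (replace 6fb1ba029b41 with your own container ID)
sudo docker exec -it 6fb1ba029b41 redis-cli
A prompt similar to the following should appear: 127.0.0.1:6379>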
III. Test datasets
3.1 Download the datasets:
wget http://training.ververica.com/trainingData/nycTaxiRides.gz
wget http://training.ververica.com/trainingData/nycTaxiFares.gz
3.2 Field descriptions
=============================TaxiRide dataset fields=============================
rideId : Long // a unique id for each ride
taxiId : Long // a unique id for each taxi
driverId : Long // a unique id for each driver
isStart : Boolean // TRUE for ride start events, FALSE for ride end events
startTime : DateTime // the start time of a ride
endTime : DateTime // the end time of a ride,
// "1970-01-01 00:00:00" for start events
startLon : Float // the longitude of the ride start location
startLat : Float // the latitude of the ride start location
endLon : Float // the longitude of the ride end location
endLat : Float // the latitude of the ride end location
passengerCnt : Short // number of passengers on the ride
=============================TaxiFare dataset fields=============================
rideId : Long // a unique id for each ride
taxiId : Long // a unique id for each taxi
driverId : Long // a unique id for each driver
startTime : DateTime // the start time of a ride
paymentType : String // CSH (cash) or CRD (card)
tip : Float // tip for this ride
tolls : Float // tolls for this ride
totalFare : Float // total fare collected
Implementation
I. Base class
TaxiRide maps each record of the data source to a Java object.
import com.jdd.streaming.demos.utils.GeoUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Locale;
/**
* @Author: dalan
* @Date: 19-3-18 15:23
* @Description: a single taxi ride event (start or end of a ride)
*/
public class TaxiRide implements Comparable<TaxiRide>, Serializable {
private static final DateTimeFormatter timeFormatter =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();
public TaxiRide() {
this.startTime = new DateTime();
this.endTime = new DateTime();
}
public TaxiRide(long rideId, boolean isStart, DateTime startTime, DateTime endTime,
float startLon, float startLat, float endLon, float endLat,
short passengerCnt, long taxiId, long driverId) {
this.rideId = rideId;
this.isStart = isStart;
this.startTime = startTime;
this.endTime = endTime;
this.startLon = startLon;
this.startLat = startLat;
this.endLon = endLon;
this.endLat = endLat;
this.passengerCnt = passengerCnt;
this.taxiId = taxiId;
this.driverId = driverId;
}
public long rideId;
public boolean isStart;
public DateTime startTime;
public DateTime endTime;
public float startLon;
public float startLat;
public float endLon;
public float endLat;
public short passengerCnt;
public long taxiId;
public long driverId;
@Override
public String toString() { // comma-separated rendering, convenient for printing
StringBuilder sb = new StringBuilder();
sb.append(rideId).append(",");
sb.append(isStart ? "START" : "END").append(",");
sb.append(startTime.toString(timeFormatter)).append(",");
sb.append(endTime.toString(timeFormatter)).append(",");
sb.append(startLon).append(",");
sb.append(startLat).append(",");
sb.append(endLon).append(",");
sb.append(endLat).append(",");
sb.append(passengerCnt).append(",");
sb.append(taxiId).append(",");
sb.append(driverId);
return sb.toString();
}
// Parses one line of the input file into a TaxiRide instance
public static TaxiRide fromString(String line) {
String[] tokens = line.split(",");
if (tokens.length != 11) {
throw new RuntimeException("Invalid record: " + line);
}
TaxiRide ride = new TaxiRide();
// Map each token of the record to its field
try {
ride.rideId = Long.parseLong(tokens[0]);
switch (tokens[1]) {
case "START":
ride.isStart = true;
ride.startTime = DateTime.parse(tokens[2], timeFormatter);
ride.endTime = DateTime.parse(tokens[3], timeFormatter);
break;
case "END":
ride.isStart = false;
ride.endTime = DateTime.parse(tokens[2], timeFormatter);
ride.startTime = DateTime.parse(tokens[3], timeFormatter);
break;
default:
throw new RuntimeException("Invalid record: " + line);
}
ride.startLon = tokens[4].length() > 0 ? Float.parseFloat(tokens[4]) : 0.0f;
ride.startLat = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
ride.endLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
ride.endLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
ride.passengerCnt = Short.parseShort(tokens[8]);
ride.taxiId = Long.parseLong(tokens[9]);
ride.driverId = Long.parseLong(tokens[10]);
} catch (NumberFormatException nfe) {
throw new RuntimeException("Invalid record: " + line, nfe);
}
return ride;
}
// Orders by event timestamp; when a start and an end event share a timestamp, the start event sorts first
@Override
public int compareTo(TaxiRide other) {
if (other == null) {
return 1;
}
int compareTimes = Long.compare(this.getEventTime(), other.getEventTime());
if (compareTimes == 0) {
if (this.isStart == other.isStart) {
return 0;
}
else {
if (this.isStart) {
return -1;
}
else {
return 1;
}
}
}
else {
return compareTimes;
}
}
@Override
public boolean equals(Object other) {
return other instanceof TaxiRide &&
this.rideId == ((TaxiRide) other).rideId;
}
@Override
public int hashCode() {
return (int)this.rideId;
}
// Event time: the start time for START events, the end time for END events
public long getEventTime() {
if (isStart) {
return startTime.getMillis();
}
else {
return endTime.getMillis();
}
}
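// Euclidean distance from the given point to this event's location
// (the start location for START events, the end location for END events)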
public double getEuclideanDistance(double longitude, double latitude) {
if (this.isStart) {
return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.startLon, this.startLat);
} else {
return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.endLon, this.endLat);
}
}
// ===================The code below can be ignored for now=========================
public static class EnrichedRide extends TaxiRide{
public int startCell;
public int endCell;
public EnrichedRide(){}
public EnrichedRide(TaxiRide ride){
this.rideId = ride.rideId;
this.isStart = ride.isStart;
this.startTime = ride.startTime;
this.endTime = ride.endTime;
this.startLon = ride.startLon;
this.startLat = ride.startLat;
this.endLon = ride.endLon;
this.endLat = ride.endLat;
this.passengerCnt = ride.passengerCnt;
this.taxiId = ride.taxiId;
this.driverId = ride.driverId;
this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
}
@Override
public String toString() {
return super.toString() + "," +
Integer.toString(this.startCell) + "," +
Integer.toString(this.endCell);
}
}
}
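The ordering rule in compareTo can be checked in isolation. The following is a minimal sketch, not the author's code: RideEvent is a hypothetical stand-in that keeps only the three fields involved in ordering, and the timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for TaxiRide: only the fields that affect ordering.
class RideEvent implements Comparable<RideEvent> {
    final long rideId;
    final boolean isStart;
    final long eventTime; // epoch milliseconds

    RideEvent(long rideId, boolean isStart, long eventTime) {
        this.rideId = rideId;
        this.isStart = isStart;
        this.eventTime = eventTime;
    }

    @Override
    public int compareTo(RideEvent other) {
        if (other == null) {
            return 1;
        }
        int byTime = Long.compare(this.eventTime, other.eventTime);
        if (byTime != 0) {
            return byTime;
        }
        // Same timestamp: a START event sorts before an END event.
        return Boolean.compare(other.isStart, this.isStart);
    }

    @Override
    public String toString() {
        return rideId + (isStart ? ":START" : ":END");
    }
}

class RideOrderingSketch {
    static List<String> sortedLabels() {
        List<RideEvent> events = new ArrayList<>();
        events.add(new RideEvent(2, false, 1_000L)); // END at t=1000
        events.add(new RideEvent(3, true, 1_000L));  // START at t=1000
        events.add(new RideEvent(1, true, 500L));    // START at t=500
        Collections.sort(events);
        List<String> labels = new ArrayList<>();
        for (RideEvent e : events) {
            labels.add(e.toString());
        }
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(sortedLabels()); // [1:START, 3:START, 2:END]
    }
}
```

Note that in TaxiRide itself equals() compares only rideId while compareTo() compares time and event type, so the two are not consistent with each other; sorted collections such as TreeSet that rely on compareTo for equality may behave unexpectedly.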
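II. Custom source
The source reads the gz file. A custom source in Flink implements the SourceFunction interface, which has two methods:
1. public void run(SourceContext<TaxiRide> sourceContext) throws Exception
Reads the records from the underlying data source and emits them.
2. public void cancel()
Called when the job is cancelled, so that run() can stop and resources can be released.
In production, org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction is the more common choice: it is an abstract class that combines the parallel variant of SourceFunction with the RichFunction lifecycle methods (such as open() and close()) and already implements part of the plumbing.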
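The run()/cancel() contract can be sketched without Flink on the classpath. In the sketch below, which is an illustration under stated assumptions rather than the author's implementation, java.util.function.Consumer stands in for Flink's SourceContext and the class name is hypothetical. The pattern it shows is the usual one: cancel() is invoked from a different thread than run(), so it only flips a volatile flag that the emit loop checks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// JDK-only sketch: Consumer<String> stands in for Flink's SourceContext.
class CancellableSourceSketch {
    // volatile because cancel() runs on a different thread than run().
    private volatile boolean running = true;
    private final List<String> lines;

    CancellableSourceSketch(List<String> lines) {
        this.lines = lines;
    }

    // Counterpart of SourceFunction.run(): emit until exhausted or cancelled.
    void run(Consumer<String> out) {
        for (String line : lines) {
            if (!running) {
                break;
            }
            out.accept(line);
        }
    }

    // Counterpart of SourceFunction.cancel(): only flips the flag.
    void cancel() {
        running = false;
    }

    static List<String> demo() {
        List<String> emitted = new ArrayList<>();
        CancellableSourceSketch source =
                new CancellableSourceSketch(Arrays.asList("a", "b", "c", "d"));
        source.run(line -> {
            emitted.add(line);
            if (emitted.size() == 2) {
                source.cancel(); // simulate a cancellation after two records
            }
        });
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b]
    }
}
```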
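import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Calendar;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.zip.GZIPInputStream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
// TaxiRide is the base class shown above and is assumed to be in the same package
public class TaxiRideSource implements SourceFunction<TaxiRide> {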
private final int maxDelayMsecs;
private final int watermarkDelayMSecs;
private final String dataFilePath;
private final int servingSpeed;
private transient BufferedReader reader;
private transient InputStream gzipStream;
public TaxiRideSource(String dataFilePath) {
this(dataFilePath, 0, 1);
}
public TaxiRideSource(String dataFilePath, int servingSpeedFactor) {
this(dataFilePath, 0, servingSpeedFactor);
}
public TaxiRideSource(String dataFilePath, int maxEventDelaySecs, int servingSpeedFactor) {
if(maxEventDelaySecs < 0) {
throw new IllegalArgumentException("Max event delay must not be negative");
}
this.dataFilePath = dataFilePath;
this.maxDelayMsecs = maxEventDelaySecs * 1000;
this.watermarkDelayMSecs = maxDelayMsecs < 10000 ? 10000 : maxDelayMsecs;
this.servingSpeed = servingSpeedFactor;
}
// The core of the source: this is where the actual reading and emitting happens
@Override
public void run(SourceContext<TaxiRide> sourceContext) throws Exception {
// Wrap the gzip stream in a BufferedReader for buffered, line-by-line reads
gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath));
reader = new BufferedReader(new InputStreamReader(gzipStream, "UTF-8"));
// Read the gz file and emit its records
generateUnorderedStream(sourceContext);
// Release resources once reading has finished
this.reader.close();
this.reader = null;
this.gzipStream.close();
this.gzipStream = null;
}
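// Emits the records of the gz file out of order by delaying each one by a random amount,
// and interleaves periodic watermarks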
private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception {
long servingStartTime = Calendar.getInstance().getTimeInMillis();
long dataStartTime;
Random rand = new Random(7452);
PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
32,
new Comparator<Tuple2<Long, Object>>() {
@Override
public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
return o1.f0.compareTo(o2.f0);
}
});
// read first ride and insert it into emit schedule
String line;
TaxiRide ride;
if (reader.ready() && (line = reader.readLine()) != null) {
// read first ride
ride = TaxiRide.fromString(line);
// extract starting timestamp
dataStartTime = getEventTime(ride);
// get delayed time
long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);
emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
// schedule next watermark
long watermarkTime = dataStartTime + watermarkDelayMSecs;
Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
} else {
return;
}
// peek at next ride
if (reader.ready() && (line = reader.readLine()) != null) {
ride = TaxiRide.fromString(line);
} else {
ride = null; // no second record: avoid scheduling the first ride twice
}
// read rides one-by-one and emit a random ride from the buffer each time
while (emitSchedule.size() > 0 || reader.ready()) {
// insert all events into schedule that might be emitted next
long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1;
long rideEventTime = ride != null ? getEventTime(ride) : -1;
while(
ride != null && ( // while there is a ride AND
emitSchedule.isEmpty() || // and no ride in schedule OR
rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule
)
{
// insert event into emit schedule
long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
// read next ride
if (reader.ready() && (line = reader.readLine()) != null) {
ride = TaxiRide.fromString(line);
rideEventTime = getEventTime(ride);
}
else {
ride = null;
rideEventTime = -1;
}
}
// emit schedule is updated, emit next element in schedule
Tuple2<Long, Object> head = emitSchedule.poll();
long delayedEventTime = head.f0;
long now = Calendar.getInstance().getTimeInMillis();
long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
long waitTime = servingTime - now;
Thread.sleep( (waitTime > 0) ? waitTime : 0);
if(head.f1 instanceof TaxiRide) {
TaxiRide emitRide = (TaxiRide)head.f1;
// emit ride
sourceContext.collectWithTimestamp(emitRide, getEventTime(emitRide));
}
else if(head.f1 instanceof Watermark) {
Watermark emitWatermark = (Watermark)head.f1;
// emit watermark
sourceContext.emitWatermark(emitWatermark);
// schedule next watermark
long watermarkTime = delayedEventTime + watermarkDelayMSecs;
Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
}
}
}
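// Maps an event time in the data to the wall-clock time at which it should be served,
// compressing the elapsed event time by the serving speed factor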
public long toServingTime(long servingStartTime, long dataStartTime, long eventTime) {
long dataDiff = eventTime - dataStartTime;
return servingStartTime + (dataDiff / this.servingSpeed);
}
public long getEventTime(TaxiRide ride) {
return ride.getEventTime();
}
public long getNormalDelayMsecs(Random rand) {
long delay = -1;
long x = maxDelayMsecs / 2;
while(delay < 0 || delay > maxDelayMsecs) {
delay = (long)(rand.nextGaussian() * x) + x;
}
return delay;
}
// Called by Flink when the job is cancelled: close the underlying streams
@Override
public void cancel() {
try {
if (this.reader != null) {
this.reader.close();
}
if (this.gzipStream != null) {
this.gzipStream.close();
}
} catch(IOException ioe) {
throw new RuntimeException("Could not cancel SourceFunction", ioe);
} finally {
this.reader = null;
this.gzipStream = null;
}
}
}
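The timing arithmetic in TaxiRideSource is easier to follow with concrete numbers. The sketch below copies the three formulas (serving time, random delay, watermark timestamp) into a JDK-only class; the class name ReplayTimingSketch and the sample values are assumptions made for illustration.

```java
import java.util.Random;

class ReplayTimingSketch {
    // Same formula as toServingTime: the event-time offset is divided by the speed factor.
    static long toServingTime(long servingStart, long dataStart, long eventTime, int servingSpeed) {
        return servingStart + (eventTime - dataStart) / servingSpeed;
    }

    // Same rejection sampling as getNormalDelayMsecs: a Gaussian with mean and
    // standard deviation max/2, redrawn until the value falls inside [0, max].
    static long normalDelayMsecs(Random rand, long maxDelayMsecs) {
        long delay = -1;
        long x = maxDelayMsecs / 2;
        while (delay < 0 || delay > maxDelayMsecs) {
            delay = (long) (rand.nextGaussian() * x) + x;
        }
        return delay;
    }

    // Timestamp carried by a watermark that is scheduled at time watermarkTime.
    static long watermarkTimestamp(long watermarkTime, long maxDelayMsecs) {
        return watermarkTime - maxDelayMsecs - 1;
    }

    public static void main(String[] args) {
        // With a speed factor of 600, ten minutes of event time replay in one second.
        System.out.println(toServingTime(0L, 0L, 600_000L, 600)); // 1000
        // With a 60 s maximum delay, a watermark scheduled at t=100000 carries 39999.
        System.out.println(watermarkTimestamp(100_000L, 60_000L)); // 39999
        // A sampled delay always lies between 0 and the maximum.
        System.out.println(normalDelayMsecs(new Random(7452), 60_000L));
    }
}
```

The subtraction in the watermark formula follows from the scheduling: every event still in the queue when the watermark is polled has a scheduled time of at least watermarkTime, and its delay is at most maxDelayMsecs, so its event time is strictly greater than watermarkTime - maxDelayMsecs - 1.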
III. Custom sink
Redis serves as the sink. In practice a sink is usually written by extending the abstract class RichSinkFunction and overriding three methods:
1. public void open(Configuration parameters) throws Exception
2. public void invoke(Tuple2<Long, Long> val, Context context) throws Exception
3. public void close() throws Exception
private transient JedisPool jedisPool;
@Override
public void open(Configuration parameters) throws Exception { // create and initialise the redis connection pool
super.open(parameters);
try {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(redisConfig.getMaxIdle());
config.setMinIdle(redisConfig.getMinIdle());
config.setMaxTotal(redisConfig.getMaxTotal());
jedisPool = new JedisPool(config, redisConfig.getHost(), redisConfig.getPort(),
redisConfig.getConnectionTimeout(), redisConfig.getPassword(), redisConfig.getDatabase());
} catch (Exception e) {
LOGGER.error("failed to create redis pool", e);
throw e; // fail fast: invoke() cannot work without a pool
}
}
@Override
public void close() throws Exception { // close the pool and release its connections
try {
if (jedisPool != null) {
jedisPool.close();
}
} catch (Exception e) {
LOGGER.error("failed to close redis pool", e);
}
}
@Override
public void invoke(Tuple2<Long, Long> val, Context context) throws Exception { // write one record to redis
// try-with-resources returns the connection to the pool
try (Jedis jedis = jedisPool.getResource()) {
jedis.set("taxi:ride:" + val.f0, val.f1.toString());
} catch (Exception e) {
LOGGER.error("failed to write to redis", e);
}
}
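IV. Example
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
// TaxiRide, TaxiRideSource and RedisConfig are the author's own classes and are assumed to be on the classpath
public class TaxiRideCount {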
/** logger */
private static final Logger LOGGER = LoggerFactory.getLogger(TaxiRideCount.class);
// main
public static void main(String[] args) throws Exception {
// Read the parameters: file path, maximum event delay (seconds), serving speed factor
final ParameterTool params = ParameterTool.fromArgs(args);
String path = params.get("file-path","/home/wmm/go_bench/flink_sources/nycTaxiRides.gz");
int maxDelay = params.getInt("max-delay",60);
int servingSpeed = params.getInt("serving-speed",600);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().disableSysoutLogging();
// Read TaxiRide events from the custom source
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(path, maxDelay, servingSpeed));
// Map each ride event to a (driverId, 1) pair
DataStream<Tuple2<Long,Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> map(TaxiRide ride) throws Exception {
return new Tuple2<Long, Long>(ride.driverId, 1L); // emit (driverId, 1) so that events can be counted per driver
}
});
KeyedStream<Tuple2<Long, Long>, Tuple> keyByDriverId = tuples.keyBy(0); // partition the stream by driver id
DataStream<Tuple2<Long, Long>> rideCounts = keyByDriverId.sum(1); // running count of ride events (start and end) per driver
RedisConfig redisConfig = new RedisConfig();
redisConfig.setHost(params.get("output-redis","127.0.0.1"));
redisConfig.setPort(6379);
redisConfig.setPassword(null);
//RedisSink redisSink = new RedisSink(redisConfig);
// Alternative: wrap the redis access in reusable RedisSink / RedisCommand classes
// rideCounts.map(new MapFunction<Tuple2<Long, Long>, RedisCommand>() { // 落地redis
// @Override
// public RedisCommand map(Tuple2<Long, Long> in) throws Exception {
// return new RedisPushCommand("taxi:ride:" + in.f0, Long.toString(in.f1));
// //return new RedisPushCommand("taxi:ride:" + in.f0, new String[]{Long.toString(in.f1)});
// }
// }).addSink(redisSink);
// Implement the redis sink directly as an anonymous class
rideCounts.addSink(new RichSinkFunction<Tuple2<Long, Long>>() { // define the sink
private transient JedisPool jedisPool;
@Override
public void open(Configuration parameters) throws Exception { // create the redis connection pool
super.open(parameters);
try {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(redisConfig.getMaxIdle());
config.setMinIdle(redisConfig.getMinIdle());
config.setMaxTotal(redisConfig.getMaxTotal());
jedisPool = new JedisPool(config, redisConfig.getHost(), redisConfig.getPort(),
redisConfig.getConnectionTimeout(), redisConfig.getPassword(), redisConfig.getDatabase());
} catch (Exception e) {
LOGGER.error("failed to create redis pool", e);
throw e; // fail fast: invoke() cannot work without a pool
}
}
@Override
public void close() throws Exception { // close the pool and release its connections
try {
if (jedisPool != null) {
jedisPool.close();
}
} catch (Exception e) {
LOGGER.error("failed to close redis pool", e);
}
}
@Override
public void invoke(Tuple2<Long, Long> val, Context context) throws Exception { // write one record to redis
// try-with-resources returns the connection to the pool
try (Jedis jedis = jedisPool.getResource()) {
jedis.set("taxi:ride:" + val.f0, val.f1.toString()); // overwrite the driver's key with the latest count
} catch (Exception e) {
LOGGER.error("failed to write to redis", e);
}
}
});
//rideCounts.print();
JobExecutionResult result = env.execute("Ride Count By DriverID");
}
}
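On a keyed stream, sum(1) emits an updated running total for every incoming record rather than one final total, which is why the sink can simply overwrite the driver's key each time. The JDK-only sketch below imitates that behaviour with a HashMap; the class name and the driver ids are made up for illustration, and Flink's state handling is of course more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RollingCountSketch {
    // Imitates keyBy(driverId).sum(1): every input record yields one updated "driverId=count".
    static List<String> rollingCounts(long[] driverIds) {
        Map<Long, Long> state = new HashMap<>(); // running total per key
        List<String> emitted = new ArrayList<>();
        for (long driverId : driverIds) {
            long count = state.merge(driverId, 1L, Long::sum);
            emitted.add(driverId + "=" + count);
        }
        return emitted;
    }

    // Imitates the sink: SET overwrites, so each key ends up holding the latest count.
    static Map<String, String> finalRedisState(long[] driverIds) {
        Map<String, String> redis = new LinkedHashMap<>();
        for (String update : rollingCounts(driverIds)) {
            String[] kv = update.split("=");
            redis.put("taxi:ride:" + kv[0], kv[1]);
        }
        return redis;
    }

    public static void main(String[] args) {
        long[] ids = {7L, 9L, 7L, 7L};
        System.out.println(rollingCounts(ids));   // [7=1, 9=1, 7=2, 7=3]
        System.out.println(finalRedisState(ids)); // {taxi:ride:7=3, taxi:ride:9=1}
    }
}
```

Because the ride stream contains both a START and an END event for each ride, the value stored per driver counts events, not completed rides.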
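Verification
Use redis-cli to inspect the result. A GET on a driver's key returns the count accumulated for that driver:
get taxi:ride:2013001713
The complete example is available on GitHub.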