Apache Flink Example Application


Author: 神奇的考拉 | Published: 2019-03-19 19:18

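This article uses Flink and Redis to clean and filter data, process and aggregate it, and finally persist the results. Along the way it implements a custom source and a custom sink, and uses event time with watermarks.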

Preparation

I. Installing Kafka with Docker

1. Pull the Docker images (if pulling directly is slow, you can configure a registry mirror located in China):

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

2. Run the ZooKeeper and Kafka images as containers

2.1 Start ZooKeeper:
docker run -d --name zookeeper --publish 2181:2181 \
    --volume /etc/localtime:/etc/localtime \
    wurstmeister/zookeeper

2.2 Start Kafka:
docker run -d --name kafka --publish 9092:9092 \
    --link zookeeper \
    --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    --env KAFKA_ADVERTISED_HOST_NAME=localhost \
    --env KAFKA_ADVERTISED_PORT=9092 \
    --volume /etc/localtime:/etc/localtime \
    wurstmeister/kafka

3. Verify that the containers started successfully

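3.1 Run docker ps and find the CONTAINER ID of the kafka container.
3.2 Run docker exec -it kafka /bin/bash to open a shell in the kafka container (the container ID can be used instead of the name kafka).
3.3 Change to the Kafka installation directory, /opt/kafka_2.11-0.10.1.0 (the directory name depends on the image version), and create a topic named test:
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test

List the current topics:
bin/kafka-topics.sh --list --zookeeper zookeeper:2181

Start a console producer for the test topic you just created, and type a few test messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Start a console consumer on the same test topic (in another shell); it should receive the messages sent by the producer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning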

II. Installing Redis with Docker

1. Pull the Redis image

docker pull registry.docker-cn.com/library/redis

2. Start the Redis container

docker run -d -p 6379:6379 --name myredis registry.docker-cn.com/library/redis

3. Run docker ps to see the running containers

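4. Connect by running redis-cli inside the container you just started:

sudo docker exec -it myredis redis-cli

(myredis is the container name given above; the container ID shown by docker ps, e.g. 6fb1ba029b41, works as well.) A prompt like 127.0.0.1:6379> should appear.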

III. Test datasets

3.1 Download the datasets:

wget http://training.ververica.com/trainingData/nycTaxiRides.gz

wget http://training.ververica.com/trainingData/nycTaxiFares.gz

3.2 Field descriptions

============================= TaxiRide dataset fields =============================
rideId         : Long      // a unique id for each ride
taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events
startTime      : DateTime  // the start time of a ride
endTime        : DateTime  // the end time of a ride,
                           //   "1970-01-01 00:00:00" for start events
startLon       : Float     // the longitude of the ride start location
startLat       : Float     // the latitude of the ride start location
endLon         : Float     // the longitude of the ride end location
endLat         : Float     // the latitude of the ride end location
passengerCnt   : Short     // number of passengers on the ride

============================= TaxiFare dataset fields =============================
rideId         : Long      // a unique id for each ride
taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
startTime      : DateTime  // the start time of a ride
paymentType    : String    // CSH or CRD
tip            : Float     // tip for this ride
tolls          : Float     // tolls for this ride
totalFare      : Float     // total fare collected
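The article defines a TaxiRide class (below) but no TaxiFare counterpart. As a rough, JDK-only sketch, a TaxiFare record could be parsed as follows. It assumes the eight fields appear comma-separated in exactly the order listed above, with startTime formatted as `yyyy-MM-dd HH:mm:ss`; the class name TaxiFareSketch is hypothetical and the sample line is invented, not taken from the dataset.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical JDK-only TaxiFare parser.
 * Assumption: fields are comma-separated in the order
 * rideId, taxiId, driverId, startTime, paymentType, tip, tolls, totalFare.
 */
public class TaxiFareSketch {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public long rideId;
    public long taxiId;
    public long driverId;
    public LocalDateTime startTime;
    public String paymentType;
    public float tip;
    public float tolls;
    public float totalFare;

    public static TaxiFareSketch fromString(String line) {
        String[] t = line.split(",");
        if (t.length != 8) {
            throw new IllegalArgumentException("Invalid record: " + line);
        }
        TaxiFareSketch fare = new TaxiFareSketch();
        try {
            fare.rideId = Long.parseLong(t[0]);
            fare.taxiId = Long.parseLong(t[1]);
            fare.driverId = Long.parseLong(t[2]);
            fare.startTime = LocalDateTime.parse(t[3], FMT);
            fare.paymentType = t[4];
            // empty numeric fields default to 0, as TaxiRide does for coordinates
            fare.tip = t[5].isEmpty() ? 0.0f : Float.parseFloat(t[5]);
            fare.tolls = t[6].isEmpty() ? 0.0f : Float.parseFloat(t[6]);
            fare.totalFare = t[7].isEmpty() ? 0.0f : Float.parseFloat(t[7]);
        } catch (RuntimeException e) { // NumberFormatException or DateTimeParseException
            throw new IllegalArgumentException("Invalid record: " + line, e);
        }
        return fare;
    }

    public static void main(String[] args) {
        // invented sample line, for illustration only
        TaxiFareSketch f = fromString("1,2013000001,2013000001,2013-01-01 00:00:00,CSH,0.0,0.0,21.5");
        System.out.println(f.paymentType + " " + f.totalFare); // CSH 21.5
    }
}
```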

Implementation

I. Base class

Maps each record of the ride dataset to a Java object:

import com.jdd.streaming.demos.utils.GeoUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Locale;

/**
 * @Author: dalan
 * @Date: 19-3-18 15:23
 * @Description: a single taxi ride event (START or END)
 */
public class TaxiRide implements Comparable<TaxiRide>, Serializable {

    private static transient DateTimeFormatter timeFormatter =
            DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();

    public TaxiRide() {
        this.startTime = new DateTime();
        this.endTime = new DateTime();
    }

    public TaxiRide(long rideId, boolean isStart, DateTime startTime, DateTime endTime,
                    float startLon, float startLat, float endLon, float endLat,
                    short passengerCnt, long taxiId, long driverId) {

        this.rideId = rideId;
        this.isStart = isStart;
        this.startTime = startTime;
        this.endTime = endTime;
        this.startLon = startLon;
        this.startLat = startLat;
        this.endLon = endLon;
        this.endLat = endLat;
        this.passengerCnt = passengerCnt;
        this.taxiId = taxiId;
        this.driverId = driverId;
    }

    public long rideId;
    public boolean isStart;
    public DateTime startTime;
    public DateTime endTime;
    public float startLon;
    public float startLat;
    public float endLon;
    public float endLat;
    public short passengerCnt;
    public long taxiId;
    public long driverId;

    public String toString() {  // formats the ride as a comma-separated line for output
        StringBuilder sb = new StringBuilder();
        sb.append(rideId).append(",");
        sb.append(isStart ? "START" : "END").append(",");
        sb.append(startTime.toString(timeFormatter)).append(",");
        sb.append(endTime.toString(timeFormatter)).append(",");
        sb.append(startLon).append(",");
        sb.append(startLat).append(",");
        sb.append(endLon).append(",");
        sb.append(endLat).append(",");
        sb.append(passengerCnt).append(",");
        sb.append(taxiId).append(",");
        sb.append(driverId);

        return sb.toString();
    }

    // Parses one line of the data file into a TaxiRide instance
    public static TaxiRide fromString(String line) {
        String[] tokens = line.split(",");
        if (tokens.length != 11) {
            throw new RuntimeException("Invalid record: " + line);
        }

        TaxiRide ride = new TaxiRide();
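        // Map each token of the record to the corresponding field;
        // tokens[2] always holds the event time (startTime for START, endTime for END)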
        try {
            ride.rideId = Long.parseLong(tokens[0]);

            switch (tokens[1]) {
                case "START":
                    ride.isStart = true;
                    ride.startTime = DateTime.parse(tokens[2], timeFormatter);
                    ride.endTime = DateTime.parse(tokens[3], timeFormatter);
                    break;
                case "END":
                    ride.isStart = false;
                    ride.endTime = DateTime.parse(tokens[2], timeFormatter);
                    ride.startTime = DateTime.parse(tokens[3], timeFormatter);
                    break;
                default:
                    throw new RuntimeException("Invalid record: " + line);
            }

            ride.startLon = tokens[4].length() > 0 ? Float.parseFloat(tokens[4]) : 0.0f;
            ride.startLat = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
            ride.endLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
            ride.endLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
            ride.passengerCnt = Short.parseShort(tokens[8]);
            ride.taxiId = Long.parseLong(tokens[9]);
            ride.driverId = Long.parseLong(tokens[10]);

        } catch (NumberFormatException nfe) {
            throw new RuntimeException("Invalid record: " + line, nfe);
        }

        return ride;
    }

    // Order by event timestamp; when a START and an END event share a timestamp, the START event comes first
    public int compareTo(TaxiRide other) {
        if (other == null) {
            return 1;
        }
        int compareTimes = Long.compare(this.getEventTime(), other.getEventTime());
        if (compareTimes == 0) {
            if (this.isStart == other.isStart) {
                return 0;
            }
            else {
                if (this.isStart) {
                    return -1;
                }
                else {
                    return 1;
                }
            }
        }
        else {
            return compareTimes;
        }
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof TaxiRide &&
                this.rideId == ((TaxiRide) other).rideId;
    }

    @Override
    public int hashCode() {
        return (int)this.rideId;
    }

    // The event time: startTime for START events, endTime for END events
    public long getEventTime() {
        if (isStart) {
            return startTime.getMillis();
        }
        else {
            return endTime.getMillis();
        }
    }
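    // Euclidean distance from the given point to the ride's start location (START events) or end location (END events)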
    public double getEuclideanDistance(double longitude, double latitude) {
        if (this.isStart) {
            return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.startLon, this.startLat);
        } else {
            return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.endLon, this.endLat);
        }
    }


    // =================== This part can be ignored for now ===================
    public static class EnrichedRide extends TaxiRide{
        public int startCell;
        public int endCell;

        public EnrichedRide(){}
        public EnrichedRide(TaxiRide ride){
            this.rideId = ride.rideId;
            this.isStart = ride.isStart;
            this.startTime = ride.startTime;
            this.endTime = ride.endTime;
            this.startLon = ride.startLon;
            this.startLat = ride.startLat;
            this.endLon = ride.endLon;
            this.endLat = ride.endLat;
            this.passengerCnt = ride.passengerCnt;
            this.taxiId = ride.taxiId;
            this.driverId = ride.driverId;

            this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
            this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
        }

        @Override
        public String toString() {
            return super.toString() + "," +
                    Integer.toString(this.startCell) + "," +
                    Integer.toString(this.endCell);
        }
    }
}
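The two rules that TaxiRide encodes (event time taken from startTime for START events and from endTime for END events, and START sorting before END on equal timestamps) can be checked with a small JDK-only sketch. RideEvent is a hypothetical, stripped-down stand-in for TaxiRide that uses java.time instead of Joda-Time and drops the coordinates; the three sample events are invented.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RideOrderingSketch {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Hypothetical stand-in for TaxiRide: only the fields the ordering needs. */
    public static class RideEvent implements Comparable<RideEvent> {
        final long rideId;
        final boolean isStart;
        final long startMillis;
        final long endMillis;

        RideEvent(long rideId, boolean isStart, String start, String end) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.startMillis = toMillis(start);
            this.endMillis = toMillis(end);
        }

        static long toMillis(String s) {
            // interpreted as UTC, like withZoneUTC() in TaxiRide
            return LocalDateTime.parse(s, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
        }

        public long getEventTime() {
            return isStart ? startMillis : endMillis;
        }

        @Override
        public int compareTo(RideEvent other) {
            int byTime = Long.compare(getEventTime(), other.getEventTime());
            if (byTime != 0 || isStart == other.isStart) {
                return byTime;
            }
            return isStart ? -1 : 1; // same timestamp: START sorts first
        }

        @Override
        public String toString() {
            return rideId + (isStart ? ":START" : ":END");
        }
    }

    public static List<RideEvent> sample() {
        List<RideEvent> events = new ArrayList<>();
        // ride 1 ends exactly when ride 2 starts; START events use the 1970 placeholder as endTime
        events.add(new RideEvent(1, false, "2013-01-01 00:00:00", "2013-01-01 00:10:00"));
        events.add(new RideEvent(2, true, "2013-01-01 00:10:00", "1970-01-01 00:00:00"));
        events.add(new RideEvent(1, true, "2013-01-01 00:00:00", "1970-01-01 00:00:00"));
        return events;
    }

    public static void main(String[] args) {
        List<RideEvent> events = sample();
        Collections.sort(events);
        System.out.println(events); // [1:START, 2:START, 1:END]
    }
}
```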

II. Custom source

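The source reads the gz data file. To write a custom source in Flink, implement the SourceFunction interface, which has two methods:
1. public void run(SourceContext<TaxiRide> sourceContext) throws Exception
Reads records from the underlying source and emits them through the SourceContext.
2. public void cancel()
Called when the source is cancelled; it should make run() return and release resources.
A plain SourceFunction always runs with parallelism 1. In production, sources usually extend org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction instead, which implements both the (parallel) SourceFunction and the RichFunction interfaces and already provides part of the functionality, such as open()/close() and access to the runtime context.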
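The run()/cancel() contract described above is easiest to see without Flink at all. The sketch below uses a hypothetical MiniSource interface (not Flink's SourceFunction) and a list in place of a SourceContext: run() loops while a volatile flag is set, and cancel(), called from another thread, clears it. This volatile-flag pattern is the one commonly used in Flink sources; the TaxiRideSource below instead closes its reader in cancel().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CancelContractSketch {
    /** Hypothetical stand-in for SourceFunction, NOT Flink's interface. */
    interface MiniSource<T> {
        void run(List<T> out) throws InterruptedException;
        void cancel();
    }

    static class CounterSource implements MiniSource<Long> {
        // volatile so the write in cancel() is visible to the thread executing run()
        private volatile boolean running = true;

        @Override
        public void run(List<Long> out) throws InterruptedException {
            long next = 0;
            while (running) {
                out.add(next++); // stand-in for sourceContext.collect(...)
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Runs the source on a worker thread, cancels it after `millis`, returns how many elements were emitted. */
    public static int runAndCancel(long millis) {
        List<Long> out = Collections.synchronizedList(new ArrayList<>());
        CounterSource source = new CounterSource();
        Thread worker = new Thread(() -> {
            try {
                source.run(out);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            Thread.sleep(millis);
            source.cancel();   // run() sees the flag on its next loop check and returns
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("run() did not return after cancel()");
        }
        return out.size();
    }

    public static void main(String[] args) {
        System.out.println("emitted " + runAndCancel(50) + " elements before cancel");
    }
}
```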

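import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Calendar;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.zip.GZIPInputStream;

public class TaxiRideSource implements SourceFunction<TaxiRide> {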

    private final int maxDelayMsecs;
    private final int watermarkDelayMSecs;

    private final String dataFilePath;
    private final int servingSpeed;

    private transient BufferedReader reader;
    private transient InputStream gzipStream;
   
    public TaxiRideSource(String dataFilePath) {
        this(dataFilePath, 0, 1);
    }

    public TaxiRideSource(String dataFilePath, int servingSpeedFactor) {
        this(dataFilePath, 0, servingSpeedFactor);
    }

    public TaxiRideSource(String dataFilePath, int maxEventDelaySecs, int servingSpeedFactor) {
        if(maxEventDelaySecs < 0) {
            throw new IllegalArgumentException("Max event delay must not be negative");
        }
        this.dataFilePath = dataFilePath;
        this.maxDelayMsecs = maxEventDelaySecs * 1000;
        this.watermarkDelayMSecs = maxDelayMsecs < 10000 ? 10000 : maxDelayMsecs;
        this.servingSpeed = servingSpeedFactor;
    }

   // The key method of a source: reads the underlying data and emits it
    @Override
    public void run(SourceContext<TaxiRide> sourceContext) throws Exception {
        // Open the gzip file and wrap it in a BufferedReader
        gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath));
        reader = new BufferedReader(new InputStreamReader(gzipStream, "UTF-8"));
        // Read the gz file and emit its records
        generateUnorderedStream(sourceContext);
        // Release resources once the file has been read completely
        this.reader.close();
        this.reader = null;
        this.gzipStream.close();
        this.gzipStream = null;
    }
     
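    // Reads rides from the gz file and emits them out of order: each ride is delayed by a random,
    // bounded amount (at most maxDelayMsecs), and watermarks are scheduled periodically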
    private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception {
        long servingStartTime = Calendar.getInstance().getTimeInMillis();
        long dataStartTime;

        Random rand = new Random(7452);
        PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
                32,
                new Comparator<Tuple2<Long, Object>>() {
                    @Override
                    public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
                        return o1.f0.compareTo(o2.f0);
                    }
                });

        // read first ride and insert it into emit schedule
        String line;
        TaxiRide ride;
        if (reader.ready() && (line = reader.readLine()) != null) {
            // read first ride
            ride = TaxiRide.fromString(line);
            // extract starting timestamp
            dataStartTime = getEventTime(ride);
            // get delayed time
            long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);

            emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
            // schedule next watermark
            long watermarkTime = dataStartTime + watermarkDelayMSecs;
            Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
            emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));

        } else {
            return;
        }

        // peek at next ride; if there is none, clear ride so the first ride is not scheduled twice
        if (reader.ready() && (line = reader.readLine()) != null) {
            ride = TaxiRide.fromString(line);
        } else {
            ride = null;
        }

        // read rides one-by-one and emit a random ride from the buffer each time
        while (emitSchedule.size() > 0 || reader.ready()) {

            // insert all events into schedule that might be emitted next
            long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1;
            long rideEventTime = ride != null ? getEventTime(ride) : -1;
            while(
                    ride != null && ( // while there is a ride AND
                            emitSchedule.isEmpty() || // and no ride in schedule OR
                                    rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule
            )
            {
                // insert event into emit schedule
                long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
                emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));

                // read next ride
                if (reader.ready() && (line = reader.readLine()) != null) {
                    ride = TaxiRide.fromString(line);
                    rideEventTime = getEventTime(ride);
                }
                else {
                    ride = null;
                    rideEventTime = -1;
                }
            }

            // emit schedule is updated, emit next element in schedule
            Tuple2<Long, Object> head = emitSchedule.poll();
            long delayedEventTime = head.f0;

            long now = Calendar.getInstance().getTimeInMillis();
            long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
            long waitTime = servingTime - now;

            Thread.sleep( (waitTime > 0) ? waitTime : 0);

            if(head.f1 instanceof TaxiRide) {
                TaxiRide emitRide = (TaxiRide)head.f1;
                // emit ride
                sourceContext.collectWithTimestamp(emitRide, getEventTime(emitRide));
            }
            else if(head.f1 instanceof Watermark) {
                Watermark emitWatermark = (Watermark)head.f1;
                // emit watermark
                sourceContext.emitWatermark(emitWatermark);
                // schedule next watermark
                long watermarkTime = delayedEventTime + watermarkDelayMSecs;
                Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
                emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
            }
        }
    }
    
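   // Maps an event timestamp to the wall-clock time at which it should be emitted,
   // replaying the data servingSpeed times faster than real time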
    public long toServingTime(long servingStartTime, long dataStartTime, long eventTime) {
        long dataDiff = eventTime - dataStartTime;
        return servingStartTime + (dataDiff / this.servingSpeed);
    }

    public long getEventTime(TaxiRide ride) {
        return ride.getEventTime();
    }

    public long getNormalDelayMsecs(Random rand) {
        long delay = -1;
        long x = maxDelayMsecs / 2;
        while(delay < 0 || delay > maxDelayMsecs) {
            delay = (long)(rand.nextGaussian() * x) + x;
        }
        return delay;
    }

   // Called when the job is cancelled: closes the reader and the gzip stream so that run() stops reading
    @Override
    public void cancel() {
        try {
            if (this.reader != null) {
                this.reader.close();
            }
            if (this.gzipStream != null) {
                this.gzipStream.close();
            }
        } catch(IOException ioe) {
            throw new RuntimeException("Could not cancel SourceFunction", ioe);
        } finally {
            this.reader = null;
            this.gzipStream = null;
        }
    }
}
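The watermark arithmetic in generateUnorderedStream() can be checked with a JDK-only simulation. Each event is released at eventTime + delay, with delay drawn from the same bounded Gaussian as getNormalDelayMsecs(), so it never exceeds maxDelay; a watermark released at time T carries the timestamp T - lag - 1. With lag = maxDelay (what the source uses), any event with eventTime <= T - maxDelay - 1 must have been released before T, so no event ever arrives behind a watermark; with lag = 0, many do. The evenly spaced watermarks and random event spacing are simplifying assumptions, not the source's exact schedule.

```java
import java.util.PriorityQueue;
import java.util.Random;

public class WatermarkSketch {
    static final long EVENT = 0;
    static final long WATERMARK = 1;

    // same bounded-Gaussian delay as getNormalDelayMsecs() in TaxiRideSource
    static long normalDelay(Random rand, long maxDelay) {
        long x = maxDelay / 2;
        long delay = -1;
        while (delay < 0 || delay > maxDelay) {
            delay = (long) (rand.nextGaussian() * x) + x;
        }
        return delay;
    }

    /** Counts events that arrive with a timestamp at or below the current watermark. */
    public static int countLateEvents(int numEvents, long maxDelay, long lag, long interval, long seed) {
        Random rand = new Random(seed);
        // entries are {releaseTime, kind, timestamp}, ordered by release time
        PriorityQueue<long[]> schedule = new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
        long eventTime = 0;
        for (int i = 0; i < numEvents; i++) {
            eventTime += rand.nextInt(1000); // non-decreasing event times
            schedule.add(new long[] {eventTime + normalDelay(rand, maxDelay), EVENT, eventTime});
        }
        long end = eventTime + maxDelay + interval;
        for (long t = interval; t <= end; t += interval) {
            schedule.add(new long[] {t, WATERMARK, t - lag - 1});
        }
        long watermark = Long.MIN_VALUE;
        int late = 0;
        while (!schedule.isEmpty()) {
            long[] head = schedule.poll();
            if (head[1] == WATERMARK) {
                watermark = Math.max(watermark, head[2]);
            } else if (head[2] <= watermark) {
                late++; // an event-time window would treat this event as late
            }
        }
        return late;
    }

    public static void main(String[] args) {
        long maxDelay = 60_000; // 60 s, the default max-delay of the job below
        System.out.println("lag = maxDelay: " + countLateEvents(10_000, maxDelay, maxDelay, 10_000, 7452));
        System.out.println("lag = 0:        " + countLateEvents(10_000, maxDelay, 0, 10_000, 7452));
    }
}
```

The serving-speed factor only changes wall-clock pacing, through toServingTime(): with servingSpeed = 600, one hour of event time (3,600,000 ms) is replayed in 3,600,000 / 600 = 6,000 ms, i.e. 6 seconds.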

III. Custom sink

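Redis is used as the sink. In practice, a sink usually extends the abstract class RichSinkFunction and overrides:
1. public void open(Configuration parameters) throws Exception: creates the Redis connection pool
2. public void invoke(Tuple2<Long, Long> val, Context context) throws Exception: writes one record
3. public void close() throws Exception: releases the pool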

// Members of a class extending RichSinkFunction<Tuple2<Long, Long>>;
// redisConfig and LOGGER are fields of the enclosing class.
private transient JedisPool jedisPool;

@Override
public void open(Configuration parameters) throws Exception { // create and initialize the Redis pool
    super.open(parameters);
    JedisPoolConfig config = new JedisPoolConfig();
    config.setMaxIdle(redisConfig.getMaxIdle());
    config.setMinIdle(redisConfig.getMinIdle());
    config.setMaxTotal(redisConfig.getMaxTotal());
    // fail fast: swallowing this error would leave jedisPool null and break every invoke()
    jedisPool = new JedisPool(config, redisConfig.getHost(), redisConfig.getPort(),
            redisConfig.getConnectionTimeout(), redisConfig.getPassword(), redisConfig.getDatabase());
}

@Override
public void close() throws Exception { // release the pool when the sink shuts down
    try {
        if (jedisPool != null) {
            jedisPool.close();
        }
    } catch (Exception e) {
        LOGGER.error("redis sink error", e);
    }
    super.close();
}

@Override
public void invoke(Tuple2<Long, Long> val, Context context) throws Exception { // write the record to Redis
    // try-with-resources returns the connection to the pool via Jedis.close()
    try (Jedis jedis = jedisPool.getResource()) {
        jedis.set("taxi:ride:" + val.f0, val.f1.toString());
    } catch (Exception e) {
        LOGGER.error("failed to write to redis", e);
    }
}
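The sink lifecycle (open() once, invoke() per record, close() once), together with borrowing and returning one pooled connection per write, can be sketched without Redis. FakePool and FakeConnection are hypothetical stand-ins for JedisPool and Jedis, and a HashMap plays the role of Redis; the points illustrated are that try-with-resources hands the connection back even if the write throws, and that SET simply overwrites the previous value of a key.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class SinkLifecycleSketch {
    /** Hypothetical stand-in for JedisPool. */
    public static class FakePool {
        public final Map<String, String> store = new HashMap<>(); // plays the role of Redis
        public final Deque<FakeConnection> idle = new ArrayDeque<>();
        public int borrowed = 0;

        FakeConnection getResource() {
            borrowed++;
            return idle.isEmpty() ? new FakeConnection(this) : idle.pop();
        }
    }

    /** Hypothetical stand-in for Jedis. */
    public static class FakeConnection implements AutoCloseable {
        private final FakePool pool;

        FakeConnection(FakePool pool) {
            this.pool = pool;
        }

        void set(String key, String value) {
            pool.store.put(key, value); // SET overwrites any previous value
        }

        @Override
        public void close() { // return the connection to the pool
            pool.borrowed--;
            pool.idle.push(this);
        }
    }

    public FakePool pool;

    public void open() { // like RichSinkFunction.open(): create the pool once
        pool = new FakePool();
    }

    public void invoke(long driverId, long count) { // called once per record
        try (FakeConnection conn = pool.getResource()) {
            conn.set("taxi:ride:" + driverId, Long.toString(count));
        } // conn.close() runs here, even if set() throws
    }

    public void close() { // like RichSinkFunction.close(): drop the pool
        pool = null;
    }

    public static void main(String[] args) {
        SinkLifecycleSketch sink = new SinkLifecycleSketch();
        sink.open();
        sink.invoke(2013001713L, 1L);
        sink.invoke(2013001713L, 2L);
        System.out.println(sink.pool.store); // {taxi:ride:2013001713=2}
        sink.close();
    }
}
```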

四.实例

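import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
// RedisConfig (host, port and pool settings) is the author's own helper class and is not shown here

public class TaxiRideCount {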
    /** logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(TaxiRideCount.class);
    // main
    public static void main(String[] args) throws Exception {
        // Read parameters: data file path, maximum event delay (seconds), serving speed factor
        final ParameterTool params = ParameterTool.fromArgs(args);
        String path = params.get("file-path","/home/wmm/go_bench/flink_sources/nycTaxiRides.gz");
        int maxDelay = params.getInt("max-delay",60);
        int servingSpeed = params.getInt("serving-speed",600);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().disableSysoutLogging();

        // Read TaxiRide events from the custom source
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(path, maxDelay, servingSpeed));
        // Map each ride event to (driverId, 1)
        DataStream<Tuple2<Long,Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(TaxiRide ride) throws Exception {
                return new Tuple2<Long, Long>(ride.driverId, 1L); // one count per ride event, keyed by driver ID
            }
        });

        KeyedStream<Tuple2<Long, Long>, Tuple> keyByDriverId = tuples.keyBy(0); // partition the stream by driver ID
        // Running count of ride events per driver; START and END events are both counted
        DataStream<Tuple2<Long, Long>> rideCounts = keyByDriverId.sum(1);

        RedisConfig redisConfig = new RedisConfig();
        redisConfig.setHost(params.get("output-redis","127.0.0.1"));
        redisConfig.setPort(6379);
        redisConfig.setPassword(null);
        //RedisSink redisSink = new RedisSink(redisConfig);
// Alternative: wrap the Redis access in a reusable sink class (RedisSink, not shown here)
//        rideCounts.map(new MapFunction<Tuple2<Long, Long>, RedisCommand>() { // 落地redis
//            @Override
//            public RedisCommand map(Tuple2<Long, Long> in) throws Exception {
//                return new RedisPushCommand("taxi:ride:" + in.f0, Long.toString(in.f1));
//                //return new RedisPushCommand("taxi:ride:" + in.f0, new String[]{Long.toString(in.f1)});
//            }
//        }).addSink(redisSink);

        // Implement the Redis sink directly as an anonymous class;
        // redisConfig is captured by the anonymous class, so RedisConfig must be Serializable
        rideCounts.addSink(new RichSinkFunction<Tuple2<Long, Long>>() {  // define the sink
            private transient JedisPool jedisPool;

            @Override
            public void open(Configuration parameters) throws Exception { // create the Redis pool
                super.open(parameters);
                JedisPoolConfig config = new JedisPoolConfig();
                config.setMaxIdle(redisConfig.getMaxIdle());
                config.setMinIdle(redisConfig.getMinIdle());
                config.setMaxTotal(redisConfig.getMaxTotal());
                // fail fast instead of swallowing the error and hitting a null pool in invoke()
                jedisPool = new JedisPool(config, redisConfig.getHost(), redisConfig.getPort(),
                        redisConfig.getConnectionTimeout(), redisConfig.getPassword(), redisConfig.getDatabase());
            }

            @Override
            public void close() throws Exception { // close the Redis pool
                try {
                    if (jedisPool != null) {
                        jedisPool.close();
                    }
                } catch (Exception e) {
                    LOGGER.error("redis sink error", e);
                }
                super.close();
            }

            @Override
            public void invoke(Tuple2<Long, Long> val, Context context) throws Exception { // write the record to Redis
                // try-with-resources returns the connection to the pool
                try (Jedis jedis = jedisPool.getResource()) {
                    jedis.set("taxi:ride:" + val.f0, val.f1.toString()); // store the latest count directly in Redis
                } catch (Exception e) {
                    LOGGER.error("failed to write to redis", e);
                }
            }
        });
        //rideCounts.print();

        JobExecutionResult result = env.execute("Ride Count By DriverID");
    }
}
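What keyBy(0).sum(1) followed by the Redis sink produces can be simulated in plain Java. A rolling sum on a keyed stream emits the updated total for every incoming record, and each emitted record is written with SET, so the Redis key ends up holding the latest count. The driver IDs below are hypothetical. Because each ride produces both a START and an END event in this job, a finished ride contributes 2 to the stored count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningCountSketch {
    /** Returns the (driverId, runningCount) records a rolling sum over (driverId, 1) would emit. */
    public static List<long[]> rollingSum(long[] driverIds) {
        Map<Long, Long> state = new HashMap<>(); // per-key state, like Flink's keyed state
        List<long[]> emitted = new ArrayList<>();
        for (long driverId : driverIds) {
            long updated = state.merge(driverId, 1L, Long::sum);
            emitted.add(new long[] {driverId, updated}); // one output per input record
        }
        return emitted;
    }

    /** Applies the emitted records to a map the way repeated SET commands would. */
    public static Map<String, String> applyToStore(List<long[]> records) {
        Map<String, String> store = new HashMap<>();
        for (long[] r : records) {
            store.put("taxi:ride:" + r[0], Long.toString(r[1])); // later values overwrite earlier ones
        }
        return store;
    }

    public static void main(String[] args) {
        // hypothetical driver IDs of four ride events
        long[] events = {7, 8, 7, 7};
        List<long[]> out = rollingSum(events);
        for (long[] r : out) {
            System.out.println("(" + r[0] + "," + r[1] + ")"); // (7,1) (8,1) (7,2) (7,3)
        }
        System.out.println(applyToStore(out).get("taxi:ride:7")); // 3
    }
}
```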

Verification

Check the stored values with redis-cli:
get taxi:ride:2013001713
command count
The complete example is available on GitHub.


Article link: https://www.haomeiwen.com/subject/twbnmqtx.html