Apache Flink Example Application

Author: 神奇的考拉 | Published: 2019-03-19 19:18

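    This article uses Flink + Redis to clean and filter data, process and aggregate it, and finally persist the results. Along the way it implements a custom source and a custom sink, and uses event-time windows and watermarks.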

    Preparation

    I. Installing Kafka with Docker

    1. Pull the Docker images (if pulling directly is slow, configure a registry mirror closer to you, e.g. a domestic mirror in China)

    docker pull wurstmeister/zookeeper
    docker pull wurstmeister/kafka

    2. Run the two images: zookeeper and kafka

    2.1 Start ZooKeeper:

    docker run -d --name zookeeper --publish 2181:2181 \
        --volume /etc/localtime:/etc/localtime \
        wurstmeister/zookeeper

    2.2 Start Kafka:

    docker run -d --name kafka --publish 9092:9092 \
        --link zookeeper \
        --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
        --env KAFKA_ADVERTISED_HOST_NAME=localhost \
        --env KAFKA_ADVERTISED_PORT=9092 \
        --volume /etc/localtime:/etc/localtime \
        wurstmeister/kafka

    3. Verify that the containers started successfully

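    3.1 Run docker ps and find the CONTAINER ID of the kafka container.

    3.2 Run docker exec -it <CONTAINER ID> /bin/bash to open a shell inside the kafka container.

    3.3 Change to the Kafka installation directory /opt/kafka_2.11-0.10.1.0 (the version suffix depends on the image version). Then run bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test to create a topic named test.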

    Run bin/kafka-topics.sh --list --zookeeper zookeeper:2181 to list the current topics.

    Start a console producer for the test topic you just created with bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test, and type a few test messages.

    Start a console consumer on the same test topic with bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning. It should print the messages sent by the producer.

    II. Installing Redis with Docker

    1. Pull the Redis image

    docker pull registry.docker-cn.com/library/redis

    2. Start a Redis container

    docker run -d -p 6379:6379 --name myredis registry.docker-cn.com/library/redis

    3. Run docker ps to list the running containers

    4. Connect to the container by running redis-cli inside the container you just started

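    sudo docker exec -it 6fb1ba029b41 redis-cli

    Replace 6fb1ba029b41 with your own container ID, or use the container name myredis. A prompt like 127.0.0.1:6379> should appear.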
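    Before wiring up Flink, it can help to confirm that the published ports answer. The plain-Java sketch below uses no client libraries. It only opens a TCP connection to localhost on ports 2181, 9092 and 6379, the ports published by the docker run commands above. A successful connection shows only that something is listening, not that Kafka or Redis is fully healthy. PortCheck and isReachable are names made up for this example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether the ports published by the containers accept TCP connections. */
public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs milliseconds. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out or unknown host
        }
    }

    public static void main(String[] args) {
        int[] ports = {2181, 9092, 6379}; // zookeeper, kafka, redis as published above
        for (int port : ports) {
            System.out.println("localhost:" + port + " reachable=" + isReachable("localhost", port, 1000));
        }
    }
}
```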

    III. Test datasets

    3.1 Download the datasets:

    wget http://training.ververica.com/trainingData/nycTaxiRides.gz

    wget http://training.ververica.com/trainingData/nycTaxiFares.gz
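    Both files are gzip-compressed text with one comma-separated record per line. The plain-Java sketch below shows a minimal way to peek at the first lines before writing a parser. GzPeek is a hypothetical helper, not part of the original project, and the default file name nycTaxiRides.gz in the working directory is an assumption.

```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

/** Hypothetical helper: prints the first lines of a downloaded .gz data file. */
public class GzPeek {

    /** Returns at most maxLines lines from the start of a gzip-compressed text file. */
    public static List<String> head(String gzPath, int maxLines) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream(gzPath)), StandardCharsets.UTF_8))) {
            String line;
            while (lines.size() < maxLines && (line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the file was downloaded into the current working directory
        String path = args.length > 0 ? args[0] : "nycTaxiRides.gz";
        for (String line : head(path, 5)) {
            System.out.println(line);
        }
    }
}
```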

    3.2 Field descriptions

    ============================= TaxiRide fields =============================
    rideId         : Long      // a unique id for each ride
    taxiId         : Long      // a unique id for each taxi
    driverId       : Long      // a unique id for each driver
    isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events
    startTime      : DateTime  // the start time of a ride
    endTime        : DateTime  // the end time of a ride,
                               //   "1970-01-01 00:00:00" for start events
    startLon       : Float     // the longitude of the ride start location
    startLat       : Float     // the latitude of the ride start location
    endLon         : Float     // the longitude of the ride end location
    endLat         : Float     // the latitude of the ride end location
    passengerCnt   : Short     // number of passengers on the ride


    ============================= TaxiFare fields =============================
    rideId         : Long      // a unique id for each ride
    taxiId         : Long      // a unique id for each taxi
    driverId       : Long      // a unique id for each driver
    startTime      : DateTime  // the start time of a ride
    paymentType    : String    // CSH or CRD
    tip            : Float     // tip for this ride
    tolls          : Float     // tolls for this ride
    totalFare      : Float     // total fare collected
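    The post only shows a parser for TaxiRide. A parser for the fare records could look like the sketch below. It assumes the eight TaxiFare fields appear comma-separated in the order listed above, with times formatted as yyyy-MM-dd HH:mm:ss like the ride data. It uses java.time instead of Joda-Time so that it has no dependencies. TaxiFareSketch is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical parser sketch for one line of nycTaxiFares.gz (field order assumed from the list above). */
public class TaxiFareSketch {
    private static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public final long rideId;
    public final long taxiId;
    public final long driverId;
    public final LocalDateTime startTime;
    public final String paymentType; // "CSH" or "CRD"
    public final float tip;
    public final float tolls;
    public final float totalFare;

    public TaxiFareSketch(long rideId, long taxiId, long driverId, LocalDateTime startTime,
                          String paymentType, float tip, float tolls, float totalFare) {
        this.rideId = rideId;
        this.taxiId = taxiId;
        this.driverId = driverId;
        this.startTime = startTime;
        this.paymentType = paymentType;
        this.tip = tip;
        this.tolls = tolls;
        this.totalFare = totalFare;
    }

    /** Parses "rideId,taxiId,driverId,startTime,paymentType,tip,tolls,totalFare". */
    public static TaxiFareSketch fromString(String line) {
        String[] tokens = line.split(",");
        if (tokens.length != 8) {
            throw new IllegalArgumentException("Invalid record: " + line);
        }
        try {
            return new TaxiFareSketch(
                    Long.parseLong(tokens[0]),
                    Long.parseLong(tokens[1]),
                    Long.parseLong(tokens[2]),
                    LocalDateTime.parse(tokens[3], TIME_FORMAT),
                    tokens[4],
                    Float.parseFloat(tokens[5]),
                    Float.parseFloat(tokens[6]),
                    Float.parseFloat(tokens[7]));
        } catch (RuntimeException e) { // NumberFormatException or DateTimeParseException
            throw new IllegalArgumentException("Invalid record: " + line, e);
        }
    }
}
```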
    
    

    Implementation

    I. Base class

    TaxiRide maps each record of the ride data source to a Java object:

    import com.jdd.streaming.demos.utils.GeoUtils;
    import org.joda.time.DateTime;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.Serializable;
    import java.util.Locale;
    
    /**
     * @Author: dalan
     * @Date: 19-3-18 15:23
     * @Description: a single ride event (the start or the end of a taxi ride)
     */
    public class TaxiRide implements Comparable<TaxiRide>, Serializable {
    
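        // static fields are never serialized, so "transient" is unnecessary; Joda formatters are immutable and thread-safe
        private static final DateTimeFormatter timeFormatter =
                DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();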
    
        public TaxiRide() {
            this.startTime = new DateTime();
            this.endTime = new DateTime();
        }
    
        public TaxiRide(long rideId, boolean isStart, DateTime startTime, DateTime endTime,
                        float startLon, float startLat, float endLon, float endLat,
                        short passengerCnt, long taxiId, long driverId) {
    
            this.rideId = rideId;
            this.isStart = isStart;
            this.startTime = startTime;
            this.endTime = endTime;
            this.startLon = startLon;
            this.startLat = startLat;
            this.endLon = endLon;
            this.endLat = endLat;
            this.passengerCnt = passengerCnt;
            this.taxiId = taxiId;
            this.driverId = driverId;
        }
    
        public long rideId;
        public boolean isStart;
        public DateTime startTime;
        public DateTime endTime;
        public float startLon;
        public float startLat;
        public float endLon;
        public float endLat;
        public short passengerCnt;
        public long taxiId;
        public long driverId;
    
        @Override
        public String toString() {  // comma-separated output, convenient for printing
            StringBuilder sb = new StringBuilder();
            sb.append(rideId).append(",");
            sb.append(isStart ? "START" : "END").append(",");
            sb.append(startTime.toString(timeFormatter)).append(",");
            sb.append(endTime.toString(timeFormatter)).append(",");
            sb.append(startLon).append(",");
            sb.append(startLat).append(",");
            sb.append(endLon).append(",");
            sb.append(endLat).append(",");
            sb.append(passengerCnt).append(",");
            sb.append(taxiId).append(",");
            sb.append(driverId);
    
            return sb.toString();
        }
    
        // Convert one line of the data file into a TaxiRide instance
        public static TaxiRide fromString(String line) {
            String[] tokens = line.split(",");
            if (tokens.length != 11) {
                throw new RuntimeException("Invalid record: " + line);
            }
    
            TaxiRide ride = new TaxiRide();
            // Map the tokens of the record onto the fields
            try {
                ride.rideId = Long.parseLong(tokens[0]);
    
                switch (tokens[1]) {
                    case "START":
                        ride.isStart = true;
                        ride.startTime = DateTime.parse(tokens[2], timeFormatter);
                        ride.endTime = DateTime.parse(tokens[3], timeFormatter);
                        break;
                    case "END":
                        ride.isStart = false;
                        ride.endTime = DateTime.parse(tokens[2], timeFormatter);
                        ride.startTime = DateTime.parse(tokens[3], timeFormatter);
                        break;
                    default:
                        throw new RuntimeException("Invalid record: " + line);
                }
    
                ride.startLon = tokens[4].length() > 0 ? Float.parseFloat(tokens[4]) : 0.0f;
                ride.startLat = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
                ride.endLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
                ride.endLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
                ride.passengerCnt = Short.parseShort(tokens[8]);
                ride.taxiId = Long.parseLong(tokens[9]);
                ride.driverId = Long.parseLong(tokens[10]);
    
            } catch (NumberFormatException nfe) {
                throw new RuntimeException("Invalid record: " + line, nfe);
            }
    
            return ride;
        }
    
        // Order by event timestamp; if a start event and an end event share the same timestamp, the start event comes first
        @Override
        public int compareTo(TaxiRide other) {
            if (other == null) {
                return 1;
            }
            int compareTimes = Long.compare(this.getEventTime(), other.getEventTime());
            if (compareTimes == 0) {
                if (this.isStart == other.isStart) {
                    return 0;
                }
                else {
                    if (this.isStart) {
                        return -1;
                    }
                    else {
                        return 1;
                    }
                }
            }
            else {
                return compareTimes;
            }
        }
    
        @Override
        public boolean equals(Object other) {
            return other instanceof TaxiRide &&
                    this.rideId == ((TaxiRide) other).rideId;
        }
    
        @Override
        public int hashCode() {
            return (int)this.rideId;
        }
    
        // The event time: startTime for START events, endTime for END events
        public long getEventTime() {
            if (isStart) {
                return startTime.getMillis();
            }
            else {
                return endTime.getMillis();
            }
        }
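        // Euclidean distance between the given point and the ride's location (start location for START events, end location for END events)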
        public double getEuclideanDistance(double longitude, double latitude) {
            if (this.isStart) {
                return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.startLon, this.startLat);
            } else {
                return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.endLon, this.endLat);
            }
        }
    
    
        // =================== This part can be ignored for now ===================
        public static class EnrichedRide extends TaxiRide{
            public int startCell;
            public int endCell;
    
            public EnrichedRide(){}
            public EnrichedRide(TaxiRide ride){
                this.rideId = ride.rideId;
                this.isStart = ride.isStart;
                this.startTime = ride.startTime;
                this.endTime = ride.endTime;
                this.startLon = ride.startLon;
                this.startLat = ride.startLat;
                this.endLon = ride.endLon;
                this.endLat = ride.endLat;
                this.passengerCnt = ride.passengerCnt;
                this.taxiId = ride.taxiId;
                this.driverId = ride.driverId;
    
                this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
                this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
            }
    
            @Override
            public String toString() {
                return super.toString() + "," +
                        Integer.toString(this.startCell) + "," +
                        Integer.toString(this.endCell);
            }
        }
    }
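    Two details of TaxiRide are easy to miss. First, getEventTime() stamps START events with startTime and END events with endTime. Second, compareTo() breaks ties by putting START first. The self-contained sketch below re-implements just those two rules on a simplified RideEvent class. RideEvent is a hypothetical stand-in, not the TaxiRide class. The sketch shows the resulting order when one ride ends in the same millisecond another starts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Simplified, hypothetical stand-in for TaxiRide: only the fields that drive event time and ordering. */
public class RideOrderSketch {

    public static class RideEvent implements Comparable<RideEvent> {
        final long rideId;
        final boolean isStart;
        final long startMillis;
        final long endMillis;

        RideEvent(long rideId, boolean isStart, long startMillis, long endMillis) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }

        /** Same rule as TaxiRide.getEventTime(). */
        long eventTime() {
            return isStart ? startMillis : endMillis;
        }

        /** Same rule as TaxiRide.compareTo(): by event time, START before END on a tie. */
        @Override
        public int compareTo(RideEvent other) {
            int byTime = Long.compare(eventTime(), other.eventTime());
            if (byTime != 0) {
                return byTime;
            }
            if (isStart == other.isStart) {
                return 0;
            }
            return isStart ? -1 : 1;
        }

        @Override
        public String toString() {
            return rideId + (isStart ? ":START" : ":END");
        }
    }

    /** Sorts three events where ride 1 ends in the same millisecond that ride 2 starts. */
    public static String demo() {
        List<RideEvent> events = new ArrayList<>();
        events.add(new RideEvent(1, false, 0L, 5_000L)); // ride 1 ends at t=5000
        events.add(new RideEvent(2, true, 5_000L, 0L));  // ride 2 starts at t=5000
        events.add(new RideEvent(1, true, 0L, 0L));      // ride 1 starts at t=0
        Collections.sort(events);
        return events.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1:START, 2:START, 1:END]
    }
}
```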
    

    II. Custom source

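    The source reads the gz file. In Flink, a custom source implements the SourceFunction interface and its two methods:
    1. public void run(SourceContext<TaxiRide> sourceContext) throws Exception
    reads the raw data and emits the records through the SourceContext.
    2. public void cancel()
    is called when the job is cancelled and must cause run() to return.
    A plain SourceFunction always runs with parallelism 1. In production, sources more commonly extend the abstract class org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction. It implements ParallelSourceFunction (a SourceFunction) and extends AbstractRichFunction. A source built on it therefore also gets the RichFunction lifecycle (open()/close()) and access to the runtime context.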
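    The run()/cancel() contract is easiest to see in isolation, so the sketch below models it in plain Java. CancellableSource is a hypothetical stand-in for Flink's SourceFunction, and a LongConsumer plays the role of SourceContext.collect. A volatile flag lets cancel(), called from another thread, stop the loop in run(). This is a common pattern for sources. TaxiRideSource below instead stops when cancel() closes its reader.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Plain-Java model of the run()/cancel() contract; not Flink code. */
public class CancellableSourceSketch {

    /** Hypothetical stand-in for SourceFunction: run() emits records, cancel() asks it to stop. */
    public interface CancellableSource {
        void run(LongConsumer out) throws InterruptedException; // out plays the role of SourceContext.collect
        void cancel();
    }

    /** Emits 0, 1, 2, ... until cancelled; volatile makes the flag written by cancel() visible to run(). */
    public static class CounterSource implements CancellableSource {
        private volatile boolean running = true;

        @Override
        public void run(LongConsumer out) throws InterruptedException {
            long next = 0;
            while (running) {
                out.accept(next++);
                Thread.sleep(1); // pretend that reading the next record takes a moment
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Runs the source on its own thread, cancels it after runMillis; returns records emitted, or -1 if it did not stop. */
    public static long runAndCancel(long runMillis) throws InterruptedException {
        AtomicLong emitted = new AtomicLong();
        CounterSource source = new CounterSource();
        Thread worker = new Thread(() -> {
            try {
                source.run(value -> emitted.incrementAndGet());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(runMillis);
        source.cancel(); // corresponds to Flink calling cancel() when the job is cancelled
        worker.join(1000);
        return worker.isAlive() ? -1 : emitted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("records emitted before cancel: " + runAndCancel(50));
    }
}
```

    In a real Flink SourceFunction, records are usually emitted inside synchronized (ctx.getCheckpointLock()) so that emission does not interleave with checkpoints.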

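    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.Calendar;
    import java.util.Comparator;
    import java.util.PriorityQueue;
    import java.util.Random;
    import java.util.zip.GZIPInputStream;

    public class TaxiRideSource implements SourceFunction<TaxiRide> {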
    
        private final int maxDelayMsecs;
        private final int watermarkDelayMSecs;
    
        private final String dataFilePath;
        private final int servingSpeed;
    
        private transient BufferedReader reader;
        private transient InputStream gzipStream;
       
        public TaxiRideSource(String dataFilePath) {
            this(dataFilePath, 0, 1);
        }
    
        public TaxiRideSource(String dataFilePath, int servingSpeedFactor) {
            this(dataFilePath, 0, servingSpeedFactor);
        }
    
        public TaxiRideSource(String dataFilePath, int maxEventDelaySecs, int servingSpeedFactor) {
            if(maxEventDelaySecs < 0) {
                throw new IllegalArgumentException("Max event delay must not be negative");
            }
            this.dataFilePath = dataFilePath;
            this.maxDelayMsecs = maxEventDelaySecs * 1000;
            this.watermarkDelayMSecs = maxDelayMsecs < 10000 ? 10000 : maxDelayMsecs;
            this.servingSpeed = servingSpeedFactor;
        }
    
        // The core of the source: open the file, emit its records with timestamps and watermarks, then clean up
        @Override
        public void run(SourceContext<TaxiRide> sourceContext) throws Exception {
            // Open the gzip stream and wrap it in a BufferedReader
            gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath));
            reader = new BufferedReader(new InputStreamReader(gzipStream, "UTF-8"));
            // Read the gz file and emit its records
            generateUnorderedStream(sourceContext);
            // Release resources once the whole file has been read
            this.reader.close();
            this.reader = null;
            this.gzipStream.close();
            this.gzipStream = null;
        }
         
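        // Emit the records of the gz file slightly out of order: each ride is delayed by a random amount
        // bounded by maxDelayMsecs, and watermarks are interleaved periodically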
        private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception {
            long servingStartTime = Calendar.getInstance().getTimeInMillis();
            long dataStartTime;
    
            Random rand = new Random(7452);
            PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
                    32,
                    new Comparator<Tuple2<Long, Object>>() {
                        @Override
                        public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
                            return o1.f0.compareTo(o2.f0);
                        }
                    });
    
            // read first ride and insert it into emit schedule
            String line;
            TaxiRide ride;
            if (reader.ready() && (line = reader.readLine()) != null) {
                // read first ride
                ride = TaxiRide.fromString(line);
                // extract starting timestamp
                dataStartTime = getEventTime(ride);
                // get delayed time
                long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);
    
                emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
                // schedule next watermark
                long watermarkTime = dataStartTime + watermarkDelayMSecs;
                Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
                emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
    
            } else {
                return;
            }
    
            // peek at next ride
            if (reader.ready() && (line = reader.readLine()) != null) {
                ride = TaxiRide.fromString(line);
            } else {
                ride = null; // the file held a single record: do not schedule the first ride twice
            }
    
            // read rides one-by-one and emit a random ride from the buffer each time
            while (emitSchedule.size() > 0 || reader.ready()) {
    
                // insert all events into schedule that might be emitted next
                long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1;
                long rideEventTime = ride != null ? getEventTime(ride) : -1;
                while(
                        ride != null && ( // while there is a ride AND
                                emitSchedule.isEmpty() || // and no ride in schedule OR
                                        rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule
                )
                {
                    // insert event into emit schedule
                    long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
                    emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
    
                    // read next ride
                    if (reader.ready() && (line = reader.readLine()) != null) {
                        ride = TaxiRide.fromString(line);
                        rideEventTime = getEventTime(ride);
                    }
                    else {
                        ride = null;
                        rideEventTime = -1;
                    }
                }
    
                // emit schedule is updated, emit next element in schedule
                Tuple2<Long, Object> head = emitSchedule.poll();
                long delayedEventTime = head.f0;
    
                long now = Calendar.getInstance().getTimeInMillis();
                long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
                long waitTime = servingTime - now;
    
                Thread.sleep( (waitTime > 0) ? waitTime : 0);
    
                if(head.f1 instanceof TaxiRide) {
                    TaxiRide emitRide = (TaxiRide)head.f1;
                    // emit ride
                    sourceContext.collectWithTimestamp(emitRide, getEventTime(emitRide));
                }
                else if(head.f1 instanceof Watermark) {
                    Watermark emitWatermark = (Watermark)head.f1;
                    // emit watermark
                    sourceContext.emitWatermark(emitWatermark);
                    // schedule next watermark
                    long watermarkTime = delayedEventTime + watermarkDelayMSecs;
                    Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
                    emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
                }
            }
        }
        
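        // Map an event timestamp to the wall-clock time at which it is served; replay runs servingSpeed times faster than event time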
        public long toServingTime(long servingStartTime, long dataStartTime, long eventTime) {
            long dataDiff = eventTime - dataStartTime;
            return servingStartTime + (dataDiff / this.servingSpeed);
        }
    
        public long getEventTime(TaxiRide ride) {
            return ride.getEventTime();
        }
    
        public long getNormalDelayMsecs(Random rand) {
            long delay = -1;
            long x = maxDelayMsecs / 2;
            while(delay < 0 || delay > maxDelayMsecs) {
                delay = (long)(rand.nextGaussian() * x) + x;
            }
            return delay;
        }
    
        // Called by Flink when the job is cancelled; closing the reader makes run() leave its read loop
        @Override
        public void cancel() {
            try {
                if (this.reader != null) {
                    this.reader.close();
                }
                if (this.gzipStream != null) {
                    this.gzipStream.close();
                }
            } catch(IOException ioe) {
                throw new RuntimeException("Could not cancel SourceFunction", ioe);
            } finally {
                this.reader = null;
                this.gzipStream = null;
            }
        }
    }
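    The replay logic in generateUnorderedStream boils down to three formulas:
    1. the serving time lies (eventTime - dataStartTime) / servingSpeed after the serving start;
    2. the random delay is drawn from a Gaussian around maxDelayMsecs / 2 and redrawn until it falls within [0, maxDelayMsecs];
    3. a watermark scheduled at time t carries the value t - maxDelayMsecs - 1.
    The plain-Java sketch below copies those formulas without any Flink classes. It replays a synthetic stream to check that no event arrives after a watermark has already passed its timestamp. The event timestamps and the seed are made up.

```java
import java.util.PriorityQueue;
import java.util.Random;

/** Plain-Java sketch of the replay formulas used by TaxiRideSource; not Flink code. */
public class ReplaySketch {

    /** Same formula as TaxiRideSource.toServingTime. */
    public static long toServingTime(long servingStartTime, long dataStartTime, long eventTime, int servingSpeed) {
        return servingStartTime + (eventTime - dataStartTime) / servingSpeed;
    }

    /** Same loop as getNormalDelayMsecs: Gaussian around max/2, redrawn until it lies in [0, max]. */
    public static long normalDelay(Random rand, long maxDelayMsecs) {
        long x = maxDelayMsecs / 2;
        long delay = -1;
        while (delay < 0 || delay > maxDelayMsecs) {
            delay = (long) (rand.nextGaussian() * x) + x;
        }
        return delay;
    }

    /**
     * Replays event timestamps (sorted ascending, non-empty) with random delays and periodic watermarks,
     * and counts events whose timestamp is not greater than the watermark already emitted before them.
     */
    public static int countLateEvents(long[] eventTimes, long maxDelayMsecs, long watermarkDelayMsecs, long seed) {
        Random rand = new Random(seed);
        // Entries are {scheduledTime, value, kind}: kind 0 = event (value = event time), 1 = watermark
        PriorityQueue<long[]> schedule = new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
        long lastScheduled = eventTimes[0];
        for (long ts : eventTimes) {
            long delayed = ts + normalDelay(rand, maxDelayMsecs);
            schedule.add(new long[] {delayed, ts, 0});
            lastScheduled = Math.max(lastScheduled, delayed);
        }
        for (long wmTime = eventTimes[0] + watermarkDelayMsecs; wmTime <= lastScheduled + watermarkDelayMsecs;
                wmTime += watermarkDelayMsecs) {
            schedule.add(new long[] {wmTime, wmTime - maxDelayMsecs - 1, 1});
        }
        long currentWatermark = Long.MIN_VALUE;
        int late = 0;
        while (!schedule.isEmpty()) {
            long[] head = schedule.poll();
            if (head[2] == 1) {
                currentWatermark = Math.max(currentWatermark, head[1]);
            } else if (head[1] <= currentWatermark) {
                late++;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // 10 minutes of event time replayed at servingSpeed 600 take 1 second of wall-clock time
        System.out.println(toServingTime(0L, 0L, 600_000L, 600)); // 1000
        long[] ts = new long[1000];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = i * 500L; // one event every 500 ms of event time
        }
        System.out.println("late events: " + countLateEvents(ts, 60_000L, 60_000L, 7452L)); // 0
    }
}
```

    The count is zero by construction. An event scheduled at time d has an event time of at least d - maxDelayMsecs. Any watermark emitted at or before d carries at most d - maxDelayMsecs - 1.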
    

    III. Custom sink

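    The sink writes the results to Redis. In practice a sink usually extends the abstract class RichSinkFunction and overrides three methods:
    1. public void open(Configuration parameters) throws Exception: called once per parallel instance before any record arrives; it creates the connection pool.
    2. public void invoke(Tuple2<Long, Long> val, Context context) throws Exception: called for every record.
    3. public void close() throws Exception: called once when the task shuts down; it releases the resources.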

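    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;

    // The post shows only the three method bodies; the class name and constructor are added here
    // so the snippet compiles. RedisConfig is the author's own configuration class (host, port,
    // pool sizes, ...) and must be Serializable, because it is shipped with the sink.
    public class RideCountRedisSink extends RichSinkFunction<Tuple2<Long, Long>> {

        private final RedisConfig redisConfig;
        private transient JedisPool jedisPool;

        public RideCountRedisSink(RedisConfig redisConfig) {
            this.redisConfig = redisConfig;
        }

        @Override
        public void open(Configuration parameters) throws Exception { // create the Redis pool once per parallel instance
            super.open(parameters);
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxIdle(redisConfig.getMaxIdle());
            config.setMinIdle(redisConfig.getMinIdle());
            config.setMaxTotal(redisConfig.getMaxTotal());
            // Let failures propagate: swallowing them would leave jedisPool null and fail later in invoke()
            jedisPool = new JedisPool(config, redisConfig.getHost(), redisConfig.getPort(),
                    redisConfig.getConnectionTimeout(), redisConfig.getPassword(), redisConfig.getDatabase());
        }

        @Override
        public void close() throws Exception { // release the pool when the task shuts down
            if (jedisPool != null) {
                jedisPool.close();
            }
            super.close();
        }

        @Override
        public void invoke(Tuple2<Long, Long> val, Context context) throws Exception { // write one result to Redis
            // try-with-resources returns the connection to the pool; an exception now fails the task
            // instead of being printed while the record is silently dropped
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.set("taxi:ride:" + val.f0, val.f1.toString());
            }
        }
    }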
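    Note what the sink actually receives: keyBy(0).sum(1) emits an updated running total for every input record. invoke() therefore runs once per record, and each SET overwrites the previous value for that driver. The plain-Java sketch below imitates that flow with two HashMaps, one standing in for the keyed sum state and one for Redis. The class name and the driver ids are made up.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java imitation of keyBy(0).sum(1) feeding a SET-based sink; no Flink or Jedis classes. */
public class RunningCountSinkSketch {

    private final Map<Long, Long> runningTotals = new HashMap<>(); // per-key state, like sum(1)
    private final Map<String, String> store = new HashMap<>();     // stand-in for Redis
    private int writes = 0;

    /** One (driverId, 1) record: update the running total and hand it to the sink. */
    public void onRecord(long driverId) {
        long total = runningTotals.merge(driverId, 1L, Long::sum);
        invoke(driverId, total);
    }

    /** Mirrors the sink's invoke(): SET overwrites, so only the latest total survives. */
    private void invoke(long driverId, long total) {
        store.put("taxi:ride:" + driverId, Long.toString(total));
        writes++;
    }

    public String get(String key) {
        return store.get(key);
    }

    public int writeCount() {
        return writes;
    }

    public static void main(String[] args) {
        RunningCountSinkSketch job = new RunningCountSinkSketch();
        long[] drivers = {7L, 7L, 8L, 7L}; // made-up driver ids
        for (long d : drivers) {
            job.onRecord(d);
        }
        System.out.println(job.get("taxi:ride:7")); // 3
        System.out.println(job.writeCount());        // 4
    }
}
```

    The number of Redis writes therefore equals the number of input records, not the number of drivers.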
    

    IV. Example job

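    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;

    public class TaxiRideCount {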
        /** logger */
        private static final Logger LOGGER = LoggerFactory.getLogger(TaxiRideCount.class);
        // main
        public static void main(String[] args) throws Exception {
            // Read parameters: data file path / maximum event delay in seconds / serving speed factor
            final ParameterTool params = ParameterTool.fromArgs(args);
            String path = params.get("file-path","/home/wmm/go_bench/flink_sources/nycTaxiRides.gz");
            int maxDelay = params.getInt("max-delay",60);
            int servingSpeed = params.getInt("serving-speed",600);

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.getConfig().disableSysoutLogging();

            // Read TaxiRide events from the custom source
            DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(path, maxDelay, servingSpeed));
            // Each ride produces a START and an END event: keep only START events so every ride is
            // counted once, then map each ride to (driverId, 1)
            DataStream<Tuple2<Long,Long>> tuples = rides
                    .filter(ride -> ride.isStart)
                    .map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
                        @Override
                        public Tuple2<Long, Long> map(TaxiRide ride) throws Exception {
                            return new Tuple2<Long, Long>(ride.driverId, 1L); // one count per ride, keyed by driver id
                        }
                    });
    
            KeyedStream<Tuple2<Long, Long>, Tuple> keyByDriverId = tuples.keyBy(0); // partition the stream by driver id (field 0)
            DataStream<Tuple2<Long, Long>> rideCounts = keyByDriverId.sum(1); // running number of rides per driver, updated on every record
    
            RedisConfig redisConfig = new RedisConfig();
            redisConfig.setHost(params.get("output-redis","127.0.0.1"));
            redisConfig.setPort(6379);
            redisConfig.setPassword(null);
            //RedisSink redisSink = new RedisSink(redisConfig);
    // The commented-out code below wraps the Redis writes in the author's reusable classes (RedisSink, RedisPushCommand)
    //        rideCounts.map(new MapFunction<Tuple2<Long, Long>, RedisCommand>() { // 落地redis
    //            @Override
    //            public RedisCommand map(Tuple2<Long, Long> in) throws Exception {
    //                return new RedisPushCommand("taxi:ride:" + in.f0, Long.toString(in.f1));
    //                //return new RedisPushCommand("taxi:ride:" + in.f0, new String[]{Long.toString(in.f1)});
    //            }
    //        }).addSink(redisSink);
    
            // Redis sink implemented directly as an anonymous class (redisConfig must be Serializable)
            rideCounts.addSink(new RichSinkFunction<Tuple2<Long, Long>>() {  // define the sink
                private transient JedisPool jedisPool;

                @Override
                public void open(Configuration parameters) throws Exception { // create the Redis pool
                    super.open(parameters);
                    JedisPoolConfig config = new JedisPoolConfig();
                    config.setMaxIdle(redisConfig.getMaxIdle());
                    config.setMinIdle(redisConfig.getMinIdle());
                    config.setMaxTotal(redisConfig.getMaxTotal());
                    // failures propagate instead of leaving jedisPool null
                    jedisPool = new JedisPool(config, redisConfig.getHost(), redisConfig.getPort(),
                            redisConfig.getConnectionTimeout(), redisConfig.getPassword(), redisConfig.getDatabase());
                }

                @Override
                public void close() throws Exception { // close the Redis pool
                    if (jedisPool != null) {
                        jedisPool.close();
                    }
                }

                @Override
                public void invoke(Tuple2<Long, Long> val, Context context) throws Exception { // write the result to Redis
                    try (Jedis jedis = jedisPool.getResource()) { // returns the connection to the pool
                        jedis.set("taxi:ride:" + val.f0, val.f1.toString()); // store the latest count directly in Redis
                    }
                }
            });
            //rideCounts.print();
    
            JobExecutionResult result = env.execute("Ride Count By DriverID");
        }
    }
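    The job above keeps a running total and never closes a window, so event time and watermarks do not affect the counts it produces. The plain-Java sketch below (not Flink code) shows where they do matter. It counts rides per driver in event-time tumbling windows, roughly what keyBy(0).timeWindow(Time.hours(1)).sum(1) would compute with the Flink 1.x DataStream API. It mimics two default rules. A window [start, end) fires once the watermark reaches end - 1. Records for a window that has already fired are dropped as late. The class, the one-second window size and the sample events are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of per-driver counts in event-time tumbling windows; not Flink code. */
public class TumblingWindowSketch {
    private final long sizeMillis;
    private final TreeMap<Long, TreeMap<Long, Long>> openWindows = new TreeMap<>(); // start -> (driverId -> count)
    private final List<String> results = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;
    private int lateDropped = 0;

    public TumblingWindowSketch(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    public void onEvent(long driverId, long timestamp) {
        long start = timestamp - (timestamp % sizeMillis); // assumes timestamp >= 0 and window offset 0
        if (start + sizeMillis - 1 <= currentWatermark) {
            lateDropped++; // this window has already fired
            return;
        }
        openWindows.computeIfAbsent(start, s -> new TreeMap<>()).merge(driverId, 1L, Long::sum);
    }

    public void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        // Fire every window whose last timestamp (end - 1) is covered by the watermark
        while (!openWindows.isEmpty() && openWindows.firstKey() + sizeMillis - 1 <= currentWatermark) {
            Map.Entry<Long, TreeMap<Long, Long>> window = openWindows.pollFirstEntry();
            long end = window.getKey() + sizeMillis;
            for (Map.Entry<Long, Long> count : window.getValue().entrySet()) {
                results.add("[" + window.getKey() + "," + end + ") driver=" + count.getKey() + " rides=" + count.getValue());
            }
        }
    }

    public List<String> results() {
        return results;
    }

    public int lateDropped() {
        return lateDropped;
    }

    public static void main(String[] args) {
        TumblingWindowSketch w = new TumblingWindowSketch(1000L);
        w.onEvent(7L, 100L);
        w.onEvent(7L, 900L);
        w.onEvent(8L, 500L);
        w.onEvent(7L, 1500L);
        w.onWatermark(998L);  // nothing fires: window [0,1000) needs a watermark of at least 999
        w.onWatermark(999L);  // fires [0,1000)
        w.onEvent(7L, 200L);  // late: [0,1000) has already fired
        w.onWatermark(1999L); // fires [1000,2000)
        w.results().forEach(System.out::println);
        System.out.println("late dropped: " + w.lateDropped());
    }
}
```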
    

    Verification

    Check the results with redis-cli. Each key holds the current ride count of one driver, for example:
    get taxi:ride:2013001713
    The complete example is available on GitHub.
