美文网首页
Flink 生产实践

Flink 生产实践

作者: 一生逍遥一生 | 来源:发表于2021-09-27 11:53 被阅读0次

    Flink HA

    Flink HA 的HighAvailabilityMode类中定义了是那种高可用性模式枚举:

    • NONE:非HA模式
    • ZOOKEEPER:基于ZK实现HA
    • FACTORY_CLASS:自定义HA工厂类,实现HighAvailabilityServiceFactory接口。

    ZooKeeperHaService主要提供了创建LeaderRetrievalService和LeaderElectionService等方法,并给出了各个服务组件使用的ZK节点名称。

    Flink Exactly-once实现原理解析

    流处理引擎通常为用户的应用程序提供是那种数据处理语义:最多一次、至少一次、精确一次。

    • 最多一次: 用户数据只会被处理一次,不管成功还是失败,不会重试也不会重发。
    • 至少一次: 系统会保证数据或事件被处理一次。如果中间发生错误或者丢失,就会重发或者重试。
    • 精确一次: 每一条数据只会被精确地处理一次,不多也不少。

    Flink的快照可以到算子级别,并且对全局数据也可以做快照。
    Flink分布式快照的核心元素之一是Barrier,该标记是严格有序的,并随着数据往下流动。
    每个流的barrier n到达时间不一致怎么办,这是Flink采取的措施是快流等慢流。
    Flink在做存储时,可采用异步方式,每次都是进行的全量checkpoint,是基于上次进行更新的。

    快照机制能够保证作业出现fail-over后可以从最新的快照进行恢复,即分布式快照机制可以保证Flink系统内部的精确一次处理。

    两阶段处理继承TwoPhaseCommitSinkFunction,需要实现beginTransaction、preCommit、commit、abort方法来实现精确一次的处理语义,

    • beginTransaction:在开启事务之前,在目标文件系统的临时目录中创建一个临时文件,后面在处理数据时将数据写入此文件。
    • preCommit:在预提交阶段,刷写文件,然后关闭文件,之后就不能写入到文件,为属于下一个检查点的任何后续写入启动新事务。
    • commit:在提交阶段,将预提交的文件原子性移动到真正的目标目录中,这会增加输出数据可见性的延迟。
    • abort:在终止阶段,删除临时文件。

    Kafka-Flink-Kafka过程:

    • Flink开始做checkpoint操作, 进入pre-commit阶段,同时Flink JobManager会将检查点Barrier注入数据流中。
    • 当所有barrier在算子中成功进行一遍传递,并完成快照后,则pre-commit阶段完成
    • 等所有的算子完成预提交,就会发起一个提交动作,但是任何一个预提交失败都会导致Flink回滚到最近的checkpoint;
    • pre-commit完成,必须要确保commit也要成功。

    如何排查生产环境中的反压问题

    不同框架的反压对比:

    • Storm:从1.0版本之后引入反压,Storm会主动监控工作节点,工作节点接收数据超过阈值,反压信息会被发送到ZooKeeper,ZooKeeper通知所有的工作节点
      进入反压状态,最后数据的生产源头会降低数据的发送速度。
    • Spark Streaming:RateController组件,利用经典的PID算法,根据消息数量、调度时间、处理时间等计算出来速率,然后进行限速。
    • Flink:利用网络传输和动态限流,流中的数据在算子间进行计算和转换时,会被放入分布式的阻塞队列中。当消费者的阻塞队列满时,则会降低生产者的数据生产速度。

    Flink Web UI Back Pressure出现数值:

    • OK: 0<=Ratio<=0.10,正常;
    • LOW:0.10<Ratio<=0.50,一般;
    • HIGH: 0.5 < Ratio <=1,严重。
    指标名称 用途 解释
    outPoolUsage 发送端缓冲池的使用率 当前Task的数据发送率,如果数值很低,当前节点有可能为反压节点
    inPoolUsage 接收端缓冲池的使用率 Task的接收速度,inPoolUsage很高,outPoolUsage很低,这个节点有可能是反压节点
    floatingBuffersUsage 处理节点缓冲池的使用率
    exclusiveBuffersUsage 数据输入方缓冲池的使用率

    反压问题处理:

    • 数据倾斜:使用类似的KeyBy等分组聚合函数导致,需要用户将热点key进行预处理,降低或者消除热点key的影响。
    • GC:使用-XX:+PrintGCDetails参数查看GC日志
    • 代码本身:查看机器的CPU、内存使用

    如何处理生产环境中的数据倾斜问题

    两阶段聚合解决KeyBy热点

    根据type进行KeyBy时,如果数据的type分布不均匀就会导致大量的数据分配到一个task中,发生数据倾斜。解决的思路为:

    • 首先把分组的key打散,比如添加随机后缀;
    • 对打散后的数据进行聚合;
    • 将打散的key还原为原先的key
    • 二次KeyBy进行结果统计,然后输出。

    Flink消费Kafka数据时,要保证Kafka的分区数等于Flink Consumer的并行度。如果不一致,需要设置Flink的Redistributing(数据充分配),
    Rebalance分区策略,数据会以round-robin的方式对数据进行再次分区,可以全局负载均衡。
    Rescale分区策略基于上下游的并行度,会将数据以循环的方式输出到下游的每个实例中。

    生产环境中的并行度和资源设置

    在Flink集群中,一个TaskManager就是一个JVM进程,并且会用独立的线程来执行task,slot仅仅用来做内存的隔离,对CPU不起作用。

    默认情况下,Flink还允许同一个Job的子任务共享slot。

    Flink自身会把不同的算子的task连接在一起组成一个新的task。因为task在同一个线程中执行,可以有效减少线程间上下文的切换,减少序列化/反序列化带来的资源消耗,
    提高任务的吞吐量。

    并行度级别:算子级别、环境级别、客户端级别、集群配置级别。

    在生产中,推荐在算子级别显式指定各自的并行度,方便进行显式和精确的资源控制。
    环境级别:任务中的所有算子的并行度都是指定的值,生产环境不推荐。

    设置并行度的优先级为:算子级别 > 环境级别 > 客户端级别 > 集群级别配置。

    Flink如何做维表关联

    业务对维表数据关联的时效性要求,有以下几种解决方案:

    • 实时查询维表:用户在Flink算子中直接访问外部数据库,这种是同步方式,数据保证是最新的。
    • 预加载全量数据:每次启动时,将维表中全部数据加载到内存中。
    • LRU缓存:将最近最少使用的数据则被淘汰。

    实时查询维表

    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    
    public class DimSync extends RichMapFunction<String,Order> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
    
        private Connection conn = null;
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
        }
    
        public Order map(String in) throws Exception {
    
            JSONObject jsonObject = JSONObject.parseObject(in);
            Integer cityId = jsonObject.getInteger("city_id");
            String userName = jsonObject.getString("user_name");
            String items = jsonObject.getString("items");
    
            //根据city_id 查询 city_name
            PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
            pst.setInt(1,cityId);
            ResultSet resultSet = pst.executeQuery();
            String cityName = null;
            while (resultSet.next()){
                cityName = resultSet.getString(1);
            }
            pst.close();
            return new Order(cityId,userName,items,cityName);
        }
    
        public void close() throws Exception {
            super.close();
            conn.close();
        }
    
    }
    

    要保证及时关闭连接池

    public class Order {
        private Integer cityId;
        private String userName;
        private String items;
        private String cityName;
    
        public Order(Integer cityId, String userName, String items, String cityName) {
            this.cityId = cityId;
            this.userName = userName;
            this.items = items;
            this.cityName = cityName;
        }
    
        public Order() {
        }
    
        public Integer getCityId() {
            return cityId;
        }
    
        public void setCityId(Integer cityId) {
            this.cityId = cityId;
        }
    
        public String getUserName() {
            return userName;
        }
    
        public void setUserName(String userName) {
            this.userName = userName;
        }
    
        public String getItems() {
            return items;
        }
    
        public void setItems(String items) {
            this.items = items;
        }
    
        public String getCityName() {
            return cityName;
        }
    
        public void setCityName(String cityName) {
            this.cityName = cityName;
        }
    
        @Override
        public String toString() {
            return "Order{" +
                    "cityId=" + cityId +
                    ", userName='" + userName + '\'' +
                    ", items='" + items + '\'' +
                    ", cityName='" + cityName + '\'' +
                    '}';
        }
    }
    

    预加载全量数据

    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.sql.*;
    import java.util.Map;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class WholeLoad extends RichMapFunction<String,Order> {
    
    
        private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
        ScheduledExecutorService executor = null;
        private Map<String,String> cache;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            executor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        load();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            },5,5, TimeUnit.MINUTES);
        }
    
        @Override
        public Order map(String value) throws Exception {
            JSONObject jsonObject = JSONObject.parseObject(value);
            Integer cityId = jsonObject.getInteger("city_id");
            String userName = jsonObject.getString("user_name");
            String items = jsonObject.getString("items");
            String cityName = cache.get(cityId);
            return new Order(cityId,userName,items,cityName);
        }
    
        public void load() throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
            PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
            ResultSet rs = statement.executeQuery();
            //全量更新维度数据到内存
            while (rs.next()) {
                String cityId = rs.getString("city_id");
                String cityName = rs.getString("city_name");
                cache.put(cityId, cityName);
            }
            con.close();
        }
    }
    

    LRU缓存

    import com.alibaba.fastjson.JSONObject;
    import com.stumbleupon.async.Callback;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.hbase.async.GetRequest;
    import org.hbase.async.HBaseClient;
    import org.hbase.async.KeyValue;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    
    public class LRU extends RichAsyncFunction<String,Order> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
        String table = "info";
        Cache<String, String> cache = null;
        private HBaseClient client = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            //创建hbase客户端
            client = new HBaseClient("127.0.0.1","7071");
            cache = CacheBuilder.newBuilder()
                    //最多存储10000条
                    .maximumSize(10000)
                    //过期时间为1分钟
                    .expireAfterWrite(60, TimeUnit.SECONDS)
                    .build();
        }
    
        @Override
        public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
    
            JSONObject jsonObject = JSONObject.parseObject(input);
            Integer cityId = jsonObject.getInteger("city_id");
            String userName = jsonObject.getString("user_name");
            String items = jsonObject.getString("items");
            //读缓存
            String cacheCityName = cache.getIfPresent(cityId);
            //如果缓存获取失败再从hbase获取维度数据
            if(cacheCityName != null){
                Order order = new Order();
                order.setCityId(cityId);
                order.setItems(items);
                order.setUserName(userName);
                order.setCityName(cacheCityName);
                resultFuture.complete(Collections.singleton(order));
            }else {
    
                client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
                    for (KeyValue kv : arg) {
                        String value = new String(kv.value());
                        Order order = new Order();
                        order.setCityId(cityId);
                        order.setItems(items);
                        order.setUserName(userName);
                        order.setCityName(value);
                        resultFuture.complete(Collections.singleton(order));
                        cache.put(String.valueOf(cityId), value);
                    }
                    return null;
                });
    
            }
        }
    }
    

    海量数据去重

    Flink中实时去重的方案:

    • 基于状态后端
    • 基于HyperLogLog
    • 基于布隆过滤器
    • 基于BitMap
    • 基于外部数据库

    基于状态后端

    状态后端的种类之一是RocksDBStateBackend,它会将正在云心中的状态数据保存在RockDB数据库中,该数据库默认将数据存储在TaskManager运行节点的数据目录下。
    计算每天每个商品的访问量:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    
    public class MapStateDistinctFunction extends KeyedProcessFunction<String,Tuple2<String,Integer>,Tuple2<String,Integer>> {
    
        private transient ValueState<Integer> counts;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            //我们设置ValueState的TTL的生命周期为24小时,到期自动清除状态
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(org.apache.flink.api.common.time.Time.minutes(24 * 60))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
    
            //设置ValueState的默认值
            ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("skuNum", Integer.class);
            descriptor.enableTimeToLive(ttlConfig);
            counts = getRuntimeContext().getState(descriptor);
            super.open(parameters);
        }
    
    
        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
    
            String f0 = value.f0;
    
            //如果不存在则新增
            if(counts.value() == null){
                counts.update(1);
            }else{
                //如果存在则加1
                counts.update(counts.value()+1);
            }
    
            out.collect(Tuple2.of(f0, counts.value()));
    
        }
    
    }
    

    基于HyperLogLo

    HyperLogLog是一种估计统计算法,被用来统计一饿集合中不同数据的个数。

    import net.agkn.hll.HLL;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    
    public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> {
    
    
        @Override
        public HLL createAccumulator() {
    
            return new HLL(14, 5);
        }
    
        @Override
        public HLL add(Tuple2<String, Long> value, HLL accumulator) {
    
            //value为购买记录 <商品sku, 用户id>
            accumulator.addRaw(value.f1);
            return accumulator;
        }
    
        @Override
        public Long getResult(HLL accumulator) {
            long cardinality = accumulator.cardinality();
            return cardinality;
        }
    
    
        @Override
        public HLL merge(HLL a, HLL b) {
            a.union(b);
            return a;
        }
    }
    

    添加相应的pom依赖:

    <dependency>
        <groupId>net.agkn</groupId>
        <artifactId>hll</artifactId>
        <version>1.6.0</version>
    </dependency>
    

    如果元素是非数值型,需要hash过后才能插入。

    基于布隆过滤器

    BloomFilter类似于一个HashSet,用于快速判断某个元素是否存在与集合中,其典型的应用场景就是能够快速判断一个key是否存在某个容器中,不存在直接返回。

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    
    public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {
    
        private transient ValueState<BloomFilter> bloomState;
        private transient ValueState<Long> countState;
    
    
        @Override
        public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
    
            BloomFilter bloomFilter = bloomState.value();
            Long skuCount = countState.value();
    
            if(bloomFilter == null){
                BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000);
            }
    
            if(skuCount == null){
                skuCount = 0L;
            }
    
            if(!bloomFilter.mightContain(value)){
                bloomFilter.put(value);
                skuCount = skuCount + 1;
            }
    
            bloomState.update(bloomFilter);
            countState.update(skuCount);
            out.collect(countState.value());
        }
    }
    

    BitMap

    HyperLogLog 和BloomFilter虽然减少了存储但是丢失了精度。
    BitMap的基本思想是用一个bit位来标记某个元素对应的value,而key即是该元素。

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.roaringbitmap.longlong.Roaring64NavigableMap;
    
    public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> {
    
    
        @Override
        public Roaring64NavigableMap createAccumulator() {
            return new Roaring64NavigableMap();
        }
    
        @Override
        public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) {
            accumulator.add(value);
            return accumulator;
        }
    
    
        @Override
        public Long getResult(Roaring64NavigableMap accumulator) {
            return accumulator.getLongCardinality();
        }
    
        @Override
        public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {
            return null;
        }
    }
    

    添加依赖:

    <dependency>
        <groupId>org.roaringbitmap</groupId>
        <artifactId>RoaringBitmap</artifactId>
        <version>0.9.21</version>
    </dependency>
    

    相关文章

      网友评论

          本文标题:Flink 生产实践

          本文链接:https://www.haomeiwen.com/subject/swmhnltx.html