美文网首页程序员
Flink基于RoaringBitmap的精确去重方案

Flink基于RoaringBitmap的精确去重方案

作者: 小胡子哥灬 | 来源:发表于2020-06-16 15:06 被阅读0次

在Flink实时统计中,提到去重,我能想到比较流行的几种方式:

  1. 布隆过滤器 - 非精确去重,精度可以配置,但精度越高,需要的开销就越大。主流的框架可以使用guava的实现,或者借助于redis的bit来自己实现,hash算法可以照搬guava的。
  2. HyperLoglog - 基于基数的非精确去重,优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。
  3. BitMap - 优点是精确去重,占用空间小(在数据相对均匀的情况下)。缺点是只能用于数字类型(int或者long)。

本文主要讲述Flink基于RoaringBitmap的去重方案,首先引入依赖:

<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.8.13</version>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.11.6</version>
</dependency>

构建BitIndex

BitMap固然好用,但是对去重的字段只能用int或者long类型;但是如果去重字段不是int或者long怎么办呢?那我们就构建一个字段与BitIndex的映射关系表,bitIndex从1开始递增。比如{a = 1, b = 2, c = 3};使用的时候先从映射表里根据字段取出对应的bitindex,如果没有,则全局生成一个。这里我用redis来作为映射表。具体实现我放在一个MapFunction里,如下:

public class BitIndexBuilderMap extends RichMapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>> {

  private static final Logger LOG = LoggerFactory.getLogger(BitIndexBuilderMap.class);

  private static final String GLOBAL_COUNTER_KEY = "FLINK:GLOBAL:BITINDEX";

  private static final String GLOBAL_COUNTER_LOCKER_KEY = "FLINK:GLOBAL:BITINDEX:LOCK";

  private static final String USER_BITINDEX_SHARDING_KEY = "FLINK:BITINDEX:SHARDING:";

  /**
   * 把用户id分散到redis的100个map中,防止单个map的无限扩大,也能够充分利用redis cluster的分片功能
   */
  private static final Integer REDIS_CLUSTER_SHARDING_MODE = 100;

  private HashFunction hash = Hashing.crc32();

  private RedissonClient redissonClient;

  @Override
  public void open(Configuration parameters) throws Exception {
//    ParameterTool globalPara = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    Config config = new Config();
    config.setCodec(new StringCodec());
    config.useClusterServers().addNodeAddress(getRedissonNodes("redis1:8080,redis2:8080,redis3:8080"))
        .setPassword("xxxx").setSlaveConnectionMinimumIdleSize(1)
        .setMasterConnectionPoolSize(2)
        .setMasterConnectionMinimumIdleSize(1)
        .setSlaveConnectionPoolSize(2)
        .setSlaveConnectionMinimumIdleSize(1)
        .setConnectTimeout(10000)
        .setTimeout(10000)
        .setIdleConnectionTimeout(10000);
    redissonClient = Redisson.create(config);
  }

  /**
   * 把userId递增化,在redis中建立一个id映射关系
   * @param in
   * @return
   * @throws Exception
   */
  @Override
  public Tuple3<String, String, Integer> map(Tuple2<String, String> in) throws Exception {
    String userId = in.f0;
    //分片
    int shardingNum = Math.abs(hash.hashBytes(userId.getBytes()).asInt()) % REDIS_CLUSTER_SHARDING_MODE;
    String mapKey = USER_BITINDEX_SHARDING_KEY + shardingNum;
    RMap<String, String> rMap = redissonClient.getMap(mapKey);
    // 如果为空,生成一个bitIndex
    String bitIndexStr = rMap.get(userId);
    if(StringUtils.isEmpty(bitIndexStr)) {
      LOG.info("userId[{}]的bitIndex为空, 开始生成bitIndex", userId);
      RLock lock = redissonClient.getLock(GLOBAL_COUNTER_LOCKER_KEY);
      try{
        lock.tryLock(60, TimeUnit.SECONDS);
        // 再get一次
        bitIndexStr = rMap.get(userId);
        if(StringUtils.isEmpty(bitIndexStr)) {
          RAtomicLong atomic = redissonClient.getAtomicLong(GLOBAL_COUNTER_KEY);
          bitIndexStr = String.valueOf(atomic.incrementAndGet());
        }
        rMap.put(userId, bitIndexStr);
      }finally{
        lock.unlock();
      }
      LOG.info("userId[{}]的bitIndex生成结束, bitIndex: {}", userId, bitIndexStr);
    }
    return new Tuple3<>(in.f0, in.f1, Integer.valueOf(bitIndexStr));
  }

  @Override
  public void close() throws Exception {
    if(redissonClient != null) {
      redissonClient.shutdown();
    }
  }

  private String[] getRedissonNodes(String hosts) {
    List<String> nodes = new ArrayList<>();
    if (hosts == null || hosts.isEmpty()) {
      return null;
    }
    String nodexPrefix = "redis://";
    String[] arr = StringUtils.split(hosts, ",");
    for (String host : arr) {
      nodes.add(nodexPrefix + host);
    }
    return nodes.toArray(new String[nodes.size()]);
  }
}

去重逻辑

通过MapFunction拿到字段对应的BitIndex之后,就可以直接进行去重逻辑了,比如我要统计某个页面下的访问人数:

public class CountDistinctFunction extends KeyedProcessFunction<Tuple, Tuple3<String, String, Integer>, Tuple2<String, Long>> {

  private static final Logger LOG = LoggerFactory.getLogger(CountDistinctFunction.class);

  private ValueState<Tuple2<RoaringBitmap, Long>> state;

  @Override
  public void open(Configuration parameters) throws Exception {
    state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Types.TUPLE(Types.GENERIC(RoaringBitmap.class), Types.LONG)));
  }

  @Override
  public void processElement(Tuple3<String, String, Integer> in, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    // retrieve the current count
    Tuple2<RoaringBitmap, Long> current = state.value();
    if (current == null) {
      current = new Tuple2<>();
      current.f0 = new RoaringBitmap();
    }
    current.f0.add(in.f2);

    long processingTime = ctx.timerService().currentProcessingTime();
    if(current.f1 == null || current.f1 + 10000 <= processingTime) {
      current.f1 = processingTime;
      // write the state back
      state.update(current);
      ctx.timerService().registerProcessingTimeTimer(current.f1 + 10000);
    } else {
      state.update(current);
    }
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
    Tuple2<RoaringBitmap, Long> result = state.value();

    result.f0.runOptimize();
    out.collect(new Tuple2<>(key.f0, result.f0.getLongCardinality()));
  }
}

主程序的主要代码:

env.addSource(source).map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                String[] arr = StringUtils.split(value, ",");
                return new Tuple2<>(arr[0], arr[1]);
            }
        })
            .keyBy(0) //根据userId分组
            .map(new BitIndexBuilderMap()) //构建bitindex
            .keyBy(1) //统计页面下的访问人数
            .process(new CountDistinctFunction())
            .print();

测试数据:

shizc,www.baidu..com
shizc,www.baidu.com
shizc1,www.baidu.com
shizc2,www.baidu.com
shizc,www.baidu..com
shizc,www.baidu..com
shizc,www.baidu..com
shizc,www.hahaha.com
shizc,www.hahaha.com
shizc1,www.hahaha.com
shizc2,www.hahaha.com

输出 :
(www.baidu.com,4)
(www.hahaha.com,3)

总结

  1. 如果你的数据字段已经是数字类型时,可以不用构建BitIndex,但是要确保你的字段是有规律,而且递增,如果是long类型还可以用Roaring64NavigableMap,但如果是雪化算法生成的id,最好不要用,因为不能压缩,占用空间非常大,笔者之前就是直接用Roaring64NavigableMap,1000多万个id就达到了700多M。

  2. 以上实现在数据量特别大的时候,在生成bitindex的时候会有性能的瓶颈,所以我们应该预先构建BitIndex,也就是把你的数据库当中的所有用户id,预先用flink批处理任务,生成映射。基本代码如下:

// main方法
    final ExecutionEnvironment env = buildExecutionEnv();
   //如果没有找到好的方法保证id单调递增,就设置一个并行度
    env.setParallelism(1);

    TextInputFormat input = new TextInputFormat(new Path(MEMBER_RIGHTS_HISTORY_PATH));
    input.setCharsetName("UTF-8");
    DataSet<String> source =  env.createInput(input).filter(e -> !e.startsWith("user_id")).map(
        new MapFunction<String, String>() {
          @Override
          public String map(String value) throws Exception {
            String[] arr = StringUtils.split(value, ",");
            return arr[0];
          }
        })
        .distinct();
    source
        .map(new RedisMapBuilderFunction())
        .groupBy(0)
        .reduce(new RedisMapBuilderReduce())
        .output(new RedissonOutputFormat());

    long counter = source.count();
    env.fromElements(counter).map(new MapFunction<Long, Tuple3<String, String, Object>>() {
      @Override
      public Tuple3<String, String, Object> map(Long value) throws Exception {
        return new Tuple3<>("FLINK:GLOBAL:BITINDEX", "ATOMICLONG", value);
      }
    }).output(new RedissonOutputFormat());

// 注意分区逻辑和key要和stream的保持一致
public class RedisMapBuilderFunction implements MapFunction<String, Tuple3<String, String, Object>> {

  private static final String USER_BITINDEX_SHARDING_KEY = "FLINK:BITINDEX:SHARDING:";

  private static final Integer REDIS_CLUSTER_SHARDING_MODE = 100;

  private HashFunction hash = Hashing.crc32();
  private Integer counter = 0;

  @Override
  public Tuple3<String, String, Object> map(String userId) throws Exception {
    counter ++;
    int shardingNum = Math.abs(hash.hashBytes(userId.getBytes()).asInt()) % REDIS_CLUSTER_SHARDING_MODE;
    String key = USER_BITINDEX_SHARDING_KEY + shardingNum;
    Map<String, String> map = new HashMap<>();
    map.put(userId, String.valueOf(counter));
    return new Tuple3<>(key, "MAP", map);
  }
}

public class RedisMapBuilderReduce implements ReduceFunction<Tuple3<String, String, Object>> {
  @Override
  public Tuple3<String, String, Object> reduce(Tuple3<String, String, Object> value1, Tuple3<String, String, Object> value2) throws Exception {
    Map<String, String> map1 = (Map<String, String>) value1.f2;
    Map<String, String> map2 = (Map<String, String>) value2.f2;
    map1.putAll(map2);
    return new Tuple3<>(value1.f0, value1.f1, map1);
  }
}

//输出 到redis
public class RedissonOutputFormat extends RichOutputFormat<Tuple3<String, String, Object>> {
  
  private RedissonClient redissonClient;

  @Override
  public void configure(Configuration parameters) {

  }

  @Override
  public void open(int taskNumber, int numTasks) throws IOException {
    Config config = new Config();
    config.setCodec(new StringCodec());
    config.useClusterServers().addNodeAddress(getRedissonNodes("redis1:8080,redis2:8080,redis3:8080"))
        .setPassword("xxx").setSlaveConnectionMinimumIdleSize(1)
        .setMasterConnectionPoolSize(2)
        .setMasterConnectionMinimumIdleSize(1)
        .setSlaveConnectionPoolSize(2)
        .setSlaveConnectionMinimumIdleSize(1)
        .setConnectTimeout(10000)
        .setTimeout(10000)
        .setIdleConnectionTimeout(10000);
    redissonClient = Redisson.create(config);
  }

  /**
   * k,type,value
   * @param record
   * @throws IOException
   */
  @Override
  public void writeRecord(Tuple3<String, String, Object> record) throws IOException {
    String key = record.f0;
    RKeys rKeys = redissonClient.getKeys();
    rKeys.delete(key);
    String keyType = record.f1;
    if("STRING".equalsIgnoreCase(keyType)) {
      String value = (String) record.f2;
      RBucket<String> rBucket = redissonClient.getBucket(key);
      rBucket.set(value);
    } else if("MAP".equalsIgnoreCase(keyType)) {
      Map<String, String> map = (Map<String, String>) record.f2;
      RMap<String, String> rMap = redissonClient.getMap(key);
      rMap.putAll(map);
    } else if("ATOMICLONG".equalsIgnoreCase(keyType)) {
      long l = (long) record.f2;
      RAtomicLong atomic = redissonClient.getAtomicLong(key);
      atomic.set(l);
    }
  }

  @Override
  public void close() throws IOException {
    if(redissonClient != null) {
      redissonClient.shutdown();
    }
  }

  private String[] getRedissonNodes(String hosts) {
    List<String> nodes = new ArrayList<>();
    if (hosts == null || hosts.isEmpty()) {
      return null;
    }
    String nodexPrefix = "redis://";
    String[] arr = StringUtils.split(hosts, ",");
    for (String host : arr) {
      nodes.add(nodexPrefix + host);
    }
    return nodes.toArray(new String[nodes.size()]);
  }
}

相关文章

网友评论

    本文标题:Flink基于RoaringBitmap的精确去重方案

    本文链接:https://www.haomeiwen.com/subject/ggvhxktx.html