美文网首页
Flink在用户行为分析中的应用(二)

Flink在用户行为分析中的应用(二)

作者: 冰菓_ | 来源:发表于2022-09-04 16:10 被阅读0次

    一. 项目背景

    业务系统一般有广告引擎,推荐引擎,搜索引擎,秒杀系统;当然实时智能运营系统也是业务系统的一部分,能为公司带来直接价值的.智能运营系统是一个用软件系统自动、实时监控用户的行为,并实时做出判断,并进而驱动营销推送的系统

    二. 需求分析

    营销规则举例
    1. 活跃度为A,男性,高星级用户,连续登陆7天的用户圈选
    2. 搜索过"商务休闲"关键字的用户,如果点击了"旅拍"坑位,则推送一张给定规则的优惠券
    3. 某用户,连续做过3次A行为后,没有做过[BC]行为序列的人群圈选
    4. 检测到某用户,在做过[AB]行为后,3分钟内,做过[C|D]行为,则触发监控告警
    5. 某高星级用户,规定时间内搜索的关键字,符合给定规则的特征,则推送app相关消息
    营销规则特征
    1. 营销规则有多样的属性
    2. 营销规则通常是有有效期设置
    3. 营销规则要根据业务的变化灵活的进行热管理,水平扩展
    4. 营销规则要圈选的人群可能不是一个静态的人群包
    营销规则的要素抽象
    1. 营销规则大体是对用户的画像和行为进行判断的
    2. 营销规则是有时间窗口限制的(规则上线前后)
    3. 营销规则的要素通常有
    1. 条件属性:就是各类逻辑的组合,与,或,非,大于,小于,等于,交集,并集
    2. 对比属性:事件和事件的对比关系,如AB事件的时间间隔应该大于N
    3. 时间属性:就是限制圈选的时间范围
    4. 画像属性:画像标签库的属性值,如年龄,性别,年代,星级
    5. 度量属性:如,登陆天数,消费金额,某行为事件发生M次,SUM(属性值),MAX(属性值)
    6. 序列属性:如用户连续触发[ABC]事件.后3s内触发D事件等

    三. 设计概要

    画像条件的设计概要
    1. 一般的画像标签是枚举值类型的,将数据存放在hbase中即可解决,但是对于一些例如兴趣爱好,出行意图往往不是枚举类型的,而是类似于文档,例如一个人爱好标签是"咖啡",而标签中可能存储了多种类型的"咖啡:抹茶咖啡:猫屎咖啡..",这涉及到了模糊查询,hbase很难支持,对于es这类文档数据库,就非常简单了,所以哦我们考虑使用es,来存储画像标签
    2. 显然大量的用户基数的查询,es也力不从心,考虑到画像标签的实时价值并不高,我们可以读取T+1的画像标签即可,我们考虑标签值进行bitmap编码的思路,编码后的数据广播与事件流进行逻辑判断
    行为条件的设计概要
    1. 行为明细数据存储的两难选择:doris有着强大的查询能力,但是面对高频词请求,复杂的查询需求,就无法满足我们极为严苛的响应延迟要求,如果将数据都保存在state中,由于大状态非常不容易维护,也是不可取的
    2. 我们无法笼统的解决这个问题,需要具体分析具体对待;如果只涉及上线前的历史行为分析判断,那么我们可以完全参照画像条件的思路,将统计结果形成固化的结果数据;如果只涉及上线后的数据,则通过state来计算,那么state如何进行行为分析呢?其实我们,在计算一个规则条件时,并不需要完整的明细数据,可以通过滚动聚合的方式来计算;如果横跨上线前后行为分析场景,就需要进行拆分和整合,我们考虑把要整合的数据存储在redis中,这是由于redis读写速度与flink内部的state不相上下,能满足我们的高并发低延迟读写要求并且redis中有丰富的数据结构够我们使用
    3. 行为序列的分析需要设计不同的状态结构,业务规则的需求是无限的,我们无法用同一套代码来应付灵活多变的规则条件计算逻辑,因此要考虑从业务编码中解放出来,我们考虑使用外部系统为规则去选用一个 “状态机”,然后注入到我们的规则引擎,我们考虑使用动态脚本语言来实现,如Groovy
    4. 综上所述,开发人员往规则管理平台添加groory模板数据,开发flink流程消费用户行为数据,并通过cdc获取mysql中的规则数据,连接进行各类规则的状态机运算

    四. 技术验证

    Groovy动态调用
    bitmap发布流程
    1. bitmap序列化 --> mysql元数据表 --> 反序列化的可行性
    public class RulePublisher {
        public static void main(String[] args) throws SQLException, IOException, ClassNotFoundException {
             String ruleId = "rule_0001";
             //根据规则,去es中查询人群
             int[] ruleProfileUsers = {1,3,5};
             //把查询出来的人群包,变成bitmap
            RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf(ruleProfileUsers);
             //把生成好的bitmap,序列到一个字节数组中
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
            roaringBitmap.serialize(dataOutputStream);
            byte[] bitmapBytes = byteArrayOutputStream.toByteArray();
            //将这个bitmap连同本规则,一同发布到规则平台的元数据系统中
            //注意在mysql中的byte类型
            Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/stream_db?serverTimezone=Asia/Shanghai", "root", "123456");
            PreparedStatement prepareStatement = connection.prepareStatement("insert into rule_tf values (?,?)");
            prepareStatement.setString(1,ruleId);
            prepareStatement.setBytes(2,bitmapBytes);
            prepareStatement.execute();
            prepareStatement.close();
        }
    }
    
    public class BitmapFromMysqlBytes {
        public static void main(String[] args) throws SQLException, IOException {
            Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/stream_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
            PreparedStatement prepareStatement = connection.prepareStatement("select rule,profile_bitmap from rule_tf where rule =?");
            prepareStatement.setString(1,"rule_0001");
            ResultSet resultSet = prepareStatement.executeQuery();
    
    
            resultSet.next();
            byte[] profileBitmap = resultSet.getBytes("profile_bitmap");
            //反序列化bitmap
            RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf();
            roaringBitmap.deserialize(ByteBuffer.wrap(profileBitmap));
            //测试反序列化的数据是否为之前的数据
            System.out.println(roaringBitmap.contains(1));
            System.out.println(roaringBitmap.contains(3));
            System.out.println(roaringBitmap.contains(201));
            System.out.println(roaringBitmap.contains(202));
    
            prepareStatement.close();
            connection.close();
        }
    }
    
    flink-cdc规则获取流程
    1. 一个flinkcdc的demo
    public class FlinkCdcDemo {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
            //踩坑:请将flinktable中的 <scope>provided</scope>注释掉
            DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
                    .hostname("localhost")
                    .port(3306)
                    .username("root")
                    .password("123456")
                    .databaseList("my_data")
                    .tableList("my_data.log_trail")
                    .startupOptions(StartupOptions.initial())
                    .deserializer(new StringDebeziumDeserializationSchema())
                    .build();
            //4.使用 CDC Source 从 MySQL 读取数据
            DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
            //5.打印数据
            mysqlDS.print();
            //6.执行任务
            env.execute();
        }
    }
    
    1. 使用flinksql-cdc获取mysql中的bitmap
    public class FlinkCdcBitmap {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            //2.x版本需要设置Checkpoint,否则只能读取全量数据,无法实时读取增量数据
            env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointStorage("file:/d/sun/");
            StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);
            //创建cdc连接器
            tableEnv.executeSql("CREATE TABLE rule_tf (" +
                    " rule STRING PRIMARY KEY NOT ENFORCED, " +
                    " profile_bitmap BINARY " +
                    ") WITH (" +
                    " 'connector' = 'mysql-cdc', " +
                    " 'hostname' = 'localhost', " +
                    " 'port' = '3306', " +
                    " 'username' = 'root' ," +
                    " 'password' = '123456' ," +
                    " 'database-name' = 'my_data' ," +
                    " 'table-name' = 'rule_tf'" +
                    ")");
            // 踩坑,不支持使用CDC2.2版本,请使用CDC2.0,另外flink1.2不支持sql-cdc
            Table table = tableEnv.sqlQuery("select rule,profile_bitmap from rule_tf");
            tableEnv.toChangelogStream(table).print();
            SingleOutputStreamOperator<String> process = tableEnv.toChangelogStream(table).process(new ProcessFunction<Row, String>() {
                @Override
                public void processElement(Row value, ProcessFunction<Row, String>.Context ctx, Collector<String> out) throws Exception {
                    RowKind valueKind = value.getKind();
                    if (valueKind == RowKind.INSERT) {
                        String rule_id = (String) value.getField("rule");
                        byte[] profileBitmap = (byte[]) value.getField("profile_bitmap");
                        //反序列化bitmap
                        RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf();
                        assert profileBitmap != null;
                        roaringBitmap.deserialize(ByteBuffer.wrap(profileBitmap));
                        System.out.println(Arrays.toString(roaringBitmap.stream().toArray()));
                        out.collect(Arrays.toString(roaringBitmap.stream().toArray()));
                    }
                }
            });
            process.print();
            env.execute();
        }
    }
    
    1. cdc规则数据整合事件流
    
    public class FlinkInjectRuleBitmapProcessEvents {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointStorage("file:/d/sun/");
            StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);
            //模拟一个事件流
            SingleOutputStreamOperator<Tuple2<String, String>> event = env.socketTextStream("192.168.10.100", 5666).map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String value) throws Exception {
                    String[] split = value.split(",");
                    return Tuple2.of(split[0], split[1]);
                }
            });
    
    
            //创建cdc连接器
            tableEnv.executeSql("CREATE TABLE rule_tf (" +
                    " rule STRING PRIMARY KEY NOT ENFORCED, " +
                    " profile_bitmap BINARY " +
                    ") WITH (" +
                    " 'connector' = 'mysql-cdc', " +
                    " 'hostname' = 'localhost', " +
                    " 'port' = '3306', " +
                    " 'username' = 'root' ," +
                    " 'password' = '123456' ," +
                    " 'database-name' = 'my_data' ," +
                    " 'table-name' = 'rule_tf'" +
                    ")");
    
            Table table = tableEnv.sqlQuery("select rule,profile_bitmap from rule_tf");
            DataStream<Row> changelogStream = tableEnv.toChangelogStream(table);
            SingleOutputStreamOperator<Tuple2<String, RoaringBitmap>> streamRule = changelogStream.map(new MapFunction<Row, Tuple2<String, RoaringBitmap>>() {
                @Override
                public Tuple2<String, RoaringBitmap> map(Row value) throws Exception {
                    String rule_id = (String) value.getField("rule");
                    byte[] profileBitmap = (byte[]) value.getField("profile_bitmap");
                    RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf();
                    roaringBitmap.deserialize(ByteBuffer.wrap(profileBitmap));
                    //标记获取规则的状态
                    if (RowKind.DELETE == value.getKind()) {
                        return Tuple2.of(rule_id+":"+"DELETE", roaringBitmap);
                    }
                    else {
                        //更新则覆盖
                        return Tuple2.of(rule_id+":"+"UPSERT", roaringBitmap);
                    }
                }
            });
            //进行广播
            MapStateDescriptor<String, RoaringBitmap> ruleStateDescriptor = new MapStateDescriptor<>("Rule", String.class, RoaringBitmap.class);
            BroadcastStream<Tuple2<String, RoaringBitmap>> broadcast = streamRule.broadcast(ruleStateDescriptor);
            //将广播流与行为事件进行连接
            event.keyBy(tp -> tp.f0)
                    .connect(broadcast)
                    .process(new KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, RoaringBitmap>, String>() {
                        @Override
                        public void processElement(Tuple2<String, String> event, KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, RoaringBitmap>, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
                            ReadOnlyBroadcastState<String, RoaringBitmap> readOnlyContextBroadcastState = readOnlyContext.getBroadcastState(ruleStateDescriptor);
                            //对系统中的每个规则,进行一次人群判断
                            for (Map.Entry<String, RoaringBitmap> entry : readOnlyContextBroadcastState.immutableEntries()) {
                                String rule_id = entry.getKey();
                                RoaringBitmap profileBitmap = entry.getValue();
                                collector.collect(String.format("当前行为事件用户 %d 规则 %s的目标人群是否包含此人 %s",Integer.parseInt(event.f0),rule_id,profileBitmap.contains(Integer.parseInt(event.f0))));
                            }
                        }
    
                        @Override
                        public void processBroadcastElement(Tuple2<String, RoaringBitmap> ruleInfo, KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, RoaringBitmap>, String>.Context context, Collector<String> collector) throws Exception {
                            BroadcastState<String, RoaringBitmap> broadcastState = context.getBroadcastState(ruleStateDescriptor);
                            //形如 rule:DELETE的拼接
                            String[] split = ruleInfo.f0.split(":");
                            if ("DELETE".equals(split[1])) {
                                System.out.println("删除一条规则" +split[0] +"  " + ruleInfo.f0 );
                                broadcastState.remove(split[0]);
                            }
                            else {
                                System.out.println("新增一条规则" +split[0] +"  " + ruleInfo.f0);
                                broadcastState.put(split[0], ruleInfo.f1);
                            }
                        }
                    }).print();
    
    
            env.execute();
        }
    }
    
    

    五. 数据开发

    通过Spark将用户画像数据导入到ES中
    搜索人群画像形成bitmap
    规则模板的数据结构
    动态条件表达式的设计

    规则(前端注入)可能存在多个,我们不能写死,同时规则之间的关系也是复杂的,我们考虑使用动态脚本语言模板来实现

    状态机

    状态机就是一段逻辑的封装,规则不能写死,同理逻辑也不能写死,我们同样考虑使用动态脚本语言模板来实现,在本篇中状态机即是动态条件表达式与Redis运算逻辑的结合体

    触发型&非触发型规则运算
    序列型规则运算

    使用redis中的顶层key结构,value是某个人匹配到最大步骤数,新增一个结构来保留匹配到序列的次数

    public void caclActionSeq(UserEvent userEvent){
      int guid = userEvent.getGuid()
      String redisSeqStepKey = ruleId + ":" + actionSeqConditionId + ":step";
      String redisSeqCntKey = ruleId + ":" + actionSeqConditionId + ":cnt";
    //从redis中获取该用户,本规则的行为序列,待完成序列的,已到达的,步骤数
      String preStepStr = jedis.hget (redisSeqKey, guid + "")
      int preStep = preStepStr == null? 0: Integer.parseInt (preStepStr)
    //判断本次输入的事件,是否是行为序列参数期待的下一个条件
    if (preStep<eventParams.size()){
    JSONObject eventParam = eventParams.getJSONObject (preStep)
    //如果输入事件,正是行为序列参数期待的下一个事件
    if(UserEventComparator.userEventIsEqualParam(userEvent,eventParam)){
    
    if(preStep== eventParams.size ()-1){
    //将redis中的步骤号重置为0   
    jedis.hset(redisSeStepKey,guid+"," ,  "0")
    // 将redis中该用户条件的完成次数+1
    jedis.hincrBy(redisSeqCntKey,guid+** .  1)
    }
    else{
    //否则步骤数+1
    long by = jedis.hincrBy (redisSeqStepKey, guid + "" +value: 1)
       }
      }
     }
    }
    
    定时型规则运算
    新规则模型开发

    六. 遗留问题

    1. redis中存用某个用户的逻辑数据,如果不做清除,下一次触发又是该用户的情形,会造成类似薅羊毛的事故,由于我们已经对Redis设置为按时间来清除过期数据,为了避免这种情况的发生,我们可以考虑增加一个最大投放次数的状态
    2. [触发型&非触发型规则运算],此处完全可以交给Groovy
    3. 如何处理新增的规则模型
    4. 某个规则圈选的人群,达到千万级别,经过测试,一个达到1亿级别的数据的bitmap,体积大约12M 放入mysql完全没问题
    5. 关于redis中的数据容量的问题?我们的一个规则的一个条件状态记录hash中,也就十万人级别(10万+元素),很轻松的,而且,如果redis负载能力不够的时候,可以部署redis集群的
    6. 如何兼容定时型规则?可以考虑新增一个是否是定时型规则的状态,但是存在的问题是使得项目变得复杂不好维护
    7. 性能测试的方法和描述,一般而言,一个subtask在1s内,能处理4000次行为次数条件的判断运算

    相关文章

      网友评论

          本文标题:Flink在用户行为分析中的应用(二)

          本文链接:https://www.haomeiwen.com/subject/kiavnrtx.html