一. 项目背景
业务系统一般有广告引擎,推荐引擎,搜索引擎,秒杀系统;当然实时智能运营系统也是业务系统的一部分,能为公司带来直接价值的.智能运营系统是一个用软件系统自动、实时监控用户的行为,并实时做出判断,并进而驱动营销推送的系统
二. 需求分析
营销规则举例
- 活跃度为A,男性,高星级用户,连续登陆7天的用户圈选
- 搜索过"商务休闲"关键字的用户,如果点击了"旅拍"坑位,则推送一张给定规则的优惠券
- 某用户,连续做过3次A行为后,没有做过[BC]行为序列的人群圈选
- 检测到某用户,在做过[AB]行为后,3分钟内,做过[C|D]行为,则触发监控告警
- 某高星级用户,规定时间内搜索的关键字,符合给定规则的特征,则推送app相关消息
营销规则特征
- 营销规则有多样的属性
- 营销规则通常是有有效期设置
- 营销规则要根据业务的变化灵活的进行热管理,水平扩展
- 营销规则要圈选的人群可能不是一个静态的人群包
营销规则的要素抽象
- 营销规则大体是对用户的画像和行为进行判断的
- 营销规则是有时间窗口限制的(规则上线前后)
- 营销规则的要素通常有
- 条件属性:就是各类逻辑的组合,与,或,非,大于,小于,等于,交集,并集
- 对比属性:事件和事件的对比关系,如AB事件的时间间隔应该大于N
- 时间属性:就是限制圈选的时间范围
- 画像属性:画像标签库的属性值,如年龄,性别,年代,星级
- 度量属性:如,登陆天数,消费金额,某行为事件发生M次,SUM(属性值),MAX(属性值)
- 序列属性:如用户连续触发[ABC]事件.后3s内触发D事件等
三. 设计概要
画像条件的设计概要
- 一般的画像标签是枚举值类型的,将数据存放在hbase中即可解决,但是对于一些例如兴趣爱好,出行意图往往不是枚举类型的,而是类似于文档,例如一个人爱好标签是"咖啡",而标签中可能存储了多种类型的"咖啡:抹茶咖啡:猫屎咖啡..",这涉及到了模糊查询,hbase很难支持,对于es这类文档数据库,就非常简单了,所以哦我们考虑使用es,来存储画像标签
- 显然大量的用户基数的查询,es也力不从心,考虑到画像标签的实时价值并不高,我们可以读取T+1的画像标签即可,我们考虑标签值进行bitmap编码的思路,编码后的数据广播与事件流进行逻辑判断
行为条件的设计概要
- 行为明细数据存储的两难选择:doris有着强大的查询能力,但是面对高频词请求,复杂的查询需求,就无法满足我们极为严苛的响应延迟要求,如果将数据都保存在state中,由于大状态非常不容易维护,也是不可取的
- 我们无法笼统的解决这个问题,需要具体分析具体对待;如果只涉及上线前的历史行为分析判断,那么我们可以完全参照画像条件的思路,将统计结果形成固化的结果数据;如果只涉及上线后的数据,则通过state来计算,那么state如何进行行为分析呢?其实我们,在计算一个规则条件时,并不需要完整的明细数据,可以通过滚动聚合的方式来计算;如果横跨上线前后行为分析场景,就需要进行拆分和整合,我们考虑把要整合的数据存储在redis中,这是由于redis读写速度与flink内部的state不相上下,能满足我们的高并发低延迟读写要求并且redis中有丰富的数据结构够我们使用
- 行为序列的分析需要设计不同的状态结构,业务规则的需求是无限的,我们无法用同一套代码来应付灵活多变的规则条件计算逻辑,因此要考虑从业务编码中解放出来,我们考虑使用外部系统为规则去选用一个 “状态机”,然后注入到我们的规则引擎,我们考虑使用动态脚本语言来实现,如Groovy
- 综上所述,开发人员往规则管理平台添加groory模板数据,开发flink流程消费用户行为数据,并通过cdc获取mysql中的规则数据,连接进行各类规则的状态机运算
四. 技术验证
Groovy动态调用
bitmap发布流程
- bitmap序列化 --> mysql元数据表 --> 反序列化的可行性
public class RulePublisher {
public static void main(String[] args) throws SQLException, IOException, ClassNotFoundException {
String ruleId = "rule_0001";
//根据规则,去es中查询人群
int[] ruleProfileUsers = {1,3,5};
//把查询出来的人群包,变成bitmap
RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf(ruleProfileUsers);
//把生成好的bitmap,序列到一个字节数组中
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
roaringBitmap.serialize(dataOutputStream);
byte[] bitmapBytes = byteArrayOutputStream.toByteArray();
//将这个bitmap连同本规则,一同发布到规则平台的元数据系统中
//注意在mysql中的byte类型
Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/stream_db?serverTimezone=Asia/Shanghai", "root", "123456");
PreparedStatement prepareStatement = connection.prepareStatement("insert into rule_tf values (?,?)");
prepareStatement.setString(1,ruleId);
prepareStatement.setBytes(2,bitmapBytes);
prepareStatement.execute();
prepareStatement.close();
}
}
public class BitmapFromMysqlBytes {
public static void main(String[] args) throws SQLException, IOException {
Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/stream_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
PreparedStatement prepareStatement = connection.prepareStatement("select rule,profile_bitmap from rule_tf where rule =?");
prepareStatement.setString(1,"rule_0001");
ResultSet resultSet = prepareStatement.executeQuery();
resultSet.next();
byte[] profileBitmap = resultSet.getBytes("profile_bitmap");
//反序列化bitmap
RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf();
roaringBitmap.deserialize(ByteBuffer.wrap(profileBitmap));
//测试反序列化的数据是否为之前的数据
System.out.println(roaringBitmap.contains(1));
System.out.println(roaringBitmap.contains(3));
System.out.println(roaringBitmap.contains(201));
System.out.println(roaringBitmap.contains(202));
prepareStatement.close();
connection.close();
}
}
flink-cdc规则获取流程
- 一个flinkcdc的demo
public class FlinkCdcDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//踩坑:请将flinktable中的 <scope>provided</scope>注释掉
DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("root")
.password("123456")
.databaseList("my_data")
.tableList("my_data.log_trail")
.startupOptions(StartupOptions.initial())
.deserializer(new StringDebeziumDeserializationSchema())
.build();
//4.使用 CDC Source 从 MySQL 读取数据
DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
//5.打印数据
mysqlDS.print();
//6.执行任务
env.execute();
}
}
- 使用flinksql-cdc获取mysql中的bitmap
public class FlinkCdcBitmap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
//2.x版本需要设置Checkpoint,否则只能读取全量数据,无法实时读取增量数据
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:/d/sun/");
StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);
//创建cdc连接器
tableEnv.executeSql("CREATE TABLE rule_tf (" +
" rule STRING PRIMARY KEY NOT ENFORCED, " +
" profile_bitmap BINARY " +
") WITH (" +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = 'localhost', " +
" 'port' = '3306', " +
" 'username' = 'root' ," +
" 'password' = '123456' ," +
" 'database-name' = 'my_data' ," +
" 'table-name' = 'rule_tf'" +
")");
// 踩坑,不支持使用CDC2.2版本,请使用CDC2.0,另外flink1.2不支持sql-cdc
Table table = tableEnv.sqlQuery("select rule,profile_bitmap from rule_tf");
tableEnv.toChangelogStream(table).print();
SingleOutputStreamOperator<String> process = tableEnv.toChangelogStream(table).process(new ProcessFunction<Row, String>() {
@Override
public void processElement(Row value, ProcessFunction<Row, String>.Context ctx, Collector<String> out) throws Exception {
RowKind valueKind = value.getKind();
if (valueKind == RowKind.INSERT) {
String rule_id = (String) value.getField("rule");
byte[] profileBitmap = (byte[]) value.getField("profile_bitmap");
//反序列化bitmap
RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf();
assert profileBitmap != null;
roaringBitmap.deserialize(ByteBuffer.wrap(profileBitmap));
System.out.println(Arrays.toString(roaringBitmap.stream().toArray()));
out.collect(Arrays.toString(roaringBitmap.stream().toArray()));
}
}
});
process.print();
env.execute();
}
}
- cdc规则数据整合事件流
public class FlinkInjectRuleBitmapProcessEvents {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:/d/sun/");
StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);
//模拟一个事件流
SingleOutputStreamOperator<Tuple2<String, String>> event = env.socketTextStream("192.168.10.100", 5666).map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split(",");
return Tuple2.of(split[0], split[1]);
}
});
//创建cdc连接器
tableEnv.executeSql("CREATE TABLE rule_tf (" +
" rule STRING PRIMARY KEY NOT ENFORCED, " +
" profile_bitmap BINARY " +
") WITH (" +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = 'localhost', " +
" 'port' = '3306', " +
" 'username' = 'root' ," +
" 'password' = '123456' ," +
" 'database-name' = 'my_data' ," +
" 'table-name' = 'rule_tf'" +
")");
Table table = tableEnv.sqlQuery("select rule,profile_bitmap from rule_tf");
DataStream<Row> changelogStream = tableEnv.toChangelogStream(table);
SingleOutputStreamOperator<Tuple2<String, RoaringBitmap>> streamRule = changelogStream.map(new MapFunction<Row, Tuple2<String, RoaringBitmap>>() {
@Override
public Tuple2<String, RoaringBitmap> map(Row value) throws Exception {
String rule_id = (String) value.getField("rule");
byte[] profileBitmap = (byte[]) value.getField("profile_bitmap");
RoaringBitmap roaringBitmap = RoaringBitmap.bitmapOf();
roaringBitmap.deserialize(ByteBuffer.wrap(profileBitmap));
//标记获取规则的状态
if (RowKind.DELETE == value.getKind()) {
return Tuple2.of(rule_id+":"+"DELETE", roaringBitmap);
}
else {
//更新则覆盖
return Tuple2.of(rule_id+":"+"UPSERT", roaringBitmap);
}
}
});
//进行广播
MapStateDescriptor<String, RoaringBitmap> ruleStateDescriptor = new MapStateDescriptor<>("Rule", String.class, RoaringBitmap.class);
BroadcastStream<Tuple2<String, RoaringBitmap>> broadcast = streamRule.broadcast(ruleStateDescriptor);
//将广播流与行为事件进行连接
event.keyBy(tp -> tp.f0)
.connect(broadcast)
.process(new KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, RoaringBitmap>, String>() {
@Override
public void processElement(Tuple2<String, String> event, KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, RoaringBitmap>, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
ReadOnlyBroadcastState<String, RoaringBitmap> readOnlyContextBroadcastState = readOnlyContext.getBroadcastState(ruleStateDescriptor);
//对系统中的每个规则,进行一次人群判断
for (Map.Entry<String, RoaringBitmap> entry : readOnlyContextBroadcastState.immutableEntries()) {
String rule_id = entry.getKey();
RoaringBitmap profileBitmap = entry.getValue();
collector.collect(String.format("当前行为事件用户 %d 规则 %s的目标人群是否包含此人 %s",Integer.parseInt(event.f0),rule_id,profileBitmap.contains(Integer.parseInt(event.f0))));
}
}
@Override
public void processBroadcastElement(Tuple2<String, RoaringBitmap> ruleInfo, KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, RoaringBitmap>, String>.Context context, Collector<String> collector) throws Exception {
BroadcastState<String, RoaringBitmap> broadcastState = context.getBroadcastState(ruleStateDescriptor);
//形如 rule:DELETE的拼接
String[] split = ruleInfo.f0.split(":");
if ("DELETE".equals(split[1])) {
System.out.println("删除一条规则" +split[0] +" " + ruleInfo.f0 );
broadcastState.remove(split[0]);
}
else {
System.out.println("新增一条规则" +split[0] +" " + ruleInfo.f0);
broadcastState.put(split[0], ruleInfo.f1);
}
}
}).print();
env.execute();
}
}
五. 数据开发
通过Spark将用户画像数据导入到ES中
搜索人群画像形成bitmap
规则模板的数据结构
动态条件表达式的设计
规则(前端注入)可能存在多个,我们不能写死,同时规则之间的关系也是复杂的,我们考虑使用动态脚本语言模板来实现
状态机
状态机就是一段逻辑的封装,规则不能写死,同理逻辑也不能写死,我们同样考虑使用动态脚本语言模板来实现,在本篇中状态机即是动态条件表达式与Redis运算逻辑的结合体
触发型&非触发型规则运算
序列型规则运算
使用redis中的顶层key结构,value是某个人匹配到最大步骤数,新增一个结构来保留匹配到序列的次数
public void caclActionSeq(UserEvent userEvent){
int guid = userEvent.getGuid()
String redisSeqStepKey = ruleId + ":" + actionSeqConditionId + ":step";
String redisSeqCntKey = ruleId + ":" + actionSeqConditionId + ":cnt";
//从redis中获取该用户,本规则的行为序列,待完成序列的,已到达的,步骤数
String preStepStr = jedis.hget (redisSeqKey, guid + "")
int preStep = preStepStr == null? 0: Integer.parseInt (preStepStr)
//判断本次输入的事件,是否是行为序列参数期待的下一个条件
if (preStep<eventParams.size()){
JSONObject eventParam = eventParams.getJSONObject (preStep)
//如果输入事件,正是行为序列参数期待的下一个事件
if(UserEventComparator.userEventIsEqualParam(userEvent,eventParam)){
if(preStep== eventParams.size ()-1){
//将redis中的步骤号重置为0
jedis.hset(redisSeStepKey,guid+"," , "0")
// 将redis中该用户条件的完成次数+1
jedis.hincrBy(redisSeqCntKey,guid+** . 1)
}
else{
//否则步骤数+1
long by = jedis.hincrBy (redisSeqStepKey, guid + "" +value: 1)
}
}
}
}
定时型规则运算
新规则模型开发
六. 遗留问题
- redis中存用某个用户的逻辑数据,如果不做清除,下一次触发又是该用户的情形,会造成类似薅羊毛的事故,由于我们已经对Redis设置为按时间来清除过期数据,为了避免这种情况的发生,我们可以考虑增加一个最大投放次数的状态
- [触发型&非触发型规则运算],此处完全可以交给Groovy
- 如何处理新增的规则模型
- 某个规则圈选的人群,达到千万级别,经过测试,一个达到1亿级别的数据的bitmap,体积大约12M 放入mysql完全没问题
- 关于redis中的数据容量的问题?我们的一个规则的一个条件状态记录hash中,也就十万人级别(10万+元素),很轻松的,而且,如果redis负载能力不够的时候,可以部署redis集群的
- 如何兼容定时型规则?可以考虑新增一个是否是定时型规则的状态,但是存在的问题是使得项目变得复杂不好维护
- 性能测试的方法和描述,一般而言,一个subtask在1s内,能处理4000次行为次数条件的判断运算
网友评论