美文网首页工业推荐系统
文章推荐系统 | 十四、推荐中心

文章推荐系统 | 十四、推荐中心

作者: 小王子特洛伊 | 来源:发表于2020-01-14 19:00 被阅读0次

推荐阅读:
文章推荐系统 | 一、推荐流程设计
文章推荐系统 | 二、同步业务数据
文章推荐系统 | 三、收集用户行为数据
文章推荐系统 | 四、构建离线文章画像
文章推荐系统 | 五、计算文章相似度
文章推荐系统 | 六、构建离线用户画像
文章推荐系统 | 七、构建离线文章特征和用户特征
文章推荐系统 | 八、基于模型的离线召回
文章推荐系统 | 九、基于内容的离线及在线召回
文章推荐系统 | 十、基于热门文章和新文章的在线召回
文章推荐系统 | 十一、基于 LR 模型的离线排序
文章推荐系统 | 十二、基于 FTRL 模型的在线排序
文章推荐系统 | 十三、基于 Wide&Deep 模型的实时排序

在前面的文章中,我们实现了召回和排序,接下来将进入推荐逻辑处理阶段,通常称为推荐中心,推荐中心负责接收应用系统的推荐请求,读取召回和排序的结果并进行调整,最后返回给应用系统。推荐中心的调用流程如下所示:

推荐接口设计

通常推荐接口包括 Feed 流推荐和相似文章推荐

  • Feed 流推荐:根据用户偏好,获取推荐文章列表(这里的时间戳用于区分是刷新推荐列表还是查看历史推荐列表)
    参数:用户 ID,频道 ID,推荐文章数量,请求推荐的时间戳
    结果:曝光参数,每篇文章的行为埋点参数,上一条推荐的时间戳
  • 相似文章推荐:当用户浏览某文章时,获取该文章的相似文章列表
    参数:文章 ID,推荐文章数量
    结果:文章 ID 列表

行为埋点参数:

{
    "param": '{"action": "exposure", "userId": 1, "articleId": [1,2,3,4],  "algorithmCombine": "c1"}',
    "recommends": [
        {"article_id": 1, "param": {"click": "{"action": "click", "userId": "1", "articleId": 1, "algorithmCombine": 'c1'}", "collect": "...", "share": "...","read":"..."}},
        {"article_id": 2, "param": {"click": "...", "collect": "...", "share": "...", "read":"..."}},
        {"article_id": 3, "param": {"click": "...", "collect": "...", "share": "...", "read":"..."}},
        {"article_id": 4, "param": {"click": "...", "collect": "...", "share": "...", "read":"..."}}
    ]
    "timestamp": 1546391572
}

这里接口采用 gRPC 框架,在 user_reco.proto 文件中定义 Protobuf 序列化协议,其中定义了 Feed 流推荐接口:rpc user_recommend(User) returns (Track) {} 和相似文章接口:rpc article_recommend(Article) returns(Similar) {}

syntax = "proto3";

message User {
    string user_id = 1;
    int32 channel_id = 2;
    int32 article_num = 3;
    int64 time_stamp = 4;
}
// int32 ---> int64 article_id
message Article {
    int64 article_id = 1;
    int32 article_num = 2;

}

message param2 {
    string click = 1;
    string collect = 2;
    string share = 3;
    string read = 4;
}

message param1 {
    int64 article_id = 1;
    param2 params = 2;
}

message Track {
    string exposure = 1;
    repeated param1 recommends = 2;
    int64 time_stamp = 3;
}

message Similar {
    repeated int64 article_id = 1;
}

service UserRecommend {
    rpc user_recommend(User) returns (Track) {}
    rpc article_recommend(Article) returns(Similar) {}
}

接着,通过如下命令生成服务端文件 user_reco_pb2.py 和客户端文件 user_reco_pb2_grpc.py

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user_reco.proto

定义参数解析类,用于解析推荐请求的参数,包括用户 ID、频道 ID、文章数量、请求时间戳以及算法名称

class Temp(object):
    user_id = -10
    channel_id = -10
    article_num = -10
    time_stamp = -10
    algo = ""

定义封装埋点参数方法,其中参数 res 为推荐结果,参数 temp 为用户请求参数,将推荐结果封装为在 user_reco.proto 文件中定义的 Track 结构,其中携带了文章对埋点参数,包括了事件名称、算法名称以及时间等等,方便后面解析用户对文章对行为信息

def add_track(res, temp):
    """
    封装埋点参数
    :param res: 推荐文章id列表
    :param temp: rpc参数
    :return: 埋点参数
        文章列表参数
        单文章参数
    """
    # 添加埋点参数
    track = {}

    # 准备曝光参数
    # 全部字符串形式提供,在hive端不会解析问题
    _exposure = {"action": "exposure", "userId": temp.user_id, "articleId": json.dumps(res),
                 "algorithmCombine": temp.algo}

    track['param'] = json.dumps(_exposure)
    track['recommends'] = []

    # 准备其它点击参数
    for _id in res:
        # 构造字典
        _dic = {}
        _dic['article_id'] = _id
        _dic['param'] = {}

        # 准备click参数
        _p = {"action": "click", "userId": temp.user_id, "articleId": str(_id),
              "algorithmCombine": temp.algo}

        _dic['param']['click'] = json.dumps(_p)
        # 准备collect参数
        _p["action"] = 'collect'
        _dic['param']['collect'] = json.dumps(_p)
        # 准备share参数
        _p["action"] = 'share'
        _dic['param']['share'] = json.dumps(_p)
        # 准备detentionTime参数
        _p["action"] = 'read'
        _dic['param']['read'] = json.dumps(_p)

        track['recommends'].append(_dic)

    track['timestamp'] = temp.time_stamp
    return track

AB Test 流量切分

由于推荐算法和策略是需要不断改进和完善等,所以 ABTest 也是推荐系统不可或缺的功能。可以根据用户 ID 将流量切分为多个桶(Bucket),每个桶对应一种排序策略,桶内流量将使用相应的策略进行排序,使用 ID 进行流量切分能够保证用户体验的一致性。通常 ABTest 过程如下所示:

通过定义 AB Test 参数,可以实现为不同的用户使用不同的推荐算法策略,其中 COMBINE 为融合方式,RECALL 为召回方式,SORT 为排序方式,CHANNEL 为频道数量,BYPASS 为分桶设置,sort_dict 为不同的排序服务对象。可以看到 Algo-1 使用 LR 进行排序,而 Algo-2 使用 Wide&Deep 进行排序

from collections import namedtuple

# ABTest参数信息
param = namedtuple('RecommendAlgorithm', ['COMBINE',
                                          'RECALL',
                                          'SORT',
                                          'CHANNEL',
                                          'BYPASS']
                   )

RAParam = param(
    COMBINE={
        'Algo-1': (1, [100, 101, 102, 103, 104], [200]),  # 首页推荐,所有召回结果读取+LR排序
        'Algo-2': (2, [100, 101, 102, 103, 104], [201])  # 首页推荐,所有召回结果读取 排序
    },
    RECALL={
        100: ('cb_recall', 'als'),  # 离线模型ALS召回,recall:user:1115629498121 column=als:18
        101: ('cb_recall', 'content'),  # 离线word2vec的画像内容召回 'recall:user:5', 'content:1'
        102: ('cb_recall', 'online'),  # 在线word2vec的画像召回 'recall:user:1', 'online:1'
        103: 'new_article',  # 新文章召回 redis当中    ch:18:new
        104: 'popular_article',  # 基于用户协同召回结果 ch:18:hot
        105: ('article_similar', 'similar')  # 文章相似推荐结果 '1' 'similar:2'
    },
    SORT={
        200: 'LR',
        201: 'WDL'
    },
    CHANNEL=25,
    BYPASS=[
            {
                "Bucket": ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd'],
                "Strategy": "Algo-1"
            },
            {
                "BeginBucket": ['e', 'f'],
                "Strategy": "Algo-2"
            }
        ]
)

sort_dict = {
    "LR": lr_sort_service,
    "WDL": wdl_sort_service
}

流量切分,将用户 ID 进行哈希,然后取哈希结果的第一个字符,将包含该字符的策略桶所对应的算法编号赋值到此用户请求参数的 algo 属性中,后面将调用该编号对应的算法策略为此用户计算推荐数据

import hashlib
from setting.default import DefaultConfig, RAParam

# 进行分桶实现分流,制定不同的实验策略
bucket = hashlib.md5(user_id.encode()).hexdigest()[:1]
if bucket in RAParam.BYPASS[0]['Bucket']:
    temp.algo = RAParam.BYPASS[0]['Strategy']
else:
    temp.algo = RAParam.BYPASS[1]['Strategy']

推荐中心逻辑

推荐中心逻辑主要包括:

  • 接收应用系统发送的推荐请求,解析请求参数
  • 进行 ABTest 分流,为用户分配推荐策略
  • 根据分配的算法调用召回服务和排序服务,读取推荐结果
  • 根据业务进行调整,如过滤、补足、合并信息等
  • 封装埋点参数,返回推荐结果

首先,在 Hbase 中创建历史推荐结果表 history_recommend,用于存储用户历史推荐结果

create 'history_recommend', {NAME=>'channel', TTL=>7776000, VERSIONS=>999999}   86400
# 每次指定一个时间戳,可以达到不同版本的效果
put 'history_recommend', 'reco:his:1', 'channel:18', [17283, 140357, 14668, 15182, 17999, 13648, 12884,18135]

继续在 Hbase 中创建待推荐结果表 wait_recommend,用于存储经过多路召回并且排序之后的待推荐结果,当 wait_recommend 没有数据时,才再次调用排序服务计算出新的待推荐结果并写入到 wait_recommend,所以不需设置多个版本。注意该表与 cb_recall 的区别,cb_recall 存储的是还未经排序的召回结果。

create 'wait_recommend', 'channel'

put 'wait_recommend', 'reco:1', 'channel:18', [17283, 140357, 14668, 15182, 17999, 13648, 12884,18135]
put 'wait_recommend', 'reco:1', 'channel:0', [17283, 140357, 14668, 15182, 17999, 13648, 12884, 17302, 13846]

用户获取 Feed 流推荐数据时,如果用户向下滑动,发出的是刷新推荐列表的请求,需要传入当前时间作为请求时间戳参数,该请求时间戳必然大于 Hbase 历史推荐结果表中的请求时间戳,那么程序将获取新的推荐列表,并返回 Hbase 历史推荐结果表中最近一次推荐的请求时间戳,用于查询历史推荐结果;如果用户向上滑动,发出的是查看历史推荐结果的请求,需要传入前面刷新推荐列表时返回的最近一次推荐的请求时间戳,该请求时间戳必然小于等于 Hbase 历史推荐结果中最近一次推荐的时间戳,那么程序将获取小于等于该请求时间戳的最近一次历史推荐结果,并返回小于该推荐结果最近一次推荐的时间戳,也就是上一次推荐的时间戳,下面是具体实现。

在获取推荐列表时,首先获取用户的历史数据库中最近一次时间戳 last_stamp,没有则将 last_stamp 置为 0

try:
    last_stamp = self.hbu.get_table_row('history_recommend',
                                        'reco:his:{}'.format(temp.user_id).encode(),
                                        'channel:{}'.format(temp.channel_id).encode(),
                                        include_timestamp=True)[1]
except Exception as e:
    last_stamp = 0
  • 如果用户请求的时间戳小于历史推荐结果中最近一次请求的时间戳 last_stamp,那么该请求为用户获取历史推荐结果
    1.如果没有历史推荐结果,则返回时间戳 0 以及空列表 []
    2.如果历史推荐结果只有一条,则返回这一条历史推荐结果并返回时间戳 0,表示已经没有历史推荐结果(APP 可以显示已经没有历史推荐记录了)
    3.如果历史推荐结果有多条,则返回历史推荐结果中第一条推荐结果(最近一次),然后返回历史推荐结果中第二条推荐结果的时间戳
if temp.time_stamp < last_stamp:
    try:
        row = self.hbu.get_table_cells('history_recommend',
                                       'reco:his:{}'.format(temp.user_id).encode(),
                                       'channel:{}'.format(temp.channel_id).encode(),
                                       timestamp=temp.time_stamp + 1,
                                       include_timestamp=True)
    except Exception as e:
        row = []
        res = []

    if not row:
        temp.time_stamp = 0
        res = []
    elif len(row) == 1 and row[0][1] == temp.time_stamp:
        res = eval(row[0][0])
        temp.time_stamp = 0
    elif len(row) >= 2:
        res = eval(row[0][0])
        temp.time_stamp = int(row[1][1])

    res = list(map(int, res))
    # 封装推荐结果
    track = add_track(res, temp)
    # 曝光参数设置为空
    track['param'] = ''

(注意:这里将用户请求的时间戳 +1,因为 Hbase 只能获取小于该时间戳的历史推荐结果)

  • 如果用户请求的时间戳大于 Hbase 历史推荐结果中最近一次请求的时间戳 last_stamp,那么该请求为用户刷新推荐列表,需要读取推荐结果并返回。如果结果为空,需要调用 user_reco_list() 方法,再次计算推荐结果,再返回。
if temp.time_stamp > last_stamp:
    # 获取缓存
    res = redis_cache.get_reco_from_cache(temp, self.hbu)
    # 如果结果为空,需要再次计算推荐结果 进行召回+排序,同时写入到hbase待推荐结果列表
    if not res:
        res = self.user_reco_list(temp)

    temp.time_stamp = int(last_stamp)
    track = add_track(res, temp)

定义 user_reco_list() 方法,首先要读取多路召回结果,根据为用户分配的算法策略,读取相应路径的召回结果,并进行重后合并

reco_set = []
# (1, [100, 101, 102, 103, 104], [200])
for number in RAParam.COMBINE[temp.algo][1]:
    if number == 103:
        _res = self.recall_service.read_redis_new_article(temp.channel_id)
        reco_set = list(set(reco_set).union(set(_res)))
    elif number == 104:
        _res = self.recall_service.read_redis_hot_article(temp.channel_id)
        reco_set = list(set(reco_set).union(set(_res)))
    else:
        # 100, 101, 102召回结果读取
        _res = self.recall_service.read_hbase_recall(RAParam.RECALL[number][0],
                                                     'recall:user:{}'.format(temp.user_id).encode(),
                                                     '{}:{}'.format(RAParam.RECALL[number][1],
                                                                    temp.channel_id).encode())
        reco_set = list(set(reco_set).union(set(_res)))

接着,过滤当前该请求频道的历史推荐结果,如果不是 0 频道还需过滤 0 频道的历史推荐结果

history_list = []
data = self.hbu.get_table_cells('history_recommend',
                                'reco:his:{}'.format(temp.user_id).encode(),
                                'channel:{}'.format(temp.channel_id).encode())

for _ in data:
    history_list = list(set(history_list).union(set(eval(_))))

data = self.hbu.get_table_cells('history_recommend',
                                'reco:his:{}'.format(temp.user_id).encode(),
                                'channel:{}'.format(0).encode())

for _ in data:
    history_list = list(set(history_list).union(set(eval(_))))

reco_set = list(set(reco_set).difference(set(history_list)))

最后,根据分配的算法策略,调用排序服务,将分数最高的 N 个推荐结果返回,并写入历史推荐结果表,如果还有剩余的排序结果,将其余写入待推荐结果表

# 使用指定模型对召回结果进行排序
# temp.user_id, reco_set
_sort_num = RAParam.COMBINE[temp.algo][2][0]
# 'LR'
reco_set = sort_dict[RAParam.SORT[_sort_num]](reco_set, temp, self.hbu)

if not reco_set:
    return reco_set
else:

    # 如果reco_set小于用户需要推荐的文章
    if len(reco_set) <= temp.article_num:
        res = reco_set
    else:
        # 大于要推荐的文章结果
        res = reco_set[:temp.article_num]

        # 将剩下的文章列表写入待推荐的结果
        self.hbu.get_table_put('wait_recommend',
                               'reco:{}'.format(temp.user_id).encode(),
                               'channel:{}'.format(temp.channel_id).encode(),
                               str(reco_set[temp.article_num:]).encode(),
                               timestamp=temp.time_stamp)

    # 直接写入历史记录当中,表示这次又成功推荐一次
    self.hbu.get_table_put('history_recommend',
                           'reco:his:{}'.format(temp.user_id).encode(),
                           'channel:{}'.format(temp.channel_id).encode(),
                           str(res).encode(),
                           timestamp=temp.time_stamp)

    return res

到这里,推荐中心的基本逻辑已经结束。下面是读取多路召回结果的实现细节:通过指定列族,读取基于模型、离线内容以及在线的召回结果,并删除 cb_recall 的召回结果

def read_hbase_recall_data(self, table_name, key_format, column_format):
    """
    读取cb_recall当中的推荐数据
    读取的时候可以选择列族进行读取als, online, content

    :return:
    """
    recall_list = []
    data = self.hbu.get_table_cells(table_name, key_format, column_format)
    # data是多个版本的推荐结果[[],[],[],]
    for _ in data:
        recall_list = list(set(recall_list).union(set(eval(_))))
    self.hbu.get_table_delete(table_name, key_format, column_format)
    return recall_list

读取 redis 中的新文章

def read_redis_new_article(self, channel_id):
    """
    读取新文章召回结果
    :param channel_id: 提供频道
    :return:
    """
    _key = "ch:{}:new".format(channel_id)
    try:
        res = self.client.zrevrange(_key, 0, -1)
    except Exception as e:
        res = []

    return list(map(int, res))

读取 redis 中的热门文章,并选取热度最高的前 K 个文章

def read_redis_hot_article(self, channel_id):
    """
    读取热门文章召回结果
    :param channel_id: 提供频道
    :return:
    """
    _key = "ch:{}:hot".format(channel_id)
    try:
        res = self.client.zrevrange(_key, 0, -1)
    except Exception as e:

    # 由于每个频道的热门文章有很多,因为 保留文章点击次数
    res = list(map(int, res))
    if len(res) > self.hot_num:
        res = res[:self.hot_num]
    return res

读取相似文章

def read_hbase_article_similar(self, table_name, key_format, article_num):
    """获取文章相似结果
    :param article_id: 文章id
    :param article_num: 文章数量
    :return:
    """
    try:
        _dic = self.hbu.get_table_row(table_name, key_format)

        res = []
        _srt = sorted(_dic.items(), key=lambda obj: obj[1], reverse=True)
        if len(_srt) > article_num:
            _srt = _srt[:article_num]
        for _ in _srt:
            res.append(int(_[0].decode().split(':')[1]))
    except Exception as e:
        res = []
    return res

使用缓存策略

  • 如果 redis 缓存中存在数据,就直接从 redis 缓存中获取推荐结果
  • 如果 redis 缓存为空而 Hbase 的待推荐结果表 wait_recommend 不为空,则从 wait_recommend 中获取推荐结果,并将一定数量的待推荐结果放入 redis 缓存中
  • 若 redis 和 wait_recommend 都为空,则需读取召回结果并进行排序,将排序结果写入 Hbase 的待推荐结果表 wait_recommend 中及 redis 中

(每次读取的推荐结果都要将其写入 Hbase 的历史推荐结果表 history_recommend 中)

读取 redis 缓存

#读取redis对应的键
key = 'reco:{}:{}:art'.format(temp.user_id, temp.channel_id)
# 读取,删除,返回结果
pl = cache_client.pipeline()

# 读取redis数据
res = cache_client.zrevrange(key, 0, temp.article_num - 1)
if res:
    # 手动删除读取出来的缓存结果
    pl.zrem(key, *res)

如果 redis 缓存为空

else:
    # 删除键
    cache_client.delete(key)
    try:
        # 从wait_recommend中读取
        wait_cache = eval(hbu.get_table_row('wait_recommend',
                                            'reco:{}'.format(temp.user_id).encode(),
                                            'channel:{}'.format(temp.channel_id).encode()))
    except Exception as e:
        wait_cache = []

    # 如果为空则直接返回空
    if not wait_cache:
        return wait_cache

    # 如果wait_recommend中有数据
    if len(wait_cache) > 100:
        cache_redis = wait_cache[:100]

        # 前100个数据放入redis
        pl.zadd(key, dict(zip(cache_redis, range(len(cache_redis)))))

        # 100个后面的数据,在放回wait_recommend
        hbu.get_table_put('wait_recommend',
                          'reco:{}'.format(temp.user_id).encode(),
                          'channel:{}'.format(temp.channel_id).encode(),
                          str(wait_cache[100:]).encode())

    else:
        # 清空wait_recommend数据
        hbu.get_table_put('wait_recommend',
                          'reco:{}'.format(temp.user_id).encode(),
                          'channel:{}'.format(temp.channel_id).encode(),
                          str([]).encode())

        # 所有不足100个数据,放入redis
        pl.zadd(key, dict(zip(wait_cache, range(len(wait_cache)))))

    res = cache_client.zrange(key, 0, temp.article_num - 1)

最后,在 Supervisor 中配置 gRPC 实时推荐程序

[program:online]
environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python ,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/python
command=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/reco_sys/abtest/routing.py
directory=/root/toutiao_project/reco_sys/abtest
user=root
autorestart=true
redirect_stderr=true
stdout_logfile=/root/logs/recommendsuper.log
loglevel=info
stopsignal=KILL
stopasgroup=true
killasgroup=true

The End

文章推荐系统系列到此就完结啦~ 撒花 🎉🎉🎉
若有疏漏的地方,欢迎各位多多指正,感谢关注,love & peace. 🙏🙏🙏

参考

https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)

相关文章

网友评论

    本文标题:文章推荐系统 | 十四、推荐中心

    本文链接:https://www.haomeiwen.com/subject/twgcictx.html