美文网首页
文章推荐系统 | 十、基于热门文章和新文章的在线召回

文章推荐系统 | 十、基于热门文章和新文章的在线召回

作者: 小王子特洛伊 | 来源:发表于2019-12-30 13:46 被阅读0次

    推荐阅读:
    文章推荐系统 | 一、推荐流程设计
    文章推荐系统 | 二、同步业务数据
    文章推荐系统 | 三、收集用户行为数据
    文章推荐系统 | 四、构建离线文章画像
    文章推荐系统 | 五、计算文章相似度
    文章推荐系统 | 六、构建离线用户画像
    文章推荐系统 | 七、构建离线文章特征和用户特征
    文章推荐系统 | 八、基于模型的离线召回
    文章推荐系统 | 九、基于内容的离线及在线召回

    在上篇文章中我们实现了基于内容的在线召回,接下来,我们将实现基于热门文章和新文章的在线召回。主要思路是根据点击次数,统计每个频道下的热门文章,根据发布时间统计每个频道下的新文章,当推荐文章不足时,可以根据这些文章进行补足。

    由于数据量较小,这里采用 Redis 存储热门文章和新文章的召回结果,数据结构如下所示

    热门文章召回 结构 示例
    popular_recall ch:{}:hot ch:18:hot
    新文章召回 结构 示例
    new_article ch:{}:new ch:18:new

    热门文章存储,键为 ch:频道ID:hot 值为 分数文章ID

    # ZINCRBY key increment member
    # ZSCORE
    # 为有序集 key 的成员 member 的 score 值加上增量 increment 。
    client.zincrby("ch:{}:hot".format(row['channelId']), 1, row['param']['articleId'])
    
    # ZREVRANGE key start stop [WITHSCORES]
    client.zrevrange(ch:{}:new, 0, -1)
    

    新文章存储,键为 ch:{频道ID}:new 值为 文章ID:时间戳

    # ZADD ZRANGE
    # ZADD key score member [[score member] [score member] ...]
    # ZRANGE page_rank 0 -1
    client.zadd("ch:{}:new".format(channel_id), {article_id: time.time()})
    

    热门文章在线召回

    首先,添加 Spark Streaming 和 Kafka 的配置,热门文章读取由业务系统发送到 Kafka 的 click-trace 主题中的用户实时行为数据

    KAFKA_SERVER = "192.168.19.137:9092"
    click_kafkaParams = {"metadata.broker.list": KAFKA_SERVER}
    HOT_DS = KafkaUtils.createDirectStream(stream_c, ['click-trace'], click_kafkaParams)
    

    接下来,利用 Spark Streaming 读取 Kafka 中的用户行为数据,筛选出被点击过的文章,将 Redis 中的文章热度分数进行累加即可

    client = redis.StrictRedis(host=DefaultConfig.REDIS_HOST, port=DefaultConfig.REDIS_PORT, db=10)
    
    def update_hot_redis(self):
        """
        收集用户行为,更新热门文章分数
        :return:
        """
        def update_hot_article(rdd):
            for data in rdd.collect():
                # 过滤用户行为
                if data['param']['action'] in ['exposure', 'read']:
                    pass
                else:
                    client.zincrby("ch:{}:hot".format(data['channelId']), 1, data['param']['articleId'])
    
        HOT_DS.map(lambda x: json.loads(x[1])).foreachRDD(update_hot_article)
    

    测试,写入用户行为日志

    echo {\"actionTime\":\"2019-04-10 21:04:39\",\"readTime\":\"\",\"channelId\":18,\"param\":{\"action\": \"click\", \"userId\": \"2\", \"articleId\": \"14299\", \"algorithmCombine\": \"C2\"}} >> userClick.log
    

    查询热门文章

    127.0.0.1:6379[10]> keys *
    1) "ch:18:hot"
    127.0.0.1:6379[10]> ZRANGE "ch:18:hot" 0 -1
    1) "14299"
    

    新文章在线召回

    首先,添加 Spark Streaming 和 Kafka 的配置,新文章读取由业务系统发送到 Kafka 的 new-article 主题中的最新发布文章数据

    NEW_ARTICLE_DS = KafkaUtils.createDirectStream(stream_c, ['new-article'], click_kafkaParams)
    

    接下来,利用 Spark Streaming 读取 Kafka 的新文章,将其按频道添加到 Redis 中,Redis 的值为当前时间

    def  update_new_redis(self):
        """更新频道最新文章
        :return:
        """
        def add_new_article(rdd):
            for row in rdd.collect():
                channel_id, article_id = row.split(',')
                client.zadd("ch:{}:new".format(channel_id), {article_id: time.time()})
    
        NEW_ARTICLE_DS.map(lambda x: x[1]).foreachRDD(add_new_article)
    

    还需要在 Kafka 的启动脚本中添加 new-article 主题监听配置,这样就可以收到业务系统发送过来的新文章了,重新启动 Flume 和 Kafka

    /root/bigdata/kafka/bin/kafka-topics.sh --zookeeper 192.168.19.137:2181 --create --replication-factor 1 --topic new-article --partitions 1
    

    测试,向 Kafka 发送新文章数据

    from kafka import KafkaProducer 
    
    # kafka消息生产者
    kafka_producer = KafkaProducer(bootstrap_servers=['192.168.19.137:9092'])
    
    # 构造消息并发送
    msg = '{},{}'.format(18, 13891)
    kafka_producer.send('new-article', msg.encode())
    

    查看新文章

    127.0.0.1:6379[10]> keys *
    1) "ch:18:hot"
    2) "ch:18:new"
    127.0.0.1:6379[10]> ZRANGE "ch:18:new" 0 -1
    1) "13890"
    2) "13891"
    

    最后,修改 online_update.py,加入基于热门文章和新文章的在线召回逻辑,开启实时运行即可

    if __name__ == '__main__':
        ore = OnlineRecall()
        ore.update_content_recall()
        ore.update_hot_redis()
        ore.update_new_redis()
        stream_sc.start()
        # 使用 ctrl+c 可以退出服务
        _ONE_DAY_IN_SECONDS = 60 * 60 * 24
        try:
            while True:
                time.sleep(_ONE_DAY_IN_SECONDS)
        except KeyboardInterrupt:
            pass
    

    到这里,我们就完成了召回阶段的全部工作,包括基于模型和基于内容的离线召回,以及基于内容、热门文章和新文章的在线召回。通过召回,我们可以从数百万甚至上亿的原始物品数据中,筛选出和用户相关的几百、几千个可能感兴趣的物品,后面,我们将要进入到排序阶段,对召回的几百、几千个物品进行进一步的筛选和排序。

    参考

    https://www.bilibili.com/video/av68356229
    https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)

    相关文章

      网友评论

          本文标题:文章推荐系统 | 十、基于热门文章和新文章的在线召回

          本文链接:https://www.haomeiwen.com/subject/wsttictx.html