您当前的位置:首页 > 电脑百科 > 站长技术 > 网站

新闻个性化推荐系统源码之构建离线用户画像

时间:2020-09-02 11:53:15  来源:  作者:

用户画像往往是大型网站的重要模块,基于用户画像不仅可以实现个性化推荐,还可以实现用户分群、精准推送、精准营销以及用户行为预测、商业化转化分析等,为商业决策提供数据支持。通常用户画像包括用户属性信息(性别、年龄、出生日期等)、用户行为信息(浏览、收藏、点赞等)以及环境信息(时间、地理位置等)。

处理用户行为数据

在数据准备阶段,我们通过 Flume 已经可以将用户行为数据收集到 Hive 的 user_action 表的 HDFS 路径中,先来看一下这些数据长什么样子,我们读取当天的用户行为数据,注意读取之前要先关联分区

_day = time.strftime("%Y-%m-%d", time.localtime())
_localions = '/user/hive/warehouse/profile.db/user_action/' + _dayif fs.exists(_localions):    # 如果有该文件直接关联,捕获关联重复异常    try:
        self.spark.sql("alter table user_action add partition (dt='%s') location '%s'" % (_day, _localions))
    except Exception as e:
        pass    self.spark.sql("use profile")
    user_action = self.spark.sql("select actionTime, readTime, channelId, param.articleId, param.algorithmCombine, param.action, param.userId from user_action where dt>=" + _day)

user_action 结果如下所示

新闻个性化推荐系统源码之构建离线用户画像

 

useraction

可以发现,上面的一条记录代表用户对文章的一次行为,但通常我们需要查询某个用户对某篇文章的所有行为,所以,我们要将这里用户对文章的多条行为数据合并为一条,其中包括用户对文章的所有行为。我们需要新建一个 Hive 表 user_article_basic,这张表包括了用户 ID、文章 ID、是否曝光、是否点击、阅读时间等等,随后我们将处理好的用户行为数据存储到此表中

create table user_article_basic
(    user_id     BIGINT comment "userID",
    action_time STRING comment "user actions time",
    article_id  BIGINT comment "articleid",
    channel_id  INT comment "channel_id",
    shared      BOOLEAN comment "is shared",
    clicked     BOOLEAN comment "is clicked",
    collected   BOOLEAN comment "is collected",
    exposure    BOOLEAN comment "is exposured",
    read_time   STRING comment "reading time"
)    COMMENT "user_article_basic"
    CLUSTERED by (user_id) into 2 buckets
    STORED as textfile
    LOCATION '/user/hive/warehouse/profile.db/user_article_basic';

遍历每一条原始用户行为数据,判断用户对文章的行为,在 user_action_basic 中将该用户与该文章对应的行为设置为 True

if user_action.collect():
    def _generate(row):        _list = []        if row.action == 'exposure':
            for article_id in eval(row.articleId):
                # ["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"]
                _list.Append(                    [row.userId, row.actionTime, article_id, row.channelId, False, False, False, True, row.readTime])            return _list
        else:
            class Temp(object):
                shared = False                clicked = False                collected = False                read_time = ""
            _tp = Temp()            if row.action == 'click':
                _tp.clicked = True            elif row.action == 'share':
                _tp.shared = True            elif row.action == 'collect':
                _tp.collected = True            elif row.action == 'read':
                _tp.clicked = True            _list.append(                [row.userId, row.actionTime, int(row.articleId), row.channelId, _tp.shared, _tp.clicked, _tp.collected,                 True, row.readTime])            return _list
    user_action_basic = user_action.rdd.flatMap(_generate)    user_action_basic = user_action_basic.toDF(        ["user_id", "action_time", "article_id", "channel_id", "shared", "clicked", "collected", "exposure",
         "read_time"])

user_action_basic 结果如下所示,这里的一条记录包括了某个用户对某篇文章的所有行为

新闻个性化推荐系统源码之构建离线用户画像

 

user_action_basic

由于 Hive 目前还不支持 pyspark 的原子性操作,所以 user_article_basic 表的用户行为数据只能全量更新(实际场景中可以选择其他语言或数据库实现)。这里,我们需要将当天的用户行为与 user_action_basic 的历史用户行为进行合并

old_data = uup.spark.sql("select * from user_article_basic")
new_data = old_data.unionAll(user_action_basic)

合并后又会产生一个新的问题,那就是用户 ID 和文章 ID 可能重复,因为今天某个用户对某篇文章的记录可能在历史数据中也存在,而 unionAll() 方法并没有去重,这里我们可以按照用户 ID 和文章 ID 进行分组,利用 max() 方法得到 action_time, channel_id, shared, clicked, collected, exposure, read_time 即可,去重后直接存储到 user_article_basic 表中

new_data.registerTempTable("temptable")
self.spark.sql('''insert overwrite table user_article_basic select user_id, max(action_time) as action_time,
        article_id, max(channel_id) as channel_id, max(shared) as shared, max(clicked) as clicked,
        max(collected) as collected, max(exposure) as exposure, max(read_time) as read_time from temptable
        group by user_id, article_id''')

表 user_article_basic 结果如下所示

新闻个性化推荐系统源码之构建离线用户画像

 

user_article_basic

计算用户画像

我们选择将用户画像存储在 Hbase 中,因为 Hbase 支持原子性操作和快速读取,并且 Hive 也可以通过创建外部表关联到 Hbase,进行离线分析,如果要删除 Hive 外部表的话,对 Hbase 也没有影响。首先,在 Hbase 中创建用户画像表

create 'user_profile', 'basic','partial','env'

在 Hive 中创建 Hbase 外部表,注意字段类型设置为 map

create external table user_profile_hbase
(    user_id         STRING comment "userID",
    information     MAP<STRING, DOUBLE> comment "user basic information",
    article_partial MAP<STRING, DOUBLE> comment "article partial",
    env             MAP<STRING, INT> comment "user env"
)    COMMENT "user profile table"
    STORED BY 'org.Apache.hadoop.hive.hbase.HBaseStorageHandler'
        WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,basic:,partial:,env:")
    TBLPROPERTIES ("hbase.table.name" = "user_profile");

创建外部表之后,还需要导入一些依赖包

cp -r /root/bigdata/hbase/lib/hbase-*.jar /root/bigdata/spark/jars/
cp -r /root/bigdata/hive/lib/h*.jar /root/bigdata/spark/jars/

接下来,读取处理好的用户行为数据,由于日志中的 channel_id 有可能是来自于推荐频道(0),而不是文章真实的频道,所以这里要将 channel_id 列删除

spark.sql("use profile")
user_article_basic = spark.sql("select * from user_article_basic").drop('channel_id')

通过文章 ID,将用户行为数据与文章画像数据进行连接,从而得到文章频道 ID 和文章主题词

spark.sql('use article')
article_topic = spark.sql("select article_id, channel_id, topics from article_profile")
user_article_topic = user_article_basic.join(article_topic, how='left', on=['article_id'])

user_article_topic 结果如下图所示,其中 topics 列即为文章主题词列表,如 ['补码', '字符串', '李白', ...]

新闻个性化推荐系统源码之构建离线用户画像

 

user_article_topic

接下来,我们需要计算每一个主题词对于用户的权重,所以需要将 topics 列中的每个主题词都拆分为单独的一条记录。可以利用 Spark 的 explode() 方法,达到类似“爆炸”的效果

import pyspark.sql.functions as F
user_article_topic = user_topic.withColumn('topic', F.explode('topics')).drop('topics')

user_article_topic 如下图所示

新闻个性化推荐系统源码之构建离线用户画像

 

user_article_topic

我们通过用户对哪些文章发生了行为以及该文章有哪些主题词,计算出了用户对哪些主题词发生了行为。这样,我们就可以根据用户对主题词的行为来计算主题词对用户的权重,并且将这些主题词作为用户的标签。那么,用户标签权重的计算公式为:用户标签权重 =(用户行为分值之和)x 时间衰减。其中,时间衰减公式为:时间衰减系数 = 1 / (log(t) + 1),其中 t 为发生行为的时间距离当前时间的大小

不同的用户行为对应不同的权重,如下所示

用户行为分值阅读时间(<1000)1阅读时间(>=1000)2收藏2分享3点击5

计算用户标签及权重,并存储到 Hbase 中 user_profile 表的 partial 列族中。注意,这里我们将频道 ID 和标签一起作为 partial 列族的键存储,这样我们就方便查询不同频道的标签及权重了

def compute_user_label_weights(partitions):
    """ 计算用户标签权重
    """
    action_weight = {        "read_min": 1,
        "read_middle": 2,
        "collect": 2,
        "share": 3,
        "click": 5
    }    from datetime import datetime    import numpy as np        # 循环处理每个用户对应的每个主题词    for row in partitions:
        # 计算时间衰减系数        t = datetime.now() - datetime.strptime(row.action_time, '%Y-%m-%d %H:%M:%S')
        alpha = 1 / (np.log(t.days + 1) + 1)
                if row.read_time  == '':
            read_t = 0
        else:
            read_t = int(row.read_time)                # 计算阅读时间的行为分数        read_score = action_weight['read_middle'] if read_t > 1000 else action_weight['read_min']
                # 计算各种行为的权重和并乘以时间衰减系数        weights = alpha * (row.shared * action_weight['share'] + row.clicked * action_weight['click'] +
                          row.collected * action_weight['collect'] + read_score)
                # 更新到user_profilehbase表        with pool.connection() as conn:            table = conn.table('user_profile')
            table.put('user:{}'.format(row.user_id).encode(),
                      {'partial:{}:{}'.format(row.channel_id, row.topic).encode(): json.dumps(
                          weights).encode()})            conn.close()
user_topic.foreachPartition(compute_user_label_weights)

在 Hive 中查询用户标签及权重

hive> select * from user_profile_hbase limit 1;
OKuser:1  {"birthday":0.0,"gender":null}  {"18:##":0.25704484358604845,"18:&#":0.25704484358604845,"18:+++":0.23934588700996243,"18:+++++":0.23934588700996243,"18:AAA":0.2747964402379244,"18:Animal":0.2747964402379244,"18:Author":0.2747964402379244,"18:BASE":0.23934588700996243,"18:BBQ":0.23934588700996243,"18:Blueprint":1.6487786414275463,"18:Code":0.23934588700996243,"18:DIR......

接下来,要将用户属性信息加入到用户画像中。读取用户基础信息,存储到用户画像表的 basic 列族即可

def update_user_info():
    """
    更新用户画像的属性信息
    :return:
    """
    spark.sql("use toutiao")
    user_basic = spark.sql("select user_id, gender, birthday from user_profile")
    def udapte_user_basic(partition):        import happybase        #  用于读取hbase缓存结果配置        pool = happybase.ConnectionPool(size=10, host='172.17.0.134', port=9090)
        for row in partition:
            from datetime import date
            age = 0
            if row.birthday != 'null':
                born = datetime.strptime(row.birthday, '%Y-%m-%d')
                today = date.today()
                age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))            with pool.connection() as conn:                table = conn.table('user_profile')
                table.put('user:{}'.format(row.user_id).encode(),
                          {'basic:gender'.encode(): json.dumps(row.gender).encode()})
                table.put('user:{}'.format(row.user_id).encode(),
                          {'basic:birthday'.encode(): json.dumps(age).encode()})
                conn.close()
    user_basic.foreachPartition(udapte_user_basic)

到这里,我们的用户画像就计算完成了。

Apscheduler 定时更新

定义更新用户画像方法,首先处理用户行为日志,拆分文章主题词,接着计算用户标签的权重,最后再将用户属性信息加入到用户画像中

def update_user_profile():
    """
    定时更新用户画像的逻辑
    :return:
    """
    up = UpdateUserProfile()    if up.update_user_action_basic():
        up.update_user_label()        up.update_user_info()

在 Apscheduler 中添加定时更新用户画像任务,设定每隔 2 个小时更新一次

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
# 创建scheduler,多进程执行executors = {    'default': ProcessPoolExecutor(3)
}scheduler = BlockingScheduler(executors=executors)# 添加一个定时运行文章画像更新的任务, 每隔1个小时运行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
# 添加一个定时运行用户画像更新的任务, 每隔2个小时运行一次
scheduler.add_job(update_user_profile, trigger='interval', hours=2)
scheduler.start()

另外说一下,在实际场景中,用户画像往往是非常复杂的,下面是电商场景的用户画像,可以了解一下。

新闻个性化推荐系统源码之构建离线用户画像

 

用户画像



Tags:用户画像   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
导读:用户标签是个性化推荐、计算广告、金融征信等众多大数据业务应用的基础,它是原始的用户行为数据和大数据应用之间的桥梁,本文会介绍用户标签的构建方法,也就是用户画像技术...【详细内容】
2021-07-02  Tags: 用户画像  点击:(125)  评论:(0)  加入收藏
今天谈下对大数据下的用户画像和标签体系构建的一些关键点思考,对于用户画像和标签体系构建实际上网上已经有很多相关的历史文章可以参考,今天文章这篇文章不会系统地去谈整...【详细内容】
2021-03-08  Tags: 用户画像  点击:(172)  评论:(0)  加入收藏
网易大数据生态数量级巨大,且产品线丰富,覆盖用户娱乐、电商、教育等领域,并且APP活跃度高,积累了多维度的用户行为数据。通过集团数据资产构建全域用户画像,旨在服务于域内众多...【详细内容】
2020-09-09  Tags: 用户画像  点击:(72)  评论:(0)  加入收藏
用户画像往往是大型网站的重要模块,基于用户画像不仅可以实现个性化推荐,还可以实现用户分群、精准推送、精准营销以及用户行为预测、商业化转化分析等,为商业决策提供数据支持...【详细内容】
2020-09-02  Tags: 用户画像  点击:(121)  评论:(0)  加入收藏
随着时代和技术的发展,广告的形式呈现出多样化、立体化的趋势,从最早的纸媒广告发展到如今的网页的展示、搜索广告,广告的定位也从原先的“广而告之”发展成大数据时代的“精准...【详细内容】
2020-09-01  Tags: 用户画像  点击:(154)  评论:(0)  加入收藏
2020年,对K12在线教育行业来说是机遇与挑战并存的一年。在疫情影响、政策鼓励、科技推动等多重因素下,K12在线教育的渗透率急剧提升,在2020年3月行业渗透率达到顶峰,“停课不停...【详细内容】
2020-07-01  Tags: 用户画像  点击:(115)  评论:(0)  加入收藏
颜国平,腾讯云-天御系统研发负责人。 一直负责腾讯自有验证码、业务安全、防刷、账号安全等研发工作。 内部支持的产品(游戏、电商、腾讯投资的O2O企业)非常广泛。 在业务安全领...【详细内容】
2020-04-17  Tags: 用户画像  点击:(70)  评论:(0)  加入收藏
随着大数据应用的讨论、创新,个性化技术成为了一个重要落地点。相比传统的线下会员管理、问卷调查,大数据第一次使得企业能够通过移动互联网便利地获取用户更为广泛的反馈信息...【详细内容】
2020-03-07  Tags: 用户画像  点击:(91)  评论:(0)  加入收藏
▌简易百科推荐
今天我们来聊一下北京地区的《ICP经营许可证》有多好办,现在的互联网上提供的商机越来越多,增值电信业务十分火爆,企业通过互联网突破地域的限制,把公司产品卖到更远的地方,同时...【详细内容】
2021-12-17  梦想理应飞翔Yy    Tags:《ICP经营许可证》   点击:(12)  评论:(0)  加入收藏
转自: https://blog.kermsite.com/p/blog-intro/由于格式问题,部分链接、表格可能会失效,若失效请访问原文此专题将详细介绍如何从零开始搭建一个个人博客。Dec 01, 2021阅读时...【详细内容】
2021-12-17  LaLiLi    Tags:个人博客   点击:(6)  评论:(0)  加入收藏
SP证是第二类增值电信业务经营许可证的简称。分为全网SP证和地网SP证。申请经营许可证是在工信部申请,全网SP经营许可证的有效期是5年,全网SP许可证在工信部办理全网SP续期,地...【详细内容】
2021-11-01  s陳述    Tags:sp证书   点击:(38)  评论:(0)  加入收藏
现在还有许多人不知道EDI许可证是什么东西今天我就来给大家讲解一下.EDI许可证就是一种增值电信业务经营许可证。是针对在线数据处理和交易处理业务需求的专业资格证书。 《...【详细内容】
2021-10-28  soberXx    Tags:edi许可证   点击:(75)  评论:(0)  加入收藏
元素的化学概念,如周期表中的化学元素,一切物质都是由元素构成的。对程序员而言,网站建设制作就是代码构成网站。企业网站设计者也收集了各种各样的元素,但并非所有元素都需要运...【详细内容】
2021-10-26  南宁云尚网络    Tags:企业网站   点击:(39)  评论:(0)  加入收藏
在运营网站的过程中,有一件不可忽略的事情。那就是网站上线之前需要完成 ICP 备案。说到这里,很多朋友就提出疑问了~&middot; 什么是 ICP 备案呢?&middot; ICP 备案需要哪些材...【详细内容】
2021-10-22  启测云    Tags:ICP备案   点击:(45)  评论:(0)  加入收藏
最近有朋友问我,我公司有外资就不能申请ICP许可证了么?外资的定义是什么?其实是可以的,但有一个特定条件必须满足,外资公司是指公司有外资股东,比如香港、加拿大、美国、韩国等等,...【详细内容】
2021-10-21  小白速看Z    Tags:ICP   点击:(51)  评论:(0)  加入收藏
自互联网出现以来,超文本传输协议http协议被广泛用于在Web浏览器和网站服务器之间传递信息,但随着互联网的发展,另一种协议&mdash;&mdash;https出现,并与http一同服务于这个互联...【详细内容】
2021-10-20  我是FEIYA    Tags:https   点击:(44)  评论:(0)  加入收藏
Grafana Loki 是一个日志聚合工具,它是功能齐全的日志堆栈的核心。图片来自 包图网先看看结果有多轻量吧: Loki 是一个为有效保存日志数据而优化的数据存储。日志数据的高效索...【详细内容】
2021-09-14    51CTO  Tags:Loki日志   点击:(97)  评论:(0)  加入收藏
背景最近做微信小程序开发比较多,大家知道线上微信小程序为了安全起见,要求后端通信协议必须是HTTPS,这就要求需要安装证书。为了测试预发布线上环境,特地买了个最便宜的域名,为...【详细内容】
2021-09-14  小李子说程序    Tags:HTTPS证书   点击:(124)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条