您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > Python

基于PySpark SQL的媒体浏览日志ETL作业

时间:2023-11-27 14:43:17  来源:微信公众号  作者:口袋大数据

pyspark除了官方的文档,网上的教程资料一直很少,但基于调度平台下,使用pyspark编写代码非常高效,程序本身是提交到spark集群中,性能上也是毫无问题的,在本文中,我们将深入探讨基于Spark的媒体浏览日志ETL(提取、转换、加载)流水线的详细实现,在展示如何使用PySpark SQL处理大规模的媒体浏览日志数据,包括IP地址转换、数据清洗、时间维度补充、码表关联等关键步骤。

基于PySpark SQL的媒体浏览日志ETL作业

一、环境配置

首先,我们需要创建一个SparkSession并导入必要的库和设置默认参数,包括与IP-to-Location数据库的交互以及其他相关的配置。

如果pyspark仅仅是本地运行而不是提交集群时,可以使用findspark库,它能够帮助我们快速初始化Spark环境。在开始之前,确保您已经成功安装了findspark库,并已经下载并解压了Spark二进制文件。将Spark的安装路径和Python/ target=_blank class=infotextkey>Python解释器路径指定为变量。

import findspark

# 指定Spark的安装路径
spark_home = "/usr/local/spark"

# 指定用于Spark的Python解释器路径
python_path = "/home/hadoop/.conda/envs/sparkbox/bin/python3.6"

# 使用findspark.init方法初始化Spark环境
findspark.init(spark_home, python_path)

findspark.init方法将帮助设置PYSPARK_PYTHON和SPARK_HOME环境变量,确保正确的Spark库和配置文件被加载。其简化了Spark环境的初始化过程,避免手动配置环境变量。

二、数据处理

接下来,我们定义了一个NewsEtl类,用于执行数据处理和转换的各个步骤。这包括从HDFS中读取媒体浏览日志数据,进行IP地址转、换,清洗数据,添加时间维度,补充码表信息等。

在spark_function中,我们详细说明了数据处理的逻辑。这包括读取媒体浏览日志数据、进行IP地址转换、添加时间维度、补充码表信息、数据清洗和最终写入HDFS等步骤。

1.数据读取

首先,我们使用PySpark的read方法从HDFS中读取媒体浏览日志数据。我们指定了数据的schema,以确保正确地解析每一列。

df = spark.read.schema(schema).parquet(
"hdfs://xxx:8020/user/hive/warehouse/xxx.db/ods_media_browse_log").filter(
"dt in ({})".format(",".join(["'{}'".format(partition) for partition in latest_partitions])))

2.IP地址转换

接下来,我们通过iptranslate函数将IP地址转换为地理位置信息。这使用了XdbSearcher类,该类负责读取xdb文件并执行IP地址的二分查找。

# 根据IP地址获取地点信息
from_ip_get_place_udf = udf(action.iptranslate, struct_schema)
df = df.withColumn('country', from_ip_get_place_udf(col('ip'), lit('country')))
df = df.withColumn("place", from_ip_get_place_udf(col('ip')))
df = df.withColumn("country", df["place"]["country"])
df = df.withColumn("city", df["place"]["city"])
df = df.withColumn("province", df["place"]["province"])
df = df.drop('place')

3.时间维度添加

我们生成当前时间的时间戳,并添加各种时间格式的列,包括年、季度、月、日、小时等。

# 生成当前时间的时间戳
df = df.withColumn("current_timestamp", from_unixtime(df["operation_time"] / 1000))
# 添加各种时间格式的列
df = df.withColumn("year", date_format("current_timestamp", "yyyy"))
df = df.withColumn("quarter", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("month", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("day", date_format("current_timestamp", "dd"))
df = df.withColumn("hour_time", date_format("current_timestamp", "yyyy-MM-dd HH"))
df = df.withColumn("dt", date_format("current_timestamp", "yyyy-MM-dd"))
df = df.withColumn("hour", date_format("current_timestamp", "HH"))
df = df.drop('current_timestamp')

4.数据清洗

最后,我们对数据进行清洗,包括将空值替换为默认值、字符串去除空格、数据类型转换等。

# 数据清洗
newdf = newdf.withColumn("media_type", when(col("media_type").isNull(), 0).otherwise(col("media_type")))
newdf = newdf.withColumn("news_type", when(col("news_type").isNull(), 99).otherwise(col("news_type")))
newdf = newdf.withColumn("original_type", when(col("original_type").isNull(), 99).otherwise(col("original_type")))
# ...

5.最终写入HDFS

最终,我们将处理后的数据写入HDFS,采用分区方式存储,以便更高效地管理和查询。

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
newdf.write.partitionBy("dt", "hour").mode("overwrite").option('user', 'hive').parquet(
"hdfs://xxxx:8020/user/hive/warehouse/xxx.db/dwd_media_browse_log")

通过这一系列步骤,我们完成了对媒体浏览日志数据的全面处理,包括数据转换、地理位置信息的添加、时间维度的补充和数据清洗等关键步骤。

三、结论

通过详细的实现步骤,深入解析了基于Spark的媒体浏览日志ETL任务的构建过程。这个任务可以根据具体需求进行调整和扩展,为大规模数据处理任务提供了一种高效而灵活的解决方案。



Tags:PySpark   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
基于PySpark SQL的媒体浏览日志ETL作业
pyspark除了官方的文档,网上的教程资料一直很少,但基于调度平台下,使用pyspark编写代码非常高效,程序本身是提交到spark集群中,性能上也是毫无问题的,在本文中,我们将深入探讨基于S...【详细内容】
2023-11-27  Search: PySpark  点击:(198)  评论:(0)  加入收藏
手把手教你Python大数据分析:使用 PySpark 分析 Excel 文件
Apache Spark是一个用于大规模数据分析处理的引擎。它支持Java、Scala、Python和R语言。在数据分析人工智能领域 Python的使用已经远超其它语言。其中Spark还支持一组丰富的...【详细内容】
2020-06-28  Search: PySpark  点击:(712)  评论:(0)  加入收藏
▌简易百科推荐
Python 可视化:Plotly 库使用基础
当使用 Plotly 进行数据可视化时,我们可以通过以下示例展示多种绘图方法,每个示例都会有详细的注释和说明。1.创建折线图import plotly.graph_objects as go# 示例1: 创建简单...【详细内容】
2024-04-01  Python技术    Tags:Python   点击:(8)  评论:(0)  加入收藏
Python 办公神器:教你使用 Python 批量制作 PPT
介绍本文将介绍如何使用openpyxl和pptx库来批量制作PPT奖状。本文假设你已经安装了python和这两个库。本文的场景是:一名基层人员,要给一次比赛活动获奖的500名选手制作奖状,并...【详细内容】
2024-03-26  Python技术  微信公众号  Tags:Python   点击:(15)  评论:(0)  加入收藏
Python实现工厂模式、抽象工厂,单例模式
工厂模式是一种常见的设计模式,它可以帮助我们创建对象的过程更加灵活和可扩展。在Python中,我们可以使用函数和类来实现工厂模式。一、Python中实现工厂模式工厂模式是一种常...【详细内容】
2024-03-07  Python都知道  微信公众号  Tags:Python   点击:(31)  评论:(0)  加入收藏
不可不学的Python技巧:字典推导式使用全攻略
Python的字典推导式是一种优雅而强大的工具,用于创建字典(dict)。这种方法不仅代码更加简洁,而且执行效率高。无论你是Python新手还是有经验的开发者,掌握字典推导式都将是你技能...【详细内容】
2024-02-22  子午Python  微信公众号  Tags:Python技巧   点击:(32)  评论:(0)  加入收藏
如何进行Python代码的代码重构和优化?
Python是一种高级编程语言,它具有简洁、易于理解和易于维护的特点。然而,代码重构和优化对于保持代码质量和性能至关重要。什么是代码重构?代码重构是指在不改变代码外部行为的...【详细内容】
2024-02-22  编程技术汇    Tags:Python代码   点击:(32)  评论:(0)  加入收藏
Python开发者必备的八个PyCharm插件
在编写代码的过程中,括号几乎无处不在,以至于有时我们会拼命辨别哪个闭合括号与哪个开头的括号相匹配。这款插件能帮助解决这个众所周知的问题。前言在PyCharm中浏览插件列表...【详细内容】
2024-01-26  Python学研大本营  微信公众号  Tags:PyCharm插件   点击:(84)  评论:(0)  加入收藏
Python的Graphlib库,再也不用手敲图结构了
Python中的graphlib库是一个功能强大且易于使用的工具。graphlib提供了许多功能,可以帮助您创建、操作和分析图形对象。本文将介绍graphlib库的主要用法,并提供一些示例代码和...【详细内容】
2024-01-26  科学随想录  微信公众号  Tags:Graphlib库   点击:(86)  评论:(0)  加入收藏
Python分布式爬虫打造搜索引擎
简单分布式爬虫结构主从模式是指由一台主机作为控制节点负责所有运行网络爬虫的主机进行管理,爬虫只需要从控制节点那里接收任务,并把新生成任务提交给控制节点就可以了,在这个...【详细内容】
2024-01-25  大雷家吃饭    Tags:Python   点击:(58)  评论:(0)  加入收藏
使用Python进行数据分析,需要哪些步骤?
Python是一门动态的、面向对象的脚本语言,同时也是一门简约,通俗易懂的编程语言。Python入门简单,代码可读性强,一段好的Python代码,阅读起来像是在读一篇外语文章。Python这种特...【详细内容】
2024-01-15  程序员不二    Tags:Python   点击:(161)  评论:(0)  加入收藏
Python语言的特点及应用场景, 同其它语言对比优势
Python语言作为一种高级编程语言,具有许多独特的特点和优势,这使得它在众多编程语言中脱颖而出。在本文中,我们将探讨Python语言的特点、应用场景以及与其他语言的对比优势。一...【详细内容】
2024-01-09    今日头条  Tags:Python语言   点击:(251)  评论:(0)  加入收藏
站内最新
站内热门
站内头条