> Source: Pixabay
Apache Spark是一种开放源代码的分布式计算引擎,目前是用于内存中批处理驱动的数据处理的最受欢迎的框架(它还支持实时数据流传输)。 得益于其先进的查询优化器,DAG调度程序和执行引擎,Spark能够非常高效地处理和分析大型数据集。 但是,在没有仔细调整的情况下运行Spark作业仍会导致性能下降。
在博客文章中,我将分享一些Spark性能调优的技巧,以帮助您解决和加快运行缓慢的Spark作业。
(本文提到的所有功能均来自PySpark,您可以使用Spark API文档找到与Scala / JAVA等效的功能。)
当数据集最初由Spark加载并成为弹性分布式数据集(RDD)时,所有数据均匀地分布在分区之间。 但是,在用户对其应用某些类型的数据操作之后,这些分区可能会变得不均匀。 例如,groupByKey操作可能导致分区偏斜,因为一个键可能比另一个键包含更多的记录。 此外,由于Spark的DataFrameWriter允许使用partitionBy将分区数据写入磁盘,因此磁盘上的分区也可能不均匀。
在DataFrame中重新平衡偏斜的分区将极大地提高Spark在DataFrame上的处理性能。 您可以使用getNumPartitions函数检查DataFrame中的分区数,并通过运行简单的Spark作业来查找每个分区中的记录数,例如:
from pyspark.sql.functions import spark_partition_id
df.withColumn("partition_id", spark_partition_id())
.groupBy("partition_id")
.count()
.show()
如果发现DataFrame的分区大小高度不均匀,请在对它进行任何分析之前,使用重新分区或合并函数对DataFrame进行重新分区。 还建议在将数据写回磁盘之前,先对内存中的数据进行分区。 RDD模块也支持这些重新分区功能。
由于惰性执行原理,除非用户明确调用操作来收集结果,否则Spark不会对数据集执行任何实际的转换。 此外,如果用户希望对中间结果应用其他转换,Spark将需要从头开始重新计算所有内容。 为了允许用户更有效地重用日期,Spark可以使用持久性或缓存功能将数据缓存在内存和/或磁盘中。
但是,缓存并不总是一个好主意。 Spark缓存数据集后,Catalyst优化器优化进一步转换的能力将受到限制,因为它不再能够改善源数据级别的修剪。 例如,如果将过滤器应用于在源数据库中建立索引的列,则Catalyst将无法利用索引来提高性能。
因此,仅当缓存数据将在以后多次重用时才建议使用缓存数据。 迭代探索数据集或调整ML模型时。
> Source: Pixabay
基于成本的优化器(CBO)通过向Catalyst提供其他表级统计信息,可以加快Spark SQL作业的速度,这对于连接许多数据集的作业特别有用。 使用者可以通过将spark.sql.cbo.enabled设置为true(默认值)来启用CBO。
为了充分利用CBO,用户需要保持列级和表级统计信息都是最新的,从而使CBO可以使用准确的估算来优化查询计划。 为此,在对表运行SQL查询之前,请使用ANALYZE TABLE命令收集统计信息。 记住在修改表之后再次分析表,以确保统计信息是最新的。
除了启用CBO,在Spark中优化连接数据集的另一种方法是使用广播联接。 在无序连接中,两个表中的记录都将通过网络传输给执行器,当一个表比另一个大得多时,这是次优的。 在广播联接中,较小的表将被发送给执行程序,以与较大的表联接,从而避免了通过网络发送大量数据的情况。
用户可以通过spark.sql.autoBroadcastJoinThreshold配置控制广播联接,指示要广播的表的最大大小。 此外,即使表的大小大于spark.sql.autoBroadcastJoinThreshold,也可以使用广播提示来告诉Spark广播表:
from pyspark.sql.functions import broadcast
broadcast(spark.table("tbl_a")).join(spark.table("tbl_b"), "key")
由于所有Spark作业都占用大量内存,因此确保有效进行垃圾收集非常重要-我们希望产生较少的内存"垃圾"以减少GC时间。 要了解您的Spark作业是否在GC中花费过多时间,请在Spark UI中检查"任务反序列化时间"和" GC时间"。
例如,由于Spark需要反序列化更多对象,因此使用用户定义函数(UDF)和lambda函数将导致更长的GC时间。 还建议避免创建中间对象并将不必要的RDD缓存到JVM堆。
TL; DR:
· 使用重新分区或合并来重新平衡不均匀的分区。
· 仅当数据将被多次重用时才保留数据。
· 使用ANALYZE TABLE命令可以维护CBO的最新统计信息。
· 为小表启用广播连接以加快连接速度。
· 通过使用较少的UDF并避免缓存大对象来优化GC。
(本文翻译自Xinran Waibel的文章《Apache Spark Optimization Toolkit》,参考:https://towardsdatascience.com/apache-spark-optimization-toolkit-17cf3e491992)