Spark3 针对SQL有哪些优化手段
相对于 Spark 2.3/2.4,Spark 3 提供了一些新特性,极大优化了超大数据量的查询。主要有三点:
- 引入 Adaptive Query Execution 自适应执行,解决了热点数据倾斜的问题
- 支持 Hint 提示,增加了SQL的可操控性
- 增加了一些 set 参数,增加SQL的可操控性
Adaptive Query Execution 默认配置
Spark 3 默认启用AQE,启用以后Spark会根据运行时的统计信息,动态优化查询计划。
set spark.sql.adaptive.enabled=true;
针对数据倾斜,Spark3 默认启用了 AQE,并预设置了参数:
set spark.sql.adaptive.skewJoin.enabled=true;
set spark.sql.adaptive.skewJoin.skewedPartitionFactor=5;
set spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB;
一个分区被判定为数据倾斜,必须满足两个条件:
- 分区数据的大小超过 skewedPartitionThresholdInBytes,默认256M
- 分区数据的大小超过 所有分区大小的中位数*skewedPartitionFactor。
假如一个stage shuffle以后有200个分区,分区大小中位数是 128M,skewedPartitionFactor=5,那么超过640MB的分区会被判定为数据倾斜。
对于倾斜的分区进行拆分时,每个分区的大小接近 advisoryPartitionSizeInBytes。
在TB数据量级的Hive表上查询时,如果Yarn集群比较大,skewedPartitionThresholdInBytes 和 advisoryPartitionSizeInBytes 默认值就有点小了,可以根据情况自己情况调大。但skewedPartitionThresholdInBytes 理论上应该比 advisoryPartitionSizeInBytes 大。
利用AEQ 合并分区
在 JOIN 或 Aggregate 以后,shuffle结果的分区可能会特别多,导致下一个Stage的任务数特别多,或者Hive表包含了太多的小文件。Spark默认启用了 coalescePartitions 功能用来优化这种情况:
set spark.sql.adaptive.coalescePartitions.enabled=true;
set spark.sql.adaptive.coalescePartitions.parallelismFirst=true;
set spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB;
这个参数有两种工作模式:
高并发模式:coalescePartitions.parallelismFirst=true 时,通过 coalescePartitions.minPartitionSize 判断一个 partition 是否需要合并;
低并发模式:coalescePartitions.parallelismFirst=false 时,通过前面提到的 advisoryPartitionSizeInBytes=64MB 判断一个partition 是否需要合并;
可以看到,Spark默认选择了高并发模式,带来了结果是 shuffle后任务 (Task) 过多,如果每个任务都处理极少的数据量,调度带来的性能损耗会大于计算带来的效率增益。
由于这个参数是全局生效的,所有的 shuffle 里都会生效。在实际使用中,可以调高 coalescePartitions.minPartitionSize,以取得比较好的平衡。或者使用 Spark hints 对特定的环节进行合并。
Spark Hints
Spark Dataset API 提供了很多方法来精细化控制每个环节的执行,比如 repartition、coalesce、broadcast 等,SQL 基础的语法框架并不支持这些。在不改变 SQL 基本语法的前提下,Spark 引入了 Hint (提示)。 我们可以通过类似于注释的语法,实现对特定环节的精细化控制。
场景一:控制 Join 模式
从性能角度 Broadcast Join > Shuffle Hash Join > SortMerge Join。与TB级的表 JOIN 时,我们尽量用空间换时间,选择Broadcast Join。通常情况下,spark.sql.autoBroadcastJoinThreshold 默认值是10MB,我们可以调大到512MB:
set spark.sql.autoBroadcastJoinThreshold=536870912;
10MB 可以容纳 131万个 BIGINT,但涉及到的字段比较多时,支持的数据条数会剧烈下降。增加这个阈值,上千万级别的大表都可以被 Broadcast,而TB级的表不需要做shuffle,这个空间损耗远远小于 TB级大表做Shuffle带来的时间和空间损耗。
在个别情况下,比如对表大小的统计不准确,autoBroadcastJoinThreshold 无法生效的情况,可以使用 spark hint 强制走 broadcast:
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
好的一点是:Broadcast hint 的优先级要高于autoBroadcastJoinThreshold参数。
对于JOIN 两侧表都比较大的情况,也可以通过 hint 触发 shuffle hash join:
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
场景二:Partitioning Hints 分区提示
分区提示支持四种模式:
- coalesce:合并分区,但不触发shuffle
- repartition:允许按照特定字段合并/拆分分区,触发shuffle
- repartition_by_range:和repartition类似
- rebalance:合并/拆分分区,但会保持输出分区大小尽量一致,倾斜的分区会被拆开
在写入 Hive表时,选择rebalance 模式能够比较好地保持输出文件大小的均衡。
SELECT /*+ COALESCE(100) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
其他常用的优化参数
场景一:Hive 表小文件读取优化
Spark 在读取大文件时,如果文件大小超过 hdfs block 大小,会默认拆分到多个partition里。如果是小文件,则会每个小文件对应一个Task。如果Task数量超过10w,Spark集群的性能会出现严重的衰减,绝大多数情况下是小文件问题:
对于小文件问题,如果上游能优化一下,比如通过 repartition hints 或 coalescePartitions 参数等,下游一般都不用考虑这个问题。如果上游解决不了,下游还可以通过 maxPartitionBytes 将多个文件合并到同一个分区下:
set spark.sql.files.maxPartitionBytes=134217728;
值得注意的是:这个参数只对 parquet、orc 和 json 格式的表生效,对 text 格式的hive表无效。
场景二:控制 shuffle 输出的分区数
set spark.sql.shuffle.partitions=200;
这是一个全局参数,如果hive表的数据量比较大,可以调大这个参数。否则可能会出现 executor oom 的情况。当然另外一种方式是 增大 executor memory:
set spark.executor.memory=16g;
本期的 SQL 优化就到这里,想要查询更多资料的话:
- Spark hints: Google 搜索 "Spark 3.2.1 hint"
- Spark性能优化: Google 搜索 "Performance Tuning - Spark 3.2.1 Documentation"
- 更多set 参数:参考 github apache/spark 下 sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala