云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

网站服务器_网站服务器试用_免费申请

小七 141 0

自适应查询执行:在运行时加速Spark SQL

这是Databricks Apache Spark工程团队(范文晨、赫尔曼·范·霍维尔和MaryAnn Xue)与英特尔工程团队(Ke Jia、Haifeng Chen和Carson Wang)的联合工程。请参阅AQE笔记本,演示下面介绍的解决方案多年来,为了生成高质量的查询执行计划,Spark SQL的查询优化器和规划器得到了广泛而持续的改进。最大的改进之一是基于成本的优化框架,它收集并利用各种数据统计(例如,行数、不同值的数量、空值、最大/最小值等),以帮助Spark选择更好的计划。这些基于成本的优化技术的例子包括选择正确的连接类型(广播散列连接与排序合并连接),在哈希连接中选择正确的构建端,或者在多路连接中调整连接顺序。然而,过时的统计数据和不完美的基数估计可能会导致次优的查询计划。AdaptiveQueryExecution是即将发布的ApacheSparktm 3.0中的新功能,在Databricks Runtime 7.0中提供,现在它希望通过根据查询执行过程中收集的运行时统计数据重新优化和调整查询计划来解决此类问题。自适应查询执行框架自适应查询执行最重要的问题之一是何时重新优化。Spark操作符通常是管道化的,并在并行进程中执行。然而,洗牌或广播交换破坏了这条管道。我们称它们为物化点,并使用术语"查询阶段"来表示查询中由这些物化点限定的子部分。每个查询阶段具体化其中间结果,并且只有在运行物化的所有并行进程都已完成时,下一阶段才能继续。这为重新优化提供了一个自然的机会,因为当所有分区上的数据统计信息可用且连续操作尚未开始时。当查询启动时,AdaptiveQueryExecutionFramework首先启动所有的叶阶段,这些阶段不依赖于任何其他阶段。一旦这些阶段中的一个或多个完成物化,框架就会在物理查询计划中标记它们的完成,并相应地更新逻辑查询计划,并使用从完成的阶段检索的运行时统计信息。基于这些新的统计信息,框架然后运行优化器(带有一个选定的逻辑优化规则列表)、物理规划器以及物理优化规则,其中包括常规物理规则和自适应执行特定规则,如合并分区、倾斜连接处理等自适应执行框架是一个新优化的查询计划,它将搜索和执行子阶段都已物化的新查询阶段,并重复上述执行-再优化执行过程,直到整个查询完成。在Spark 3.0中,AQE框架附带三个特性:动态合并无序分区动态切换连接策略动态优化倾斜连接下面几节将详细讨论这三个特性。动态合并无序分区在Spark中运行查询以处理非常大的数据时,shuffle通常对查询性能有非常重要的影响。Shuffle是一个昂贵的运营商,因为它需要在网络上移动数据,这样数据就可以按照下游运营商的要求进行重新分配。shuffle的一个关键属性是分区的数量。分区的最佳数量取决于数据,但数据大小可能因阶段而异,查询也不同,因此很难调整:如果分区太少,则每个分区的数据大小可能非常大,处理这些大分区的任务可能需要将数据溢出到磁盘(例如,当涉及排序或聚合时),从而降低查询速度。如果有太多的分区,那么每个分区的数据量可能非常小,并且会有很多小的网络数据取数来读取洗牌块,这也会因为I/O模式效率低下而减慢查询速度。拥有大量的任务也会给Spark任务调度器带来更多的负担。为了解决这个问题,我们可以在开始时设置相对较多的shuffle分区,然后在运行时通过查看shuffle文件统计信息将相邻的小分区合并成更大的分区。例如,假设我们正在运行查询selectmax(i)FROM tbl GROUP BY j。输入数据tbl非常小,因此在分组之前只有两个分区。初始的shuffle分区号被设置为5,因此在本地分组之后,部分分组的数据被随机分为5个分区。如果没有AQE,Spark将启动五个任务来做最后的聚合。但是,这里有三个非常小的分区,为每个分区分别启动一个任务将是浪费。相反,AQE将这三个小分区合并为一个,因此,最终的聚合现在只需要执行三个任务而不是五个任务。动态切换连接策略Spark支持许多连接策略,其中广播哈希连接通常是性能最好的,如果连接的一方可以很好地适应内存。因此,如果连接关系的估计大小小于广播大小阈值,Spark将计划广播散列连接。但是有很多事情会使这种大小估计出错-例如存在一个非常有选择性的过滤器-或者连接关系是一系列复杂的运算符,而不仅仅是一个扫描。为了解决这个问题,AQE现在根据最精确的连接关系大小在运行时重新规划连接策略。在下面的例子中可以看到,连接的右侧被发现比估计值小得多,并且小到可以被广播,所以在AQE重新优化之后,静态计划的排序合并连接现在被转换为广播散列连接。对于运行时转换的广播散列连接,我们可以进一步将常规的shuffle优化为本地化的shuffle(即按mapper读取而不是按reducer读取)以减少网络流量。动态优化倾斜连接当数据在集群中的分区之间分布不均匀时,就会发生数据倾斜。严重倾斜会显著降低查询性能,尤其是在连接时。AQE-skew-join优化从shuffle文件统计信息中自动检测这种倾斜。然后它将倾斜的分区分割成更小的子分区,这些子分区将分别从另一侧连接到相应的分区。让我们以表A连接表B为例,其中表A的分区A0比其他分区大得多。因此,倾斜连接优化将分区A0分成两个子分区,并将每个分区连接到表B的相应分区B0。如果没有这种优化,将有四个任务运行sort-merge-join,其中一个任务需要更长的时间。经过此优化后,将有五个任务运行联接,但每个任务所需的时间大致相同,因此总体性能更好。来自AQE的TPC-DS性能增益在我们使用TPC-DS数据和查询的实验中,自适应查询执行的查询性能提高了8倍,32个查询的速度提高了1.1倍以上。下面是AQE对10个TPC-DS查询性能改进最快的图表。这些改进大多来自于动态分区合并和动态连接策略切换,因为随机生成的TPC-DS数据没有倾斜。然而,我们已经看到,在利用了AQE的所有三个特性的情况下,生产工作量有了更大的改进。启用AQE可以通过设置SQL配置来启用AQEspark.sql.adaptive.enabled为true(在Spark 3.0中默认为false),并且在查询满足以下条件时应用:它不是流式查询它至少包含一个交换(通常是在有联接、聚合或窗口运算符时)或一个子查询通过减少查询优化对静态统计的依赖,AQE解决了基于Spark代价的优化的最大难题之一,即统计信息收集开销和估计精度之间的平衡。为了获得最佳的估计精度和规划结果,通常需要维护详细的、最新的统计数据,其中一些统计数据的收集成本很高,例如可以用来提高选择性和基数估计或检测数据偏差的列直方图。AQE基本上消除了对此类统计数据以及手动调优工作的需要。除此之外,AQE还使SQL查询优化对任意UDF和不可预测的数据集更改(例如,数据大小的突然增加或减少、频繁和随机的数据倾斜等)的存在更具弹性。不再需要提前"知道"您的数据。AQE将在查询运行时找出数据并改进查询计划,从而提高查询性能,以实现更快的分析和系统性能。在我们的预览网络研讨会上了解更多关于Spark 3.0的信息。作为Databricks Runtime 7.0的一部分,请立即在Databricks上免费试用Spark 3.0中的AQE。免费试用Databricks。今天就开始吧