云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

数据库_自己搭建云存储_测评

小七 141 0

用apachespark优化用户定义函数™ 现实世界中的R:明尼苏达双胞胎的比例音高分析第2部分

介绍在第一部分中,我们讨论了明尼苏达双城棒球队如何在1500万次历史投球中进行高达2万次的模拟——总共3000亿次模拟——来更准确地评估球员的表现,每个球员分配的3000亿次模拟投球将使这一形象更加鲜明,并提供更可靠的估值。这些数据将影响教练和人事决策,以期为俱乐部创造更多的胜利,进而为俱乐部带来收入。所有用于生成和评分数据的脚本和机器学习模型都是用R编写的。即使在R中使用多线程包运行这些脚本,他们估计处理所有模拟也需要3.8年的时间。使用Apache Spark和Databricks中的用户定义函数(UDF),我们能够减少数据处理对历史数据集进行3000亿次模拟需要2-3天的时间,游戏内数据的模拟时间接近实时。通过对比赛中的投球进行近乎实时的得分,Twins希望最终根据比赛条件优化阵容和策略决策,例如,根据击球手、天气、局和投手最后一次投球的速度+旋转读数选择最佳投手和投球。通过将R包的巨大生态系统与Spark的可伸缩性和并行性相结合,udf不仅在体育领域,而且在跨行业的用例中都非常强大使用prophet等时间序列包生成数千种消费品的销售预测模拟数百种金融投资组合的表现车队运输计划的模拟通过并行搜索数千个超参数来寻找最佳模型尽管这些应用程序既令人兴奋又诱人,但它们的威力是有代价的。问问任何尝试过的人,他们都会告诉你,实现一个可以优雅地扩展的UDF是相当具有挑战性的。这是因为需要有效地管理集群上的核心和内存,以及它们之间的紧张关系,关键是要以Spark能够线性扩展的方式来构建作业。在这篇文章中,我们将开始一段旅程,了解如何制作出具有规模的udf,成功取决于对存储、Spark、R以及它们之间的交互作用的理解。使用Spark和R了解udf一般来说,当你使用SparkR或sparklyr时,你的R代码会被翻译成Scala,Spark的母语。在这种情况下,R进程仅限于Spark集群的驱动程序节点,而集群的其余部分则在Scala中完成任务。但是,用户定义的函数提供了对每个worker上的R进程的访问,从而允许你在返回结果之前,将R函数并行地应用于Spark数据帧的每个组或分区。Spark是如何协调这一切的?您可以在下图中清楚地看到控制流。作为每个任务的一部分,Spark将在每个worker上创建一个临时R会话,序列化R闭包,然后在集群中分发UDF。当每个worker上的R会话处于活动状态时,可以在UDF内部利用R生态系统的全部功能。当R代码完成执行时,会话将终止,并将结果发回你可以通过观看这里和这篇博文中的谈话来了解更多。正确使用UDF"……如果你一开始就避免做这项工作,你将是最快的。"[1]既然我们已经了解了UDF的执行原理,那么让我们更深入地了解系统中的潜在瓶颈以及如何消除这些瓶颈。在编写这些函数时,基本上需要了解四个关键领域:数据源Spark中的数据传输Spark和R之间的数据传输R工艺1数据源:最小化存储I/O第一步是计划如何在存储中组织数据,许多R用户可能习惯于使用平面文件,但关键原则是只摄取UDF正确执行所需的内容。你的工作很大一部分是进出存储的I/O操作,如果你的数据当前是未优化的文件格式(如CSV),Spark可能需要将整个数据集读入内存。这可能会非常缓慢和低效,尤其是当你不需要该文件的所有内容时。因此,我们建议将数据保存为可扩展格式,如Delta Lake。Delta通过在存储中对数据进行分区、优化这些分区的大小以及使用Z顺序创建辅助索引来加快对Spark的接收。这些功能加在一起,有助于限制在UDF中需要访问的数据量。怎么会这样?想象一下,我们在存储中按投球类型分区棒球数据,并引导Spark读取投球类型等于"curveball"的行。使用Delta,我们可以跳过对其他投球类型的所有行的摄取。这种数据扫描的减少可以极大地加快读取速度–如果只有10%的数据包含curveball投球,那么可以有效地跳过90%的数据集读取到内存中!通过使用类似Delta Link的存储层和对应于UDF处理什么数据的分区策略,您将奠定了坚实的基础并消除了潜在的瓶颈。2a.Spark中的数据传输:优化内存中的分区大小内存中分区的大小会影响功能工程/ETL管道的性能,从而导致并包括UDF本身。一般来说,每当Spark必须执行诸如join或groupby之类的广泛转换时,数据必须在群集。Theshuffle分区数的默认设置是任意设置为200,这意味着在执行shuffle操作时,Spark DataFrame中的数据分布在200个分区中。根据数据的大小,这可能会导致效率低下。如果数据集较小,则200个分区可能会过度并行化工作,从而导致不必要的调度开销和其中数据很少的任务。如果数据集很大,则可能是并行化不足,无法有效地使用群集中的资源。用大小为128MB的缓冲区来划分内存大小,以保证内存大小最大化为您的作业设置最佳分区数spark.sql.shuffle.SparkR中的分区配置如下:sparkR.会议(sparkConfig=列表(spark.sql.shuffle.partitions="400"))主动重新分区Spark数据帧也会受到此设置的影响,因为它需要一个随机操作。我们将看到,此行为可用于管理系统其他部分的内存压力,如垃圾收集和Spark与R之间的数据传输。2b.Spark中的数据传输:垃圾收集和集群大小调整当您遇到大数据问题时,您可能会倾向于采用暴力手段,并寻求最大的工人类型,但解决方案可能并不那么简单。当内存中有不再使用的大型对象时,Java虚拟机(JVM)中的垃圾收集往往会失控。使用非常大的工作线程可能会加剧此问题,因为首先有更大的空间来创建大对象。管理对象的大小因此,内存是解决方案体系结构的一个关键考虑因素。对于这个特定的工作,我们发现一些大工人或许多小工人的表现不如许多中型工人,大工人会产生过多的垃圾收集,导致作业无限期挂起,而小工人则会耗尽内存。为了解决这个问题,我们逐渐增加了工作线程的大小,直到我们到达了JVM垃圾收集可以正常处理的RAM和CPU的中间范围。我们还将输入Spark数据帧重新分区到我们的UDF中,并增加了它的分区。这两种措施都有效地管理了JVM中的对象大小,并有助于保持对于每个Spark执行器,垃圾收集不到总任务时间的10%。如果我们想获得更多的记录,我们可以简单地在集群中添加更多的中型工作者,并以线性方式增加输入数据帧的分区。三。Spark和R之间的数据传输下一步要考虑的是如何在Spark和R之间传递数据。这里我们确定了两个潜在的瓶颈:总体I/O和进程之间发生的相应(反)序列化。首先,只输入UDF正确执行所需的内容。与我们如何从存储中优化I/O读取类似,过滤输入Spark DataFrame以仅包含UDF所需的那些列。如果我们的Spark DataFrame有30列,而UDF只需要其中的4列,则相应地将数据子集并将其用作输入,这将通过减少I/O和相关(反)序列化来加快执行速度。如果您对输入数据进行了适当的子集划分,但仍然存在内存不足的问题,那么重新分区可以帮助控制在Spark和R之间传输的数据量。例如,如果使用SparkR的`repartition()`函数将每个任务的分区数增加到200个,那么每个任务中都会有1GB的数据被发送到R,而更多分区的折衷是JVM和R之间更多的(反)序列化任务,但是每个任务中的数据和后续内存压力较小。您可能认为一个典型的14gbramspark worker可以处理2GB的数据分区,但实际上,如果您想避免内存不足的错误,您至少需要30GB的RAM!对于许多试图在Spark和R中开始使用udf的开发人员来说,这可能是一个严重的觉醒