云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

网站空间_郑州建设网站公司_免费6个月

小七 141 0

使用pythonapi对Delta-Lake表执行简单、可靠的upsert和delete操作

试试这个Jupyter笔记本我们很高兴地宣布deltalake0.4.0的发布,它引入了pythonapi来处理和管理Delta表中的数据。此版本的主要功能包括:用于DML和实用程序操作的PythonAPI(#89)–您现在可以使用PythonAPI更新/删除/合并Delta Lake表中的数据,并在这些表上运行实用程序操作(即真空、历史记录)。这些对于在Python中构建复杂的工作负载非常有用,例如,缓慢变化的维度(SCD)操作、合并用于复制的更改数据以及流式查询的上置。有关详细信息,请参阅文档。Convert to Delta(#78)–现在可以将拼花地板表就地转换为Delta Lake表,而无需重写任何数据。这对于转换非常大的拼花板表非常有用,而将其重写为Delta表将非常昂贵。此外,这个过程是可逆的——你可以把拼花桌转换成Delta Lake桌子,对其进行操作(例如,删除或合并),并轻松地将其转换回拼花桌。有关详细信息,请参阅文档。用于实用程序操作的SQL—现在可以使用SQL运行实用程序操作真空和历史。有关如何配置Spark以执行这些Delta特定SQL命令的更多详细信息,请参阅文档。有关更多信息,请参阅Delta Lake 0.4.0发行说明和Delta Lake文档>表删除、更新和合并。在这个博客中,我们将在apachespark上演示™ 2.4.3如何在航班准时性能场景下使用Delta Lake 0.4.0中的Python和新的Python API,我们将展示如何更新和删除数据,如何使用时间旅行查询旧版本的数据,以及如何清空旧版本进行清理。如何开始使用三角洲湖Delta-Lake包和--packages选项一样可用。在我们的例子中,我们还将演示在Apache Spark中清空文件和执行Delta Lake SQL命令的能力spark.databricks.delta.回复tensionDurationCheck.enabled=错误使我们能够抽真空文件的时间比默认的保留期7天短。注意,这仅对SQL命令vacuum是必需的。spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension在apachespark中启用deltalakesql命令;Python或scalaapi调用不需要这样做。#使用Spark软件包./bin/pyspark—包io.增量:delta-core_2.11:0.4.0--conf"spark.databricks.delta.回复tensionDurationCheck.enabled=错误"--配置"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"加载并保存我们的三角洲湖数据该场景将使用RITA BTS航班离港统计数据生成的准时航班性能或离港延迟数据集;该数据的一些实际应用示例包括通过d3.js Crossfilter获得的2014年航班离港性能和Apache Spark的GraphFrames的On-time flight performance™.这个数据集可以从这个github位置在本地下载。#位置变量tripdelaysFilePath="/root/data"/部门延迟.csv"pathToEventsTable="/root/deltalake"/出发延迟.delta"#读取航班延误数据发车延误=火花。阅读\.option("header","true")\.option("inferSchema","true")\.csv(tripdelaysFilePath)接下来,我们将departureDelays数据集保存到一个Delta-Lake表中,通过将这个表保存到Delta-Lake存储中,我们将能够利用它的特性,包括ACID事务、统一批处理和流式处理以及时间旅行。#将航班延误数据保存为Delta Lake格式发车延误\.写入\.格式("delta")\.mode("覆盖")\。保存("出发延迟.delta")注意,这种方法类似于通常保存拼花地板数据的方式;现在不再指定格式("Parquet"),而是指定格式("delta")。如果要查看底层文件系统,您会注意到为departureDelays Delta-Lake表创建的四个文件。/出发延迟.delta$ls-l美元..._三角洲测井第00000部分-df6f69ea-E6A-424b-bc0e-f3674c4f1906-c000。拼花地板零件号00001-711BCE3-fe9e-466e-a22c-8256f8b54930-c000。拼花地板零件号00002-778ba97d-89b8-4942-a495-5f6238830b68-c000。拼花地板零件号00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000。拼花地板注意,\u delta_log是包含delta Lake事务日志的文件夹。有关更多信息,请参阅Diving Into delta Lake:Unpacking the transaction log。现在,让我们重新加载数据,但这次我们的数据帧将由Delta Lake支持。#加载Delta Lake格式的航班延误数据延迟_delta=火花\.阅读\.格式("delta")\.负载("出发延迟.delta")#创建临时视图延误_delta.createOrReplaceTempView("延迟三角洲")#西雅图和旧金山之间有多少航班火花.sql("从delta中选择count(1),其中origin='SEA',destination='SFO')。show()最后,让我们确定从西雅图到旧金山的航班数量;在这个数据集中,有1698个航班。就地转换为三角洲湖如果你有现有的拼花地板表,你有能力执行就地转换你的表到三角洲湖,因此不需要重写你的表。要转换表,可以运行以下命令。从增量表进口*#转换路径"/path/to/table"处的非分区拼花地板表可删除=DeltaTable.convertToDelta(spark,"parquet.`/path/to/table`")#转换路径为"/path/to/table"并按名为"part"的整数列分区的拼花地板表分区数据表=DeltaTable.convertToDelta(spark,"parquet.`/path/to/table`",part int)有关更多信息,包括如何在Scala和SQL中进行转换,请参阅Convert to Delta Lake。删除我们的航班数据要从传统的data Lake表中删除数据,您需要:从表中选择不包括要删除的行的所有数据基于上一个查询创建新表删除原始表将新表重命名为下游依赖项的原始表名。我们可以通过运行DELETE语句来简化这个过程,而不是使用Delta-Lake执行所有这些步骤。从增量表进口*从pyspark.sql.functions进口*#访问Delta Lake表可删除=可删除的.forPath(火花,pathToEventsTable)#删除所有早班航班可删除。删除("延迟