云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

分布式数据库_分布式存储设备_企业级

小七 141 0

从Presto和Athena查询Delta-Lake表,提高操作并发性和合并性能

我们很高兴地宣布DeltaLake0.5.0的发布,它引入了Presto/Athena支持和改进的并发性。此版本的主要功能包括:支持使用清单文件的其他处理引擎(#76)–现在可以使用清单文件从Presto和Amazon Athena查询Delta表,可以使用Scala、Java、Python和sqlapi生成这些表。有关详细信息,请参阅Presto和Athena到Delta Lake的集成文档改进了所有Delta Lake操作的并发性(#9,#72,#228)–您现在可以同时运行更多的Delta Lake操作。Delta-Lake的乐观并发控制通过使冲突检测更细粒度而得到改进。这使得在Delta表上运行复杂的工作流变得更加容易。例如:在旧分区上同时运行删除(例如为了符合GDPR),同时追加新的分区。在不相交的分区集上并发运行更新和合并。同时运行文件压缩和追加(见下文)。有关更多信息,请参考开源deltalake0.5.0发行说明。在这篇博文中,我们将详细介绍使用Presto读取Delta-Lake表、改进的操作并发性、使用仅插入合并(insert-only merge)更容易、更快地消除重复数据。用Presto读取三角洲湖表正如使用Python api在Delta-Lake表上简单可靠的Upserts和Deletes中所述,对数据的修改(如删除)是通过有选择地编写包含要删除的数据的文件的新版本来执行的,并且只将以前的文件标记为已删除。这种方法的优点是Delta-Lake使我们能够回到时间(即时间旅行)和查询以前的版本。为了了解哪些文件(和行)包含最新的数据,默认情况下可以查询事务日志(更多信息请参阅Diving Into Delta Lake:Unpacking the transaction log)。其他系统,比如Presto和Athena可以读取生成的清单文件(manifest file)——一个文本文件,其中包含查询表时要读取的数据文件列表。为此,我们将遵循Python指令;有关更多信息,请参阅设置Presto或Athena到Delta-Lake的集成和查询Delta表。生成Delta Lake清单文件让我们用下面的代码片段创建Delta Lake清单文件。可删除=可删除的.forPath(路径可伸缩)deltaTable.生成("符号链接格式"清单)顾名思义,这将在表根文件夹中生成清单文件。如果您已经使用Python api在Delta Lake表上创建了departuredelayes表,那么在表根文件夹中会有一个新文件夹:$/departureDelays.delta/\u symlink_格式\u清单包含一个名为manifest的文件。如果您查看清单中的文件(例如cat清单),您将得到以下输出,指示包含最新快照的文件。文件:$/出发延迟。增量/部分-00003-…-c000。拼花地板文件:$/出发延迟。增量/部分-00006-…-c000。拼花地板文件:$/出发延迟。增量/部分-00001-…-c000。拼花地板文件:$/出发延迟。增量/部分-00000-…-c000。拼花地板文件:$/出发延迟。增量/部分-00000-…-c000。拼花地板文件:$/出发延迟。增量/部分-00001-…-c000。拼花地板文件:$/出发延迟。增量/部分-00002-…-c000。拼花地板文件:$/出发延迟。增量/部分-00007-…-c000。拼花地板创建Presto表以读取生成的清单文件下一步是在配置单元Metastore中创建一个外部表,以便Presto(或带有Glue的Athena)可以读取生成的清单文件,以确定读取哪些Parquet文件以读取Delta表的最新快照。注意,对于Presto,您可以使用apachespark或hivecli来运行以下命令。k。1创建外部表departureDelaysExternal(…)2行格式SERDE'org.apache.hadoop.配置单元.ql.io.拼花地板'三。存储为INPUTFORMAT4输出格式'org.apache.hadoop.配置单元.ql.io.HiveIgnoreKeyTextOutputFormat'5位置'$/departureDelays.delta/\u symlink_格式\u清单'关于模式实施的一些重要注意事项:第1行定义的模式必须与Delta-Lake表的模式相匹配(例如,在本例中,departureDelaysExternal)。注意,分区方案是可选的。第5行指向清单文件的位置,格式为/\u symlink_format_manifest/SymlinkTextInputFormat将Presto(或Athena)配置为从清单文件中获取拼花板数据文件的列表,而不是使用目录列表。注意,对于分区表,每个Configure Presto需要执行额外的步骤来读取生成的清单。更新清单文件需要注意的是,每次更新数据时,都需要重新生成清单文件,以便Presto能够看到最新的数据。改进的操作并发性通过下面的拉请求,您现在可以同时运行更多的Delta-Lake操作。通过更精细的粒度冲突检测,这些更新使在Delta表上运行复杂的工作流变得更加容易,例如:在旧分区上同时运行删除(例如为了符合GDPR),同时追加新的分区。同时运行压缩文件的附件。在不相交的分区集上并发运行更新和合并。并发附件用例例如,当并发事务将记录添加到同一分区时,通常会在并发合并操作期间引发ConcurrentAppendException。//目标"deltaTable"按日期和国家/地区划分可删除的("t")。合并(来源.as("s"),"s.user_id=t.user_id AND s.date=t.date AND s.country=t.country").whenMatched().updateAll().whenNotMatched().insertAll()。执行()上面的代码片段可能会导致冲突,因为条件不够明确,即使表已经按日期和国家/地区进行了分区。问题是查询当前将扫描整个表,这可能导致与更新任何其他分区的并发操作发生冲突。通过指定specificDate和specificCountry以便可以在特定日期或国家/地区合并,现在可以安全地在不同的日期和国家/地区同时运行此操作。//目标"deltaTable"按日期和国家/地区划分可删除的("t")。合并(来源.as("s"),"s.user_id=t.user_id AND d.date='"+specificDate+"'和d.country='"+specificCountry+"'").whenMatched().updateAll().whenNotMatched().insertAll()。执行()此方法对于所有其他三角洲湖操作(例如删除、元数据更改等)都是相同的。并发文件压缩如果您一直在向Delta表中写入数据,随着时间的推移,会累积大量的文件。这在流式处理场景中尤其重要,因为您是在小批量中添加数据。这会导致文件系统继续累积许多小文件;这将随着时间的推移而降低查询性能。一个重要的优化任务是周期性地获取大量的小文件并将其重写为较小数量的较大文件,即文件压缩。过去,并发查询数据和运行文件压缩时发生异常的可能性更大。但是,由于这些改进,您还可以同时运行查询(包括流式查询)和文件压缩,没有任何异常。例如,如果您的表已分区,而您只想根据谓词重新分区一个分区,则可以使用where只读取分区,并使用replaceWhere写回该分区:路径="…"partition="年份='2019'"numFilesPerPartition=16#压缩表分区到文件数(火花。阅读.格式("delta").load(路径).where(分区).重新分区(numFilesPerPartition).写入.option("dataChange","false").格式("delta").mode("覆盖").option("replaceWhere",分区).save(路径)注意,只有在没有数据更改时(例如在前面的代码段中),才使用dataChange==false选项,否则可能会损坏基础数据。使用"仅插入合并"更方便、更快地消除重复数据一个常见的ETL用例是收集日志并将它们附加到Delta-Lake表中。一个常见的问题是源代码生成重复的日志记录。使用Delta-Lake merge,可以避免插入这些重复的记录,例如下面涉及合并更新的飞行数据的代码片段。#合并合并表与航班deltaTable.alias("航班")\.merge(合并_表.别名("更新"),"航班.日期= 更新.日期") \.whenMatchedUpdate(set={"延迟":更新.延迟" } ) \。当不匹配时插入()\。执行()在Delta Lake 0.5.0之前,无法将重复数据作为数据流从Delta Lake表中读取,因为仅插入合并不是表中的纯附件。例如,在流式查询中,可以在foreachBatch中运行merge操作,以连续地将任何流式数据写入Delta-Lake表中,并删除重复数据,如下面的PySpark片段所述。从增量表进口*可删除=可删除的.forPath(spark,"/data/aggregates")#函数使用merge将microBatchOutputDF追加到Delta表中def upsertToDelta(microBatchOutputDF,batchId):deltaTable.alias("t")。合并(microBatchOutputDF.alias("s"),"s.key=t.key")\.whenMatchedUpdateAll()\。当不匹配时插入()\。执行()}#将流聚合查询的输出写入Delta表s流式聚合df.writeStream\.格式("delta")\.foreachBatch(高于delta)\.outputMode("更新")\。开始()在另一个流式查询中,您可以连续从Delta-Lake表中读取经过重复数据消除的数据。这是可能的,因为Delta Lake 0.5.0中引入的只插入合并只会将新数据追加到Delta表中。Delta Lake 0.5.0入门今天在apachespark2.4.3(或更新版本)实例上尝试前面的代码片段,试试deltalake。通过使用Delta Lake,您可以使您的数据湖更r