云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

云存储_网站建设哪家便宜_年度促销

小七 141 0

三角洲湖泊合并作业模式演化及运作指标

试着用这个笔记本来重现下面概述的步骤我们最近发布了deltalake0.6.0,它在表历史中引入了模式演化和合并和操作度量的性能改进。此版本的主要功能包括:支持合并操作中的模式演化(#170)–现在可以使用合并操作自动演化表的模式。这在需要将变更数据升迁到一个表中并且数据的模式会随着时间而改变的情况下非常有用。merge可以同时演化模式并向上插入更改,而不是在升迁之前检测和应用模式更改。有关详细信息,请参阅文档。通过自动重新分区(#349)提高了合并性能—当合并到分区表中时,可以选择在写入表之前按分区列自动重新分区数据。在分区表上的合并操作由于生成太多小文件(#345)而速度较慢的情况下,将启用自动重新分区(spark.delta.merge.repartitionBeforeWrite)可以提高性能。有关详细信息,请参阅文档。在没有insert子句(#342)的情况下提高性能—如果合并操作没有任何insert子句,则可以获得更好的性能。descripe HISTORY(#312)中的操作度量–现在可以在表历史记录中查看增量表上所有写入、更新和删除的操作度量(例如,更改的文件数和行数)。有关详细信息,请参阅文档。支持从任何文件系统读取增量表(#347)–现在您可以在任何具有Hadoop文件系统实现的存储系统上读取增量表。但是,写入Delta表仍然需要配置一个LogStore实现,从而为存储系统提供必要的保证。有关详细信息,请参阅文档。合并操作中的模式演化正如在Delta Lake的早期版本中所指出的,Delta Lake包括在单个原子操作中执行合并操作以简化插入/更新/删除操作的能力,以及实施和改进模式的能力(更多详细信息也可在本次技术讲座中找到)。随着Delta Lake 0.6.0的发布,现在,您可以在合并操作中发展模式。让我们用一个及时的例子来展示这一点;您可以在本笔记本中找到原始代码示例。我们将从约翰·霍普金斯CSSE的2019年新型冠状病毒COVID-19(2019 nCoV)数据仓库的一小部分开始,我们已经在/databricks数据集中提供了该数据集。这是一个研究人员和分析师通常使用的数据集,以获得对世界各地的COVID-19病例。数据的一个问题是模式会随着时间而变化。例如,代表3月1日至3月21日(截至2020年4月30日)COVID-19病例的文件具有以下模式:#导入旧的数据旧数据=(spark.read.option("inferSchema",True).option("header",True)。。。.csv(/databricks datasets/COVID/../03-21-2020.csv))旧的_数据.printSchema()根|--省/州:字符串(可为null=true)|--国家/地区:字符串(可为null=true)|--上次更新:时间戳(可空=true)|--已确认:整数(可空=真)|--死亡:整数(可空=真)|--已恢复:整数(可为null=true)|--纬度:双精度(可空=真)|--经度:双精度(可空=真)但是从3月22日起(截止到4月30日)的文件有额外的列,包括FIPS、Admin2、Active和Combined\u Key。新数据=(spark.read.option("inferSchema",True).option("header",True)。。。.csv(/databricks datasets/COVID/../04-21-2020.csv))新的_数据.printSchema()根|--FIPS:整数(可为空=真)|--Admin2:字符串(null=true)|--省/市/自治区:字符串(可为null=true)|--国家/地区:字符串(可为null=true)|--上次更新:字符串(可为null=true)|--Lat:double(可空=真)|--Long_:double(可空=真)|--已确认:整数(可空=真)|--死亡:整数(可空=真)|--已恢复:整数(可为null=true)|--活动:整数(可空=真)|--组合键:字符串(可为null=true)在我们的示例代码中,我们重命名了一些列(例如Long_->Longitude、Province/State->Province_-State等),因为它们在语义上是相同的。如果关键问题只是将模式合并在一起,我们可以使用Delta Lake的模式演化特性,使用中的"mergeSchema"选项数据帧.写入(),如下面的语句所示。新的_data.write.option.选项("mergeSchema","true").mode("append").save(路径)但是,如果需要更新现有值并同时合并模式,会发生什么情况呢?在deltalake0.6.0中,这可以通过合并操作的模式演化来实现。为了直观地显示这一点,让我们从回顾一行的旧数据开始。旧的_数据.选择("处理日期","省/自治区","国家/地区","上次更新","确认")。show()+------------+--------------+--------------+-------------------+---------+|流程日期|省|州|国家|地区|最后一次更新|确认|+------------+--------------+--------------+-------------------+---------+|2020-03-21 |华盛顿|美国| 2020-03-21 22:43:04 | 1793|+------------+--------------+--------------+-------------------+---------+接下来,让我们模拟一个遵循新的数据模式的更新条目#模拟更新的条目项目=[(53,,'华盛顿','美国','2020-04-27T19:00:00',47.4009,-121.4905,1793,94,0,','',2020-03-21',2)]cols=['FIPS','Admin2','Province_State','Country_Region','Last_Update','Latitude','Longitude','Confirmed','deadised','Recovered','Active','Combined_Key','process_date','level']模拟的更新=spark.createDataFrame(项,列)union模拟了总共40行的"更新"和"新"数据。新的_数据.选择("处理日期","FIPS","省/自治区","国家/地区","上次更新","确认")。排序(col("FIPS"))。显示(5)+------------+-----+--------------+--------------+-------------------+---------+|流程| FIPS |省|州|国家|地区|最后一次更新|确认|+------------+-----+--------------+--------------+-------------------+---------+|2020-03-21 | 53 |华盛顿|美国| 2020-04-27T19:00:00 | 1793||2020-04-11 | 53001 |华盛顿|美国| 2020-04-11 22:45:33 | 30||2020-04-11 | 53003 |华盛顿|美国| 2020-04-11 22:45:33 | 4||2020-04-11 | 53005 |华盛顿|美国| 2020-04-11 22:45:33 | 244||2020-04-11 | 53007 |华盛顿|美国| 2020-04-11 22:45:33 | 53|+------------+-----+--------------+--------------+-------------------+---------+我们设置以下参数以配置环境以进行自动模式演化:#启用自动模式演化火花.sql("设置spark.databricks.delta.schema.autoMerge.enabled=正确")现在我们可以运行一个原子操作来更新值(从2020年3月21日开始),并用下面的语句合并新模式。从增量表进口*可删除=可删除的.forPath(火花,三角形路径)#使用合并操作的模式演化deltaTable.alias("t")。合并(新的_数据.别名("s"),"s.process_date=t.process_date AND s.province_state=t.province_state AND s.country_region=t.country_region AND s.level=t.level")。当MatchedUpdateAll时().不匹配时插入()。执行()让我们用以下语句回顾一下Delta Lake表:#加载数据spark.read.format("delta")。加载(delta_路径).select("处理日期","FIPS","省/自治区","国家/地区","上次更新","确认","管理员2").sort(col("FIPS"))。显示()+------------+-----+--------------+--------------+-------------------+---------+|流程| FIPS |省|州|国家|地区|上次更新|确认|管理员|+------------+-----+--------------+--------------+-------------------+---------+-----+|2020-03-21 | 53 |华盛顿|美国| 2020-04-27T19:00:00 | 1793 |||2020-04-11 | 53001 |华盛顿|美国| 2020-04-11 22:45:33 | 30 |亚当斯||2020-04-11 | 53003 |华盛顿|美国| 2020-04-11 22:45:33 | 4 |阿索丁||2020-04-11 | 53005 |华盛顿|美国| 2020-04-11 22:45:33 | 244 |本顿||2020-04-11 | 53007 |华盛顿|美国| 2020-04-11 22:45:33 | 53 |切兰|+------------+-----+--------------+--------------+-------------------+---------+-----+运营指标通过运行以下语句,您可以通过查看Spark UI中的Delta Lake表历史记录(operationMetrics列),进一步深入了解操作指标:历史记录()。显示()下面是前面命令的简短输出。+-------+------+---------+--------------------+|版本|用户ID |操作|操作度量|+-------+------+---------+--------------------+|1 | 100802 |合并|[numTargetRowsCop||0 | 100802 |写入|[numFiles->1,n|+-------+------+---------+--------------------+您将注意到表的两个版本,一个用于旧架构,另一个版本用于新架构。{"numTargetRowsCopied":"0","numTargetRowsDeleted":"0"{"numTargetRowsCopied":"0","numTargetRowsDeleted":"0","numTargetFilesAdded":"3","numTargetRowsInserted":"39","numTargetRowSupdate":"1","numutputrows":"40","numSourceRows":"40","numTargetFilesRemoved":"1"}"numTargetFilesAdded":"3","numTargetRowsInserted":"39","numTargetRowSupdate":"1","numutputrows":"40","numSourceRows":"40","numTargetFilesRemoved":"1"}通过转到Spark UI中的SQL选项卡,您可以了解有关这些操作度量背后的详细信息。动画GIF调用Spark UI的主要组件供您查看。一个文件的39个初始行(fo