云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

数据库服务器_下载百度云盘_试用

小七 141 0

使用AWS-DMS将事务数据迁移到Delta-Lake

在数据库里试试这个笔记本注意:我们还建议您使用Databricks Delta将高效的Upserts读入数据湖,这解释了使用MERGE命令执行高效的Upserts和delete。将数据从数据库移动到数据池的挑战大型企业正在将事务性数据从分散在异构位置的数据集市移动到集中的数据池中。业务数据越来越多地被整合到一个数据湖中,以消除筒仓、获得洞察力并构建人工智能数据产品。然而,从各种不断变化的事务性数据库中构建数据湖并使其保持最新状态是极其复杂的,并且可能是一个操作上的噩梦。使用特定于供应商的CDC工具或Apache SparkTM direct JDBC摄取的传统解决方案在以下典型的客户场景中不实用:(a) 数据源通常分布在prem服务器和云端,其中包括来自PostgreSQL、Oracle和MySQL数据库的数十个数据源和数千个表(b) 在数据湖中捕获的更改数据的业务SLA在15分钟内(c) 数据的所有权和数据库连接的网络拓扑结构各不相同。在上述场景中,使用Delta-lake和AWS数据库迁移服务(DMS)构建一个数据湖来迁移历史和实时事务数据被证明是一个很好的解决方案。这篇博客文章介绍了一个使用AWS数据库迁移服务(AWS-DMS)和Delta-Lake构建可靠数据湖的另一个简单过程,从多个RDBMS数据源获取数据。然后,您可以使用Databricks统一分析平台对实时和历史数据进行高级分析。什么是三角洲湖?DeltaLake是一个开源存储层,它为数据湖带来了可靠性。Delta Lake提供ACID事务、可扩展的元数据处理,并将流式处理和批处理数据处理统一起来。DeltaLake运行在现有的数据湖之上,与ApacheSparkAPI完全兼容。具体而言,三角洲湖提供:Spark上的ACID事务:可序列化的隔离级别确保读者永远不会看到不一致的数据。可伸缩的元数据处理:利用Spark的分布式处理能力处理PB级表的所有元数据,并轻松处理数十亿个文件。流和批处理统一:Delta Lake中的表既是批处理表,也是流源和汇。流数据接收、批处理历史回填、交互式查询都是开箱即用的。架构强制:自动处理架构变体,以防止在摄取期间插入错误记录。时间旅行:数据版本控制支持回滚、完整的历史审计跟踪和可重复的机器学习实验。在Databricks上使用托管的Delta-Lake进行upsert(也很快会出现在开源Delta-Lake):MERGE命令允许您高效地向上插入和删除数据湖中的记录。MERGE极大地简化了许多公共数据管道的构建方式;现在可以用简单的合并查询来代替所有那些低效重写整个分区的复杂多跳进程。这种更细粒度的更新功能简化了构建大数据管道以从AWS-DMS-changelogs捕获更改数据的方式。什么是AWS数据库迁移服务(DMS)?AWS-DMS可以将您的数据从最广泛使用的商业和开源数据库迁移到S3,以便迁移现有数据和不断变化的数据。该服务支持从不同的数据库平台进行迁移,例如从Oracle到Amazon Aurora或从Microsoft SQL Server迁移到MySQL。使用AWS数据库迁移服务,您可以连续地以高可用性复制数据,并通过将数据从任何受支持的源传输到amazons3来整合数据库。使用AWS数据库迁移服务将数据迁移到三角洲湖假设您在MySQL数据库上构建了一个"person"表,该表保存应用程序用户记录的数据,其中显示了列。每当人员移动、添加新人员和删除现有人员时,表都会更新。我们将使用AWS-DMS将这个表摄取到S3中,然后使用Delta-Lake加载它,以展示一个摄取和保持数据湖与事务性数据存储同步的示例。我们将在MySQL中演示对这个表的更改数据捕获,并使用AWS-DMS将更改复制到S3中,并轻松地合并到使用Delta-lake构建的数据湖中。建筑在这个解决方案中,我们将使用DMS将数据源引入amazons3,以便进行初始接收和连续更新。我们将来自S3的初始数据加载到Delta-Lake表中,然后使用Delta-Lake的upserts功能将更改捕获到Delta-Lake表中。我们将在Delta Lake表上运行与原始来源同步的分析,以获得业务洞察力。下图展示了建议的解决方案:在Delta Lake上获得数据后,您可以轻松地使用仪表板或BI工具生成智能报告,以获得见解。您还可以进一步使用这些数据来构建带有数据块的ML模型。解决方案详细信息在本文中,我们使用MySQL引擎创建RDS数据库,然后加载一些数据。在现实生活中,可能不止一个源数据库;本文描述的过程仍然类似。在Amazon上创建一个源数据库,并遵循以下步骤创建数据库。使用主教程页面中的链接查看如何连接到特定数据库和加载数据。有关更多信息,请参阅:创建运行MySQL数据库引擎的DB实例记下您创建的安全组,并将所有RDS实例与其关联。称之为"TestRDSSecurityGroup"。之后,您应该能够看到在RDS实例仪表板中列出的数据库。设置目标S3桶设置两个S3存储桶,如下所示,一个用于批量初始加载,另一个用于增量更改数据捕获。在下一步中,选择public Accessible for non-production usage以保持配置的简单性。另外,为了简单起见,请选择放置RDS实例的同一个VPC,并将TestRDSSecurityGroup包含在允许访问的安全组列表中。设置DMS您可以轻松地设置DMS,如AWS数据库迁移服务博客文章所示。您可以采取以下循序渐进的方法:创建复制实例。为在上一步中设置的源数据库和目标S3存储桶创建端点。创建一个任务,将每个源同步到目标。创建终结点在DMS控制台中,选择Endpoints,Create endpoint。需要配置代表MySQL RDS数据库的端点。您还需要通过提供前面步骤中创建的S3存储桶来创建目标端点。配置后,端点看起来类似于以下屏幕截图:创建两个任务并开始数据迁移您可以依赖于Amazon s表中的存储桶迁移到DMS表中在DMS控制台中,选择Tasks,Create Tasks。填写如下屏幕截图所示的字段:初始加载的迁移任务:CDC的迁移任务:注意,如果源是RDS MySQL,并且您选择迁移数据和复制正在进行的更改,则需要启用bin log retention。其他引擎有其他要求,DMS会相应地提示您。对于这种特殊情况,请运行以下命令:呼叫mysql.rds_set_配置("binlog保留小时数",24);两个任务都成功完成后,"任务"选项卡现在如下所示:确保数据迁移正常工作:检查初始数据是否加载到S3存储桶:示例行:2.对源数据库中的person表进行一些更改,并注意这些更改已迁移到S3在person(id、名字、姓氏、电子邮件、性别、出生日期、地址、城市、州)中插入值('1001','Arun','Pamulapati','cadhamsrs@umich.edu'、'女'、'1959-05-03'、'4604特拉华交界处'、'Gastonia'、'NC');UPDATE person set state='MD',其中id=1000;从id=998的人员中删除;更新person set state='CA',其中id=1000;更改日志:将初始迁移数据加载到三角洲湖我们将从初始加载文件创建Delta Lake表,您可以使用sparksql代码,将格式从parquet、csv、json等更改为Delta。对于所有文件类型,您将文件读入数据帧并以增量格式写入:人流量=spark.read.option("Header",True).option("InferSchema",True).csv("/mnt/%s/arun/person/"%InitialLoadMountName)personDF.write.format("delta")。保存("/delta/person")火花.sql("使用DELTA位置'/DELTA/person/'创建表person")将增量数据合并到三角洲湖我们将使用Delta merge into功能将更改日志捕获到Delta湖中。人员变更DF=(spark.read.csv("person/untsmofs,untsmofs/untsmofs=True/untsmount/schema=TrueignoreLeadingWhiteSpace=True,ignoreTrailingWhiteSpace=True)personChangesDF.registerEmptable("人员变更")合并到个人目标使用(选择操作,最新_更改.id,名字,姓氏,电子邮件,性别,出生日期,地址,城市,州,创建日期,上次更新发件人更改最新更改内部联接(选择id,max(上次更新)作为MaxDate从人的变化按id分组)cm最新_更改.id= 厘米id最新的_更改。上次更新= 厘米最大日期)作为来源打开源代码.id== 目标.id当匹配和来源.Op='D'然后删除匹配后更新集*如果不匹配,则插入*注:1) 您可以使用Databricks作业功能根据sla计划CDC合并,并在成功合并后将changelogs从cdcs3 bucket移动到归档bucket,以使合并负载保持在最新的较小范围内。Databricks平台中的作业是一种立即或按计划运行笔记本或JAR的方法。您可以创建和运行作业