云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

百度云_服务器硬件检测_哪家好

小七 141 0

基于Spark流和Delta-Lake的流数据质量监测

试着用这个笔记本来重现下面概述的步骤在一切都在加速的时代,流数据不再是一个离群值,而是成为一种常态。我们经常听不到客户问:"我可以流式传输这些数据吗?"甚至"我能以多快的速度传输这些数据?"卡夫卡和德尔塔湖等技术的普及也突显了这一势头。在这个流频谱的一端是我们认为的"传统"流式工作负载—以高速到达的数据,通常是半结构化或非结构化格式(如JSON),并且通常是小负载。这种类型的工作负载跨越垂直行业;一个这样的客户示例是一个主要的证券交易所和数据提供商,负责每分钟传输几十万个事件—股票交易记录、新闻、报价和其他财务数据。该客户使用Databricks、Delta和结构化流媒体以高可用性实时处理和分析这些流。然而,随着规律性的提高,我们看到另一端的客户使用流媒体进行低频"批处理"处理。在这种体系结构中,流作为一种监视特定目录、S3存储桶或其他着陆区域的方式,并在数据落地后立即自动处理数据—这样的体系结构消除了传统调度的大部分负担,特别是在作业失败或部分处理的情况下。所有这些都是为了说明:流媒体不再仅仅用于计算边缘的实时或接近实时的数据。虽然流媒体在主流媒体中的出现是一个积极的方面,但是这种架构也带来了一些包袱。特别是,历史上存在一种权衡:高质量的数据,还是高速的数据?事实上,这不是一个有效的问题;质量必须与速度相结合,所有实际手段都必须如此——要实现高速度,我们需要高质量的数据。毕竟,高速度下的低质量将需要再加工,通常是成批的;另一方面,低速高质量不能满足许多现代问题的需要。随着越来越多的公司采用流媒体作为其处理架构的关键,速度和质量都必须提高。在这篇博文中,我们将深入研究一种数据管理架构,该架构可用于在数据到达时主动监控和分析数据流中的损坏或不良数据,而不会造成瓶颈。构建流数据分析和监视过程在Databricks,我们看到许多模式出现在我们的客户身上,因为他们把可能的东西推到了极限,速度/质量问题也没有什么不同。为了帮助解决这个矛盾,我们开始考虑正确的工具,不仅提供所需的数据速度,而且还提供可接受的数据质量水平。结构化流媒体和Delta-Lake自然适合于摄取层和存储层,因为它们共同创建了一个可伸缩、容错和近乎实时的系统,并保证了一次交付。为企业数据质量分析找到一个可接受的工具有些困难。特别是,此工具需要能够执行数据质量度量的有状态聚合;否则,在整个数据集上执行检查(例如"具有非空值的记录的百分比")将随着接收数据量的增加而增加计算成本。对于任何流媒体系统来说,这都是一个不起眼的问题,它可以省去很多工具。在最初的解决方案中,我们选择了来自Amazon的数据质量工具Deequ,因为它提供了一个简单而强大的API、有状态地聚合数据质量度量的能力以及对Scala的支持。未来,其他Spark原生工具,如即将推出的Delta Expections和pipelines,将提供替代方案。实现流数据质量监控我们通过在一个EC2实例上运行一个小型Kafka生产者来模拟数据流,该实例将模拟的交易股票信息输入主题,并使用本机Databricks连接器将这些数据引入Delta-Lake表中。为了展示Spark Streaming中数据质量检查的功能,我们选择在整个管道中使用Deequ的不同功能:根据历史摄取数据生成约束建议使用foreachBatch对到达的数据运行增量质量分析使用foreachBatch对到达的数据运行一个(小)单元测试,并将坏批隔离到bad records表中将每个到达批次的最新度量状态写入delta表中对整个数据集执行定期(较大)单元测试,并在MLFlow中跟踪结果根据验证结果发送通知(即通过电子邮件或Slack)在MLFlow中捕获度量以进行可视化和日志记录我们合并了MLFlow来跟踪数据性能指标随时间和Delta表版本的质量,以及用于通知和警报的Slack连接器。下图显示了这个管道。由于Spark中的统一批处理/流式处理接口,我们能够在该管道中的任何点(作为实时更新或批处理快照)获取报告、警报和指标。这对于设置触发器或限制特别有用,这样,如果某个度量超过某个阈值,就可以执行数据质量改进操作。另外需要注意的是,我们不会影响原始数据的初始登陆;这些数据会立即提交到Delta表中,这意味着我们不会限制我们的摄取率。下游系统可以直接从该表中读取数据,如果超过上述任何一个触发器或质量阈值,则可能会中断;或者,我们可以轻松地创建一个排除不良记录的视图,以提供一个干净的表。在较高的层次上,执行数据质量跟踪和验证的代码如下所示:spark.readStream.table("trades_delta").writeStream公司.foreachBatch{(batchDF:DataFrame,batchId:Long)=>//将当前状态重新分配到上一个下一个状态val stateStoreCurr=状态存储下一个//对当前批处理运行分析,使用已保存状态聚合val度量结果=AnalysisRunner.run(数据=batchDF,…)//验证我们当前的微观察的有效性val verificationResult=验证套件().onData(批处理df).addCheck(…).run()//如果验证失败,则将批写入不良记录表如果(验证结果.状态!= 检查状态。成功) {...}//将当前结果写入度量表公制_结果.写入.格式("delta").mode("覆盖").saveAsTable("deequ_指标")}。开始()使用数据质量工具Deequ在Databricks中使用Deequ是相对自然的—首先定义一个分析器,然后在数据帧上运行该分析器。例如,我们可以跟踪Deequ本机提供的几个相关指标,包括检查数量和价格是否非负数、原始IP地址是否为空以及所有事务中符号字段的清晰度。Deequ的StateProvider对象在流设置中特别有用;这些对象允许用户在内存或磁盘上保存度量的状态,并在以后聚合这些度量。这意味着处理的每个批处理只分析该批的数据记录,而不是整个表。这使性能保持相对稳定,即使数据大小在增长,这对于需要在任意大量数据上一致运行的长期运行的生产用例非常重要。MLFlow还可以很好地跟踪度量随时间的演变;在我们的笔记本中,我们跟踪foreachBatch代码中分析的所有Deequ约束作为度量,并使用Delta versionID和时间戳作为参数。在Databricks笔记本电脑中,集成的MLFlow服务器对于度量跟踪特别方便。通过使用结构化流媒体、Delta-Lake和Deequ,我们能够消除质量和速度之间的传统权衡,而是专注于实现两者的可接受水平。这里特别重要的是灵活性——不仅在如何处理不良记录(隔离、错误、消息等)方面,而且在架构上(何时何地执行检查?)生态系统(我如何使用我的数据?)。开源技术,如Delta、结构化流和Deequ是这种灵活性的关键——随着技术的发展,能够使用最新和最好的解决方案成为竞争优势的驱动力。最重要的是,数据的速度和质量不能相互对立,而是要保持一致,特别是当流媒体越来越接近核心业务运营时。很快,这将不再是一个选择,而是一种期望和要求,我们正朝着这个世界迈进,一次一个微博客。免费试用Databricks。今天就开始吧