云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

域名解析_佛山网站建设报价_企业0元试用

小七 141 0

使用Databricks Delta简化流式股票数据分析

传统上,股票数据的实时分析是一项复杂的工作,因为维护流式系统和同时确保遗留数据和流式数据事务一致性的复杂性。Databricks Delta有助于解决构建实时分析股票数据的流式系统的许多难点。在下图中,我们提供了一个高级体系结构来简化这个问题。我们首先将两组不同的数据摄取到两个Databricks Delta表中。这两个数据集是股票价格和基本面。在将数据摄取到它们各自的表中之后,我们将数据连接到ETL过程中,并将数据写出到第三个Databricks Delta表中,以便进行下游分析。在这篇博文中,我们将回顾:当前运行这样一个系统的问题Databricks Delta如何解决这些问题如何在Databricks中实现该系统Databricks Delta通过将Apache Spark的可伸缩性、流式传输和高级分析访问与数据仓库的性能和ACID遵从性相结合,帮助解决这些问题。Databricks Delta之前的传统痛点传统的流和数据仓库解决方案的难点可以分为两类:数据湖和数据仓库。数据湖痛点虽然data lakes允许您在文件系统中灵活地存储大量数据,但有许多难点包括(但不限于):整合来自许多不同系统的流数据是困难的。在数据湖中更新数据几乎是不可能的,而且许多流数据需要随着更改而更新。这在涉及财务对账和后续调整的情况下尤其重要。数据湖的查询速度通常非常慢。优化存储和文件大小非常困难,通常需要复杂的逻辑。数据仓库难点数据仓库的强大之处在于您拥有数据的持久性能存储。但构建现代连续应用程序的难点包括(但不限于):仅限于SQL查询;即没有机器学习或高级分析。如果可能的话,同时访问流数据和存储的数据是非常困难的。数据仓库的规模不是很好。将计算和存储捆绑在一起使得使用仓库非常昂贵。Databricks Delta如何解决这些问题Databricks Delta(Databricks Delta Guide)是一个统一的数据管理系统,它为云数据湖带来了数据可靠性和性能优化。更简洁地说,Databricks Delta利用了datalakes和datawarehouses的优势,以及apachespark,让您可以做不可思议的事情!Databricks Delta与结构化流一起,使得以数据仓库的速度一起分析流数据和历史数据成为可能。使用Databricks Delta表作为流式大数据的源和目的地,可以很容易地整合不同的数据源。Databricks Delta表支持upsert。您的流媒体/数据湖/仓储解决方案符合ACID。轻松地将机器学习评分和高级分析纳入ETL和查询中。为完全可扩展的解决方案分离计算和存储。使用Databricks Delta实现流式股票分析解决方案Databricks Delta和apachespark为我们的解决方案做了大部分工作;您可以尝试完整的笔记本电脑,并遵循下面的代码示例。让我们从启用Databricks Delta开始;在撰写本文时,Databricks Delta处于私有预览中,因此请在https://databricks.com/product/databricks-delta。如上图所示,我们需要处理两个数据集—一个用于基本面,另一个用于价格数据。为了创建两个Databricks Delta表,我们针对DBFS位置指定.format("Delta")。#创建基本数据(Databricks Delta table)dfBaseFund=火花\\.阅读\\.format('delta')\\.load('/delta/stocksFundamentals')#创建价格数据(Databricks Delta table)dfBasePrice=火花\\.阅读\\.format('delta')\\.load('/delta/stocksDailyPrices')在更新stockFundamentals和stocksDailyPrices时,我们将通过一系列ETL作业将这些数据合并到一个合并视图中(stocksDailyPricesWFund)。通过下面的代码片段,我们可以确定可用数据的开始和结束日期,然后将该日期范围内的价格和基本面数据合并到DBFS中。#确定可用数据的开始和结束日期行=dfBasePrice.agg(函数最大值(dfBasePrice.price_日期).alias("maxDate"),功能最小值(dfBasePrice.price_日期).alias("minDate")).collect()[0]startDate=rowminDate"]结束日期=rowmaxDate"]#定义日期范围函数定义日期范围(开始日期、结束日期):对于范围内的n(int((结束日期-开始日期).days)):收益起始日期+日期时间。时间增量(n)#按日期和def组合许可证和基金(日期):DF基金=dfBaseFund.where(dfBaseFund.price_日期==日期)dfPrice=dfBasePrice。何处(dfBasePrice.price_日期==日期).drop('价格日期')#删除更新的列dfPriceWFund=dfPrice.join(dfFund,['ticker']).drop('已更新')#将数据保存到DBFSdfPriceWFund公司.写入.format('delta').mode('append').save('/delta/stocksDailyPricesWFund')#循环日期,完成基本面+价格ETL流程对于daterange中的单个日期(开始日期,(结束日期+日期时间。时间增量(天=1)):打印"开始"+单个_日期.时间("%Y-%m-%d")开始=日期时间。日期时间。现在()组合许可证和基金(单一日期)结束=日期时间。日期时间。现在()打印(结束-开始)现在,我们有了一系列整合的基本面和价格数据,这些数据被推送到/delta/stocksDailyPricesWFund位置的DBFS中。我们可以通过在DBFS位置指定.format("Delta")来构建Databricks Delta表。dfPriceWithFundamentals=火花.读流.格式("delta").load("/delta/stocksDailyPricesWFund")//创建数据的临时视图dfP公司riceWithFundamentals.createOrReplaceTempView("基本面价格")现在我们已经创建了最初的Databricks Delta表,让我们创建一个视图,它允许我们实时计算市盈率(因为底层流数据更新了Databricks Delta表)。%sql语言将临时视图viewPE创建或替换为选择ticker,价格日期,首先(结束)作为价格,(关闭/eps_basic_net)作为pe从价格和基本面其中eps_basic_net>0按股票代码分组,价格日期,pe实时分析流式股票数据有了视图之后,我们可以使用sparksql快速分析数据。%sql语言选择*从viewPE其中ticker=="AAPL"按价格订购由于此合并数据集的底层源是一个Databricks Delta表,因此该视图不仅显示批处理数据,而且还显示根据以下流式仪表板传入的任何新数据流。在幕后,结构化流不仅仅是将数据写入Databricks Delta表,还保留需要跟踪的不同数量的键(在本例中是股票代码)的状态。因为您使用的是sparksql,所以可以大规模实时地执行聚合查询。%sql语言选择ticker,AVG(close)作为Average_close从价格和基本面按股票代码分组按平均值订购摘要最后,我们演示了如何使用Databricks Delta简化流式股票数据分析。通过将Spark结构化流和Databricks Delta相结合,我们可以使用Databricks集成工作区来创建一个性能优异、可扩展的解决方案,它兼具数据湖和数据仓库的优点。Databricks Unified Analytics平台消除了通常与流式和事务一致性相关的数据工程复杂性,使数据工程和数据科学团队能够专注于了解其股票数据的趋势。 对开源的三角洲湖感兴趣吗?访问Delta Lake在线中心了解更多信息,下载最新代码并加入Delta Lake社区。 免费试用Databricks。今天就开始吧