云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

百度云_中国代理服务器_学生机

小七 141 0

在apachespark2.3中引入流连接

自从我们在apachespark2.0中引入了结构化流,它支持流和静态数据帧/数据集之间的连接(内部连接和某种类型的外部连接)。随着apachespark 2.3.0的发布,作为Databricks统一分析平台的一部分,现在我们支持流连接。在这篇文章中,我们将探讨如何使用流流连接、我们解决了哪些挑战以及它们支持的工作负载类型。让我们从流连接的规范用例开始——广告货币化。流连接的案例:广告货币化假设您有两个流-一个是广告印象流(即,当一个广告显示给用户时)和另一个广告点击流(即,当显示的广告被用户点击时)。为了使广告赚钱,你必须匹配哪个广告印象导致点击。换句话说,您需要基于一个公共密钥来连接这些流,即在两个流的事件中出现的每个广告的唯一标识符。从高层来看,问题如下。虽然这在概念上是一个简单的想法,但仍有一些核心技术挑战需要克服。使用缓冲处理延迟/延迟的数据:impression事件及其对应的单击事件可能无序到达,它们之间存在任意延迟。因此,流处理引擎必须适当地缓冲这些延迟,直到它们匹配为止。即使所有连接(静态连接或流连接)都可能使用缓冲区,但真正的挑战是避免缓冲区无限增长。限制缓冲区大小:限制流连接缓冲区大小的唯一方法是将延迟的数据丢弃到某个阈值之外。用户应该根据业务需求和系统资源限制之间的平衡来配置这个最大延迟阈值。定义良好的语义:在静态连接和流连接之间保持一致的SQL连接语义,无论是否有上述阈值。我们已经在我们的流-流连接中解决了所有这些挑战。因此,您可以使用SQL连接的明确语义来表示计算,并控制相关事件之间的延迟。让我们看看怎么做。首先让我们假设这些流是两个不同的卡夫卡主题。流数据帧的定义如下:impressions=(#schema-adId:String,impressionTime:Timestamp。。。火花.读流.format("卡夫卡").option("订阅"、"印象")….加载())clicks=(#schema-adId:String,clickTime:Timestamp。。。火花.读流.format("卡夫卡").option("订阅","点击")….加载())然后你需要做的所有内部equi连接他们如下。印象。加入(点击"adId")\adId在两个数据帧中都很常见与所有结构化流式查询一样,如果DataFrames表示和单击是在静态数据上定义的,那么这段代码与您编写的代码完全相同。当执行此查询时,结构化流引擎将根据需要缓冲单击和印象作为流状态。对于特定的广告,一旦接收到两个相关事件(即,一旦接收到第二个事件),就会生成联合输出。当数据到达时,连接的输出将逐渐生成并写入查询接收器(例如另一个Kafka主题)。最后,如果联接查询应用于两个静态数据集(也就是说,与SQL联接具有相同的语义),那么联接的累积结果也不会有什么不同。事实上,即使一个是流,另一个是静态数据集,它也是一样的。但是,在这个查询中,我们没有给出任何关于引擎应该为事件缓冲多长时间以找到匹配项的指示。因此,引擎可以永远缓冲事件并累积无限量的流状态。让我们看看如何在查询中提供附加信息来限制状态。管理流连接的流状态要限制流连接维护的流状态,您需要了解有关您的用例的以下信息:这两个事件在各自源头产生的时间间隔是多少?在我们的用例上下文中,让我们假设一次点击可以在对应的印象后0秒到1小时内发生。事件在源和处理引擎之间的传输过程中可以延迟的最长持续时间是多少?例如,来自浏览器的广告点击可能会因为断断续续的连接而延迟,并且到达的时间比预期的要晚得多,并且出现故障。比如说,印象和点击最多可以延迟2小时和3小时。通过对每个事件的这些时间限制,处理引擎可以自动计算需要缓冲多长时间的事件才能生成正确的结果。例如,它将评估以下内容。印象最多需要缓冲4小时(在活动时间内),因为延迟3小时的点击可能与4小时前的印象相匹配(即,延迟3小时+印象和点击之间的延迟时间最多为1小时)。相反,点击最多需要缓冲2小时(在事件时间内),因为延迟2小时的印象可能与2小时前收到的点击相匹配。因此,当引擎确定任何缓冲事件在将来不会得到任何匹配时,它可以从流中删除旧的印象和单击。在高层,这个动画演示了水印是如何随着事件时间更新的,以及状态是如何清除的。这些时间约束可以在查询中编码为水印和时间范围连接条件。水印:结构化流中的水印是通过指定要考虑多少延迟数据来限制所有状态流操作中的状态的方法。具体地说,水印是事件时间中的一个移动阈值,它落后于查询在处理数据中看到的最大事件时间。使用水印引擎定义延迟到达时间(aka)和延迟。在我们之前的关于流式聚合的博客文章中可以更详细地了解它。对于流流内部连接,可以选择指定水印延迟,但必须指定限制两个流上的所有状态。时间范围条件:这是一个连接条件,它限制每个事件可以针对的其他事件的时间范围。可以通过以下两种方式之一指定:时间范围连接条件(例如…在rightTime和rightTime之间的leftTime连接+间隔1小时),在事件时间窗口中加入(例如…在leftTimeWindow=rightTimeWindow上加入)。总之,我们的广告盈利的内部连接将如下所示。从pyspark.sql.functions进口出口#定义水印impressionsWithWatermark=印象\.selectExpr("adId AS impressionAdId","印象时间")\.withWatermark("印象时间","10秒")\clicksWithWatermark=点击\。选择表达式("adId AS clickAdId","clickTime")\.withWatermark("clickTime","20秒")#最多延迟20秒#带时间范围条件的内部联接改进sessionswithwatermark.join(单击swithwatermark,表达式(""clickAdId=impressionAdId和单击时间>=印象时间,然后单击时间