云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

金山云_家谱数据库_好用

小七 141 0

apachespark结构化流中的事件时间聚合与水印技术

这是关于如何使用apachespark执行复杂流分析的多部分系列文章的第四篇。连续应用通常需要对实时聚集的统计数据(如物联网设备的健康状况和读数)或检测异常行为进行接近实时的决策。在这篇博客中,我们将探讨流聚合在结构化流媒体中表达的容易程度,以及延迟和无序数据的处理方式。流式聚合结构化流允许用户表达与批处理查询相同的流式查询,sparksql引擎递增查询并对流式数据执行。例如,假设您有一个流数据帧,其中包含来自物联网设备的信号强度的事件,并且您希望计算每个设备的运行平均信号强度,那么您将编写以下Python代码:#带模式的数据帧[eventTime:timestamp,deviceId:string,signal:bigint]事件DF=。。。平均信号DF=事件df.groupBy("deviceId")。平均值("信号")如果eventsDF是静态数据上的一个DataFrame,那么这段代码也没有什么不同。然而,在这种情况下,平均值将随着新事件的到来而不断更新。您可以选择不同的输出模式将更新后的平均值写入外部系统(如文件系统和数据库)。此外,还可以使用Spark的用户定义聚合函数(UDAFs)实现自定义聚合。事件时间内Windows上的聚合在许多情况下,您不希望在整个流中运行聚合,而是希望通过时间窗口(例如,每5分钟或每小时)对数据进行聚合。在我们前面的例子中,如果设备开始出现异常行为,看看过去5分钟内的平均信号强度是多少是很有见地的。另外,这个5分钟的窗口应该基于数据中嵌入的时间戳(aka。事件时间),而不是在它被处理的时间(又名。处理时间)。早期的Spark流数据流API很难表达这样的事件时间窗口,因为API是专门为处理时间窗口(即数据到达Spark时的窗口)而设计的。在结构化流媒体中,在事件时间上表达这样的窗口只是使用window()函数执行一个特殊的分组。例如,事件中eventTime列上超过5分钟的翻滚(非重叠)窗口计数如下。从pyspark.sql.functions进口*窗口DAVGSIGNALDF=\事件DF\.groupBy(窗口("eventTime","5分钟"))\.count()在上面的查询中,每个记录将被分配给一个5分钟的滚动窗口,如下所示。每个窗口都是一个组,为其计算运行计数。也可以通过指定窗口长度和滑动间隔来定义重叠窗口。例如:从pyspark.sql.functions进口*窗口DAVGSIGNALDF=\事件DF\.groupBy(窗口("事件时间","10分钟","5分钟")\.count()在上面的查询中,每个记录将被分配到多个重叠窗口,如下所示。这种分组策略自动处理延迟和无序的数据-延迟事件只更新旧的窗口组而不是最新的窗口组。下面是一个按deviceId和重叠窗口分组的查询的端到端示例。下图显示了当您同时按deviceId和滑动窗口分组时,使用5分钟触发器处理新数据后,查询的最终结果如何更改(为简洁起见,省略了"signal"字段)。windowedCountsDF=\事件DF\.groupBy公司("设备ID",窗口("事件时间","10分钟","5分钟")\.count()请注意,最新的无序记录[12:04,dev2]是如何更新旧窗口的计数的。有状态增量执行在执行任何流式聚合查询时,sparksql引擎在内部将中间聚合维护为容错状态。这种状态被构造成键-值对,其中键是组,值是中间聚合。这些对存储在Spark执行器的内存中、版本控制的键值"state store"中,该执行器使用HDFS兼容的文件系统(在配置的检查点位置)中使用预写日志进行检查点。每次触发时,状态都会在状态存储中读取和更新,所有更新都会保存到预写日志中。在任何失败的情况下,将从检查点信息恢复状态的正确版本,并从失败的点开始查询。与可重放源和幂等汇一起,结构化流确保了对有状态流处理的一次性保证。这种容错状态管理自然会产生一些处理开销。为了将这些开销限制在可接受的范围内,状态数据的大小不应无限增长。但是,使用滑动窗口,窗口/组的数量将无限增长,状态的大小(与组的数量成比例)也会增长。为了限制州的大小,我们必须能够删除不再更新的旧聚合,例如7天以前的平均值。我们使用水印来实现这一点。处理延迟数据时限制状态的水印如前所述,延迟数据的到达会导致对旧窗口的更新。这使得定义不更新哪些旧聚合的过程复杂化,因此可以从状态存储中删除以限制状态大小。在apachespark2.1中,我们引入了水印,可以自动删除旧状态数据。水印是事件时间中的一个移动阈值,它落后于查询在已处理数据中看到的最大事件时间。尾差定义了等待延迟数据到达的时间。通过知道给定组中不再有数据到达的时间点,我们可以限制查询需要维护的状态总量。例如,假设配置的最大延迟为10分钟。这意味着延迟10分钟的事件将被允许聚合。如果观测到的最大事件时间是12:33,那么所有事件时间大于12:23的未来事件都将被视为"太迟"并被丢弃。另外,所有窗口的状态都将被清除。您可以根据应用程序的要求设置此参数-此参数的较大值允许数据稍后到达,但代价是增加状态大小,即内存使用量,反之亦然。这是前面的例子,但是有水印。windowedCountsDF=\事件DF\.withWatermark("事件时间","10分钟")\.groupBy公司("设备ID",窗口("事件时间","10分钟","5分钟")\.count()执行此查询时,Spark SQL将自动跟踪eventTime列的最大观察值,更新水印并清除旧状态。如下所示。注意在处理时间12:20和12:25之间到达的两个事件。水印用于区分延迟和"太迟"事件,并相应地处理它们。结论简而言之,我介绍了结构化流的窗口化策略来处理关键的流聚合:事件时间内的窗口以及延迟和无序的数据。使用这种加窗策略可以使结构化流引擎实现水印,其中后期数据可以被丢弃。通过这种设计,我们可以管理状态存储的大小。在即将到来的apachespark2.2版本中,我们向流数据帧/数据集添加了更高级的有状态流处理操作。请关注本系列博客以了解更多信息。如果你想了解更多关于结构化流媒体的信息,请阅读我们在本系列文章中之前的文章。apachespark中的结构化流apachespark2.1中使用结构化流的实时流ETL在ApacheSpark2.1中使用结构化流处理复杂的数据格式在apachespark2.2中用结构化流处理apachekafka中的数据要尝试ApacheSpark2.0中的结构化流,请立即尝试Databricks。免费试用Databricks。今天就开始吧