云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

数据库服务器_阿里云frp_企业级

小七 141 0

将apachespark的结构化流媒体投入生产

这是关于如何使用apachespark执行复杂流分析的多部分系列文章的第五篇。在Databricks,我们已经在过去几个月将我们的生产管道迁移到结构化流媒体上,并希望共享我们的现成部署模型,以允许我们的客户在Databricks中快速构建生产管道。生产应用程序需要监视、警报和故障恢复的自动(云本机)方法。这篇文章不仅将引导您了解可用于解决这些挑战的api,还将向您展示Databricks如何使在生产中运行结构化流媒体变得简单。指标和监控apachespark中的结构化流提供了一个简单的编程API来获取当前正在执行的流的信息。为了获取有关正在进行的查询执行的相关信息,可以在当前活动流上运行两个关键命令:获取查询当前状态的命令和获取查询最近进度的命令。状态您可能会问的第一个问题是,"我的流现在正在执行什么处理?"状态维护有关流当前状态的信息,并且可以通过启动查询时返回的对象进行访问。例如,您可能有一个简单的计数流,它提供由以下查询定义的IOT设备的计数。query=streamingCountsDF\.writeStream公司\.format("内存")\.queryName("计数")\.outputMode("完成")\。开始()跑步查询状态将返回流的当前状态。这为我们提供了流中那个时间点发生的事情的详细信息。{"message":"从FileStreamSource获取偏移量[dbfs:/databricks datasets/structured streaming/events]","isDataAvailable":真,"isTriggerActive":正确}Databricks笔记本为您提供了一种查看任何流式查询状态的简单方法。只需将鼠标悬停在流式查询中可用的图标上。您将获得相同的信息,从而使您更方便地快速了解流的状态。近期进展虽然查询状态当然很重要,但同样重要的是能够查看查询的历史进程。Progress元数据将允许我们回答诸如"我处理元组的速率是多少?"或者"元组从源到达的速度有多快?"通过跑步stream.recentProgress您将获得更多基于时间的信息,如处理速度和批处理持续时间。然而,一张图片胜过一千个JSON blob,所以在Databricks,我们创建了可视化,以便快速分析流的最新进展。让我们来探讨一下为什么我们选择显示这些指标,以及它们对您理解的重要性。输入速率和处理速率输入速率指定从Kafka或Kinesis这样的系统流入结构化流的数据量。处理速度就是我们分析数据的速度。在理想的情况下,它们应该一致地一起变化;但是,它们将根据处理开始时存在的输入数据量而变化。如果输入速率远远超过处理速率,我们的流就会落后,我们必须将集群扩展到更大的大小来处理更大的负载。批处理持续时间几乎所有流媒体系统都使用批处理来以任何合理的吞吐量运行(有些系统可以选择高延迟来换取较低的吞吐量)。结构化流媒体实现了两者。当它对数据进行操作时,您可能会看到这种振荡是结构化流式处理随时间变化的事件数。在CommunityEdition上的这个单核集群上,我们可以看到我们的批处理持续时间在3秒左右持续振荡。较大的集群自然会有更快的处理速度以及更短的批处理持续时间。流式作业的生产警报度量和监控都很好,但是为了快速响应出现的任何问题,而不必整天照看流媒体作业,您需要一个强大的警报故事。Databricks允许您将流作业作为生产管道运行,从而使警报变得简单。例如,让我们用以下规范定义Databricks作业:请注意我们是如何设置电子邮件地址来触发PagerDuty中的警报的。这将在作业失败时触发产品警报(或您指定的级别)。自动故障恢复虽然警报是方便的,但不得不强迫人类对停机作出响应,充其量是不方便的,最坏的情况下是不可能的。为了真正实现结构化流媒体的产品化,您需要能够尽可能快地自动恢复到故障,同时确保数据一致性和无数据丢失。Databricks使这一点变得无缝:只需设置在发生不可恢复的故障之前的重试次数,Databricks将尝试为您自动恢复流作业。在每次失败时,您都可以触发一个通知作为生产中断。你可以两全其美。系统将尝试自我修复,同时让员工和开发人员了解状态。更新应用程序在更新流式应用程序时,有两种情况需要考虑。在大多数情况下,如果不更改重要的业务逻辑(如输出模式),则可以使用相同的检查点目录重新启动流作业。新的更新的流媒体应用程序将从中断的地方恢复并继续运行。但是,如果要更改有状态操作(如聚合或输出模式),则更新会涉及到更多内容。您必须使用新的检查点目录启动一个全新的流。幸运的是,很容易在Databricks中启动另一个流,以便在转换到新流时并行运行这两个流。高级警报和监视Databricks还支持其他几种高级监视技术。例如,可以使用Datadog、apachekafka或codaholemetrics之类的系统输出通知。这些先进技术可用于实现外部监控和警报系统。下面是如何创建一个StreamingQueryListener的示例,它将把所有查询进度信息转发给Kafka。类KafkaMetrics(servers:String)扩展StreamingQueryListener{val kafkaProperties=新属性()卡夫卡属性.put("引导服务器",服务器)卡夫卡属性.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization系列化.StringSerializer")卡夫卡属性.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization系列化.StringSerializer")val producer=新KafkaProducer[字符串,字符串](kafkaProperties)def onQueryProgress(事件:org.apache.spark网站.sql.streaming.StreamingQueryListener.QueryProgressEvent):单位={生产者.send(new ProducerRecord("流媒体指标",event.progress.json))}def onQueryStarted(事件:org.apache.spark网站.sql.streaming.StreamingQueryListener.QueryStartedEvent):单元={}def onQueryTerminated(事件:org.apache.spark网站.sql.streaming.StreamingQueryListener.QueryTerminatedEvent):单位={}}结论在这篇文章中,我们展示了使用Databricks从原型到产品的结构化流是多么简单。要了解结构化流媒体的其他方面的更多信息,请阅读我们的系列博客:apachespark中的结构化流apachespark2.1中使用结构化流的实时流ETL在ApacheSpark2.1中使用结构化流处理复杂的数据格式在apachespark2.2中用结构化流处理apachekafka中的数据apachespark结构化流中的事件时间聚合与水印技术您可以从Databricks文档中了解更多有关使用流式处理的信息,或者立即注册开始免费试用。免费试用Databricks。今天就开始吧