云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

大带宽_防御cdn_12月免费

小七 141 0

在apachespark2.2中用结构化流处理apachekafka中的数据

这是关于如何使用apachespark执行复杂流分析的多部分系列文章的第三篇。在这个博客中,我们将展示如何利用sparksql的api来消费和转换来自apachekafka的复杂数据流。使用这些简单的api,您可以表达复杂的转换,就像一次事件时间聚合一样,并将结果输出到各种系统。结合使用Apache Spark和Apache Kafka可以:使用与处理批处理数据相同的api转换和扩充从apachekafka读取的实时数据。将从Kafka读取的数据与存储在其他系统(包括S3、HDFS或MySQL)中的信息集成起来。自动受益于Catalyst optimizer提供的增量执行和Tungton随后的高效代码生成。我们首先回顾一下Kafka术语,然后给出从apachekafka读取数据并将数据写入Apache的结构化流查询的示例。最后,我们将探索一个端到端的真实世界用例。阿帕奇·卡夫卡Kafka是一个分布式的发布-订阅消息传递系统,它以并行和容错的方式接收实时数据流并将其提供给下游用户。这使得Kafka适合于构建实时流数据管道,在异构处理系统之间可靠地移动数据。在深入了解结构化流媒体的Kafka支持之前,让我们回顾一下一些基本概念和术语。Kafka中的数据被组织成多个主题,这些主题被划分为多个分区以实现并行性。每个分区都是一个有序的、不可变的记录序列,可以看作是一个结构化的提交日志。生产者将记录附加到这些日志的尾部,而消费者则以自己的速度读取日志。多个消费者可以订阅一个主题,并在到达时接收传入的记录。当新记录到达Kafka主题中的分区时,它们会被分配一个称为偏移量的顺序id号。Kafka集群保留所有已发布的记录,无论它们是否已在一个可配置的保留期内被使用,在这之后,它们被标记为删除。指定要从Kafka读取的数据卡夫卡主题可以看作是一个无限的流,其中数据被保留了可配置的时间量。这个流的无限性意味着,当开始一个新的查询时,我们必须首先决定要读取哪些数据,以及何时从何处开始。在高层次上,有三种选择:最早-从流的开头开始阅读。这不包括已从Kafka中删除的数据,因为这些数据早于保留期("过时"数据)。latest-start now,只处理查询启动后到达的新数据。每个分区分配—为每个分区指定精确的起始偏移量,允许精确控制处理应该从何处开始。例如,如果我们想准确地找出其他系统或查询停止的地方,那么可以利用这个选项。如下所示,startingOffsets选项接受上面三个选项中的一个,并且仅在从新的检查点启动查询时使用。如果从现有的检查点重新启动查询,则查询将始终恢复到原来的位置,除非偏移量处的数据已过时。如果任何未处理的数据过期,那么查询行为将取决于failOnDataLoss选项的设置,该选项在Kafka集成指南中有描述。KafkaConsumer的现有用户会注意到,结构化流媒体提供了更细粒度的配置选项,自动偏移复位. 我们将这些关注点分成两个不同的参数,而不是一个选项,一个参数说明当流第一次启动时要做什么(startingOffsets),另一个参数用于处理当查询无法从它停止的位置提取时该怎么做,因为所需的数据已经过时(failOnDataLoss)。结构化流媒体中的Apache-Kafka支持结构化流提供了一个统一的批处理和流式API,使我们能够将发布到Kafka的数据作为一个数据帧来查看。当以流式方式处理无界数据时,我们使用与批处理相同的API并获得相同的数据一致性保证。该系统确保了端到端的容错保证,因此用户不必为流媒体的低级方面进行推理。让我们研究一下读卡夫卡和写卡夫卡的例子,然后是一个端到端的应用程序。从卡夫卡主题阅读记录第一步是指定Kafka集群的位置以及我们感兴趣的阅读主题。Spark允许您读取单个主题、特定主题集、主题的regex模式,甚至是属于一组主题的特定分区集。我们将只看一个从一个单独的主题阅读的例子,其他的可能性在卡夫卡整合指南中介绍。#构造流式处理从主题1读取的">数据帧"df=火花\.读流\.format("卡夫卡")\.选项("kafka.bootstrap.servers","主机1:port1,主机2:port2")\.option("订阅","topic1")\.option("开始偏移","最早")\.加载()订阅的DataTop1是上面的数据流帧。通过向DataStreamReader提供选项来设置配置,所需的最小参数是kafka.bootstrap.servers(即。主机:端口)以及我们想要订阅的主题。这里,我们还将startingoffset指定为"earliest",它将在查询开始时读取主题中的所有可用数据。如果未指定startingOffsets选项,则使用默认值"latest",并且只处理查询开始后到达的数据。df.printSchema()显示了数据帧的模式。根|--键:二进制(可空=true)|--值:二进制(可为null=true)|--主题:字符串(null=true)|--分区:整数(可空=true)|--偏移量:long(可空=真)|--timestamp:时间戳(可为null=true)|--timestampType:integer(可空=true)返回的DataFrame包含Kafka记录及其相关元数据的所有熟悉字段。我们现在可以使用我们熟悉的数据集或帧的所有操作来转换结果。但是,通常,我们将从分析key和value列中的二进制值开始。如何解释这些blob是特定于应用程序的。幸运的是,sparksql包含许多用于常见序列化类型的内置转换,如下所示。以UTF8字符串形式存储的数据如果Kafka记录的字节表示UTF8字符串,我们可以简单地使用cast将二进制数据转换为正确的类型。数据框选择表达式("CAST(key AS STRING)","CAST(value AS STRING)")数据存储为JSONJSON是写入Kafka的数据的另一种常见格式。在本例中,我们可以使用内置的from_json函数和预期的模式将二进制值转换为Spark SQL结构。#值模式:{"a":1,"b":"string"}schema=StructType().add("a",IntegerType()).add("b",StringType())数据框选择( \col("key").cast("string"),from_json(col("value").cast("string"),模式)用户定义的序列化程序和反序列化程序在某些情况下,您可能已经有了实现Kafka反序列化器接口的代码。您可以使用下面所示的Scala代码将其包装为一个用户定义函数(UDF),从而充分利用这段代码。对象MyDeserializerWrapper{val deser=新建MyDeserializer}spark.udf.register("反序列化",(主题:字符串,字节:数组[Byte])=>米ydeSerializerRapper.deserialize(主题,字节))数据框选择表达式("反序列化("topic1",值)为消息"")注意,上面的数据帧代码类似于指定值.反序列化程序当使用标准的卡夫卡消费者。用Spark做卡夫卡制作人将数据从任何支持Spark的数据源写入Kafka就像在包含名为"value"的列和可选的名为"key"的列的任何数据帧上调用writeStream一样简单。如果没有指定键列,则会自动添加一个空值的键列。在某些情况下,空值键列可能会导致Kafka中的数据分区不均匀,应谨慎使用。DataFrame记录的目标主题可以静态地指定为DataStreamWriter的一个选项,也可以按每条记录指定为DataFrame中名为"topic"的列。#将数据帧中的键值数据写入选项中指定的Kafka主题查询=df\.selectExpr("CAST(userId AS STRING)AS key","to_json(struct(*))AS value")\.writeStream公司\.format("卡夫卡")\.选项("kafka.bootstrap.servers","主机1:port1,主机2:port2")\.option("主题","topic1")\.option("检查点位置","/path/to/HDFS/dir")\。开始()上面的查询获取一个包含用户信息的数据帧并将其写入Kafka。userId被序列化为字符串并用作键。我们获取DataFrame的所有列并将它们序列化为JSON字符串,将结果放入记录的值中。向卡夫卡写信所需的两个选项是kafka.bootstrap.servers以及检查点的位置。在上面的例子中,可以使用一个附加的topic选项来设置要写入的单个topic,如果topic列存在于DataFrame中,那么这个选项将覆盖它。使用嵌套设备的端到端示例在本节中,我们将探讨涉及Kafka以及其他数据源和接收器的端到端管道。我们将使用一个包含嵌套设备日志集合的数据集,这里描述了JSON格式。我们将特别检查来自Nest摄像头的数据,这些数据类似于以下JSON:"设备":{"摄像头":{"device_id":"awJo6rH…","上次事件":{"has_sound":真的,"有运动":是的,"有人"是真的,"开始时间":"2016-12-29T00:00:00.000Z","结束时间":"2016-12-29T18:42:00.000Z"}}}我们还将加入一个