云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

网站空间_京东云v6_0元

小七 141 0

卡夫卡集成火花流的改进

apachekafka正在迅速成为最流行的开源流接收平台之一。我们在Spark Streaming的用户中也看到了同样的趋势。因此,在apachespark1.3中,我们将重点放在对Spark流的Kafka集成进行重大改进。这导致了以下补充:新的针对Kafka的直接API–这允许每个Kafka记录在失败的情况下只处理一次,而不使用预写日志。这使得Spark Streaming+Kafka管道更加高效,同时提供更强的容错保证。PythonAPI for Kafka—这样您就可以开始纯粹从Python处理Kafka数据。在本文中,我们将更详细地讨论这些改进。卡夫卡直接API[主要贡献者-科迪]Spark Streaming从一开始就支持Kafka,并且Spark Streaming已经在许多地方与Kafka一起在生产中使用(见本文)。然而,Spark社区要求更好的容错保证和更强的可靠性语义。为了满足这一需求,spark1.2引入了预写日志(Write-Ahead Logs,WAL)。它确保从任何可靠的数据源(如Flume、Kafka和Kinesis等事务性源)接收的数据不会因失败而丢失(即至少一次语义)。即使是对于不可靠的(即非事务性的)源,比如普通的旧套接字,它也能最大限度地减少数据丢失。然而,对于允许从流中的任意位置重放数据流的源(例如Kafka),我们可以实现更强大的容错语义,因为这些源让Spark Streaming对数据流的消耗有更多的控制。Spark 1.3引入了直接API的概念,即使不使用预写日志,也可以实现一次语义。让我们看看Spark为ApacheKafka提供的直接API的详细信息。我们是怎么建的?在高层,早期的Kafka集成使用了预写日志(WAL),如下所示:卡夫卡数据由运行在Spark workers/executors中的Kafka接收器连续接收。这使用了Kafka的高级消费者API。接收到的数据存储在Spark的worker/executor内存中以及WAL(在HDFS上复制)。只有在数据被持久化到日志中之后,Kafka接收器才将Kafka的偏移量更新到Zookeeper。关于接收到的数据及其WAL位置的信息也被可靠地存储。失败时,此信息用于重新读取数据并继续处理。 虽然这种方法可以确保Kafka中的数据不会丢失,但由于失败(即至少一次语义),某些记录仍有可能被多次处理。当某些接收到的数据可靠地保存到WAL中,但系统在更新Zookeeper中相应的Kafka偏移量之前失败时,可能会发生这种情况。这导致了不一致性–Spark Streaming认为数据已收到,但Kafka认为数据未成功发送,因为Zookeeper中的偏移量未更新。因此,Kafka将在系统从故障中恢复后再次发送数据。出现这种不一致是因为这两个系统不能用描述已经发送的信息进行原子更新。为了避免这种情况,只有一个系统需要对已发送或接收的内容保持一致的视图。此外,在从故障恢复期间,该系统需要完全控制数据流的重放。因此,我们决定将所有消耗的偏移量信息仅保留在Spark Streaming中,Spark Streaming可以使用Kafka的简单消费者API,根据需要回放因故障而产生的任意偏移量的数据。为了构建这个(主要贡献者是Cody),新的直接kafkaapi采用了与接收者和wal完全不同的方法。我们只需在一个接收间隔内连续存储一批数据,而不是用它来决定每批数据的接收间隔。之后,当执行每个批的作业时,从Kafka读取与偏移范围相对应的数据以进行处理(类似于HDFS文件的读取方式)。这些偏移量也可以可靠地保存(使用检查点),并用于重新计算数据以从故障中恢复。请注意,Spark流可以重新读取和重新处理来自Kafka的流片段,以从失败中恢复。然而,由于RDD转换的一次特性,最终重新计算的结果与没有失败的结果完全相同。因此,这种直接API消除了对Kafka的wal和receiver的需要,同时确保每个Kafka记录都被Spark流有效地接收一次。这使得我们可以构建一个Spark Streaming+Kafka管道,具有端到端的一次语义(如果对下游系统的更新是幂等的或事务性的)。总的来说,它使这种流处理管道更容错、更高效、更易于使用。如何使用它?新的API比以前的API更易于使用。//定义Kafka参数,必须指定broker listval kafkaParams=地图("元数据.broker.list" -> "本地主机:9092,一个其他主持人:9092")//定义阅读的主题val topics=Set("sometopic","anothertopic")//用Kafka参数和主题创建直接流val kafkaStream=kafkatils.createDirectStream[字符串,字符串,字符串解码器,字符串解码器](streamingContext,kafkaParams,主题)由于这种直接方法没有任何接收器,因此不必担心创建多个输入数据流来创建更多接收器。也不必配置每个接收器要使用的Kafka分区的数量。每个Kafka分区将自动并行读取。此外,每个Kafka分区对应一个RDD分区,从而简化了并行模型。除了新的流媒体API,我们还引入了kafkatils.createRDD(),可用于对Kafka数据运行批处理作业。//定义要在批处理作业中读取的偏移范围val offsetRanges=数组(OffsetRange("一些主题",0,110,220),OffsetRange("一些主题",1100,313),OffsetRange("另一个主题",0456789))//根据偏移范围创建RDD价值rdd=kafkatils.createRDD[字符串,字符串,字符串解码器,字符串解码器](sparkContext,kafkaParams,offsetRanges)如果您想了解更多关于API的信息以及它是如何实现的,请看下面的内容。Spark Streaming+Kafka集成指南科迪的博客文章有更多细节Scala和Java中directapi的完整字数示例直接API的Scala和Java文档Spark流编程指南中更新的容错语义用于Kafka的Python API[主要贡献者-戴维斯]在Spark 1.2中,添加了Spark Streaming的基本PythonAPI,以便开发人员可以纯粹用Python编写分布式流处理应用程序。在Spark 1.3中,我们扩展了PythonAPI以包括Kafka(主要由Davies Liu提供)。这样,用Kafka用Python编写流处理应用程序变得轻而易举。下面是一个示例代码。卡夫卡斯特林=kafkatils.createStream(精简上下文,"动物园管理员-服务器:2181","消费者群",{"某个话题":1})行=kafkaStream.map(λx:x[1])运行示例的说明可以在Kafka集成指南中找到。请注意,要运行示例或使用kafkaapi的任何python应用程序,必须将Kafka Maven依赖项添加到路径中。这在Spark 1.3中很容易实现,因为您可以直接将Maven依赖项添加到Spark submit(推荐的启动Spark应用程序的方法)。有关更多详细信息,请参阅《Kafka集成指南》中的"部署"部分。还要注意,这是使用早期的Kafka API。正在将Python扩展到directapi,预计将在spark1.4中提供。此外,我们还希望将其余内置源代码的Python api添加到,以实现Scala、Java和Python流式api之间的奇偶性。未来方向我们将不断提高卡夫卡集成的稳定性和性能。我们打算做的一些改进如下:当批处理成功时自动更新Zookeeper,使基于Zookeeper的Kafka监控工具能够正常工作–SPARK-6051直接API的Python API–SPARK-5946将这种直接API方法扩展到Kinesis–SPARK-6599Kafka简单消费者API跨批处理的连接池 免费试用Databricks。今天就开始吧