云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

消息队列_多串口服务器_试用

小七 141 0

如何在Databricks中使用Avro、Kafka和Schema Registry

在上一篇博文中,我们介绍了apachespark中新的内置Apache Avro数据源,并解释了如何使用它来构建具有from_Avro和to_Avro函数的流数据管道。apachekafka和apacheavro通常用于构建可伸缩的、接近实时的数据管道。在这篇博文中,我们将介绍如何在Databricks中构建更可靠的管道,并集成合流的Schema Registry。这个特性从Databricks运行时4.2开始就可用了。模式演化对于长时间运行的流作业,数据流的模式通常会随着时间而变化。模式演化是流媒体世界中的一个典型问题。例如,为了支持业务逻辑中的更改,您需要通过向数据流添加新列来进行相应的更改。模式更改可能会破坏现有的数据管道并导致服务中断。在模式演化的情况下,管道设计需要回答以下问题,而不是停止、更新和重新启动管道:哪些架构更改是安全的?如何以经得起未来考验的方式从数据流中读取数据?如何跟踪数据流的变化历史?schemaregistry是基于Kafka的数据管道最流行的解决方案。与Apache配置单元元存储类似,它记录所有已注册数据流的模式,以及模式更改历史记录。它还定义了多个兼容级别。例如,您可以强制只允许向后兼容的架构更改。为了支持以未来验证的方式读取数据流,您需要在每个记录中嵌入模式信息。因此,模式标识符而不是完整的模式,是每个记录的一部分。Schema Registry提供自定义Avro编码器/解码器。您可以使用模式标识符对Avro记录进行编码和解码。Databricks将模式注册表集成到from_avro和to_avro函数中。您可以轻松地迁移构建在Schema Registry上的流式处理管道,以激发结构化流。此外,from_avro和to_avro函数也可以用于批处理查询,因为结构化流将Spark SQL引擎中的批处理和流处理统一起来。使用架构注册表的示例代码您可以导入带有示例的笔记本并自己玩,也可以在线预览。假设您已经在集群中部署了Kafka和Schema Registry,并且存在一个Kafka主题"t",其键和值分别在Schema Registry中注册为string和int类型的主题"t-key"和"t-value"。下面的代码用schemaval df=火花.读流.format("卡夫卡").选项("kafka.bootstrap.servers",卡夫卡尔).option("订阅","t").加载()。选择(从_avro($"key","t-key",schemaRegistryURL).as("key"),从_avro($"value","t-value",schemaRegistryURL).as("value"))下面的代码将schema的Spark数据帧写入Kafka主题"t"。数据数据框。选择(到\u avro($"key",lit("t-key"),schemaRegistryURL).as("key"),to_avro($"值",lit("t-value"),schemaRegistryURL).as("value").writeStream公司.format("卡夫卡").选项("kafka.bootstrap.servers",服务器).option("主题","t").save()阅读更多阅读有关Azure Databricks和AWS的架构注册表的更多信息。下载笔记本或在这里阅读。免费试用Databricks。今天就开始吧