云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

亚马逊云_网站建设开发公司_怎么买

小七 141 0

寻找隐藏的宝藏:Riak NoSQL数据库的ApacheSpark连接器

在Databricks中查看此笔记本这是我们在Basho的朋友的博客。pavelhardak是Basho的产品管理总监。本文介绍了Riak Spark Connector,这是一个开放源代码库,它在apachespark和Riak NoSQL数据库之间架起了桥梁。它将apachespark的全部功能应用于在Riak分布式集群中管理的操作数据。有了Riak Apache Spark Connector,Riak用户现在有了一个集成的、可扩展的大数据分析解决方案,Spark用户现在有了一个弹性的、高可用的数据存储。关于里亚克Riak是一个开源的分布式NoSQL数据库,由Basho技术开发和支持。Basho提供两个主要产品:Riak KV(键值)和Riak TS(时间序列)。这两种产品共享相同的核心代码库,但针对不同的用例进行了调整。Riak KV是一个高弹性、可扩展、关键价值存储。Riak KV以其线性方式上下扩展的能力而闻名,能够以较低的延迟处理大量的读取、更新和写入,同时具有极高的可靠性和容错性。最近,Riak-TS被引入,专门针对时间序列数据进行了优化。它添加了非常快速的批量写入、非常高效的"时间片"读取查询,并支持Riak-TS表上的SQL语言子集。介绍用于apachespark的Riak连接器我们发现,许多领先的组织在其基础设施中混合使用NoSQL和SQL数据库产品,因为每种产品都有其特定的优势,这取决于用例。过去,一些数据库更多地用于分析工作负载,而其他数据库则用于操作性工作负载。随着现代NoSQL数据库,如Riak,正在获得新的功能,它们正被用于其他用例,如物联网、指标和边缘设备分析。为了更容易地执行这些任务,Basho创建了一个Riak Spark连接器,因为我们相信apachespark是当前与Riak一起使用的最佳技术选择。Basho之所以选择Spark进行开发,不仅是因为客户和市场需求,还因为Spark和Riak共享主要的设计原则:高性能、可伸缩性、弹性和操作简单性。为Riak实现apachespark连接器Riak KV存储桶使用"AWS Dynamo"论文中的原理建模,适用于需要近实时的频繁、小数据量操作的场景,尤其是具有读、写和更新的工作负载—这可能会导致某些分布式数据库中的数据损坏,或使它们在更大的工作负载下"爬行"。在Riak中,每个数据项被复制到多个节点上,这使得数据库能够以非常低的延迟处理大量操作,同时具有独特的反腐败和冲突解决机制。然而,与apachespark的集成需要一种非常不同的操作模式——批量提取大量数据,这样Spark就可以在内存中对整个数据集执行"魔法"。解决这个问题的一种方法是创建大量的Spark工作人员,每个人都需要几个数据项。这种方法在Riak中很好地工作,但是它在Spark方面产生了不可接受的开销。另一个选择是使用Riak内置的二级索引查询(2i)。在这种方法中,用户的应用程序通过一个查询联系任何一个Riak节点,然后这个Riak节点成为一个"协调节点",查询所有其他相关的Riak节点,收集所需的密钥并将其流回到用户应用程序。然后用户应用程序将循环这些键以检索值。遗憾的是,结果集更大的查询可能会使协调节点过载。又一次,结果不好,所以我们不得不教里亚克新的技巧。解决方案是通过智能覆盖计划和并行提取api来增强2i查询。在新的方法中,用户应用程序与协调节点联系,但这一次,该节点没有执行所有工作,而是使用集群复制和可用性信息返回数据的位置。然后"N"Spark workers打开到不同节点的"N"并行连接,这允许应用程序以更快的速度检索所需的数据集"N",而不会生成"热点"。为了使它更快,我们实现了一种特殊类型的批量查询,称为"完整的bucket read",它在不需要查询条件的情况下提取整个逻辑桶。此外,它还返回键和值,从而节省到服务器的另一次往返。Riak-KV存储桶的优势在于它能够在无模式架构中存储非结构化数据,并且"值"是不透明的。但是对于许多Spark用例,数据必须映射到Scala或Java类型的记录中。幸运的是,许多Riak应用程序使用JSON,这使得Spark开发人员可以通过提供用户定义的模式轻松地将其转换为Spark数据帧。转换是"动态"进行的,这使得Spark程序员更容易处理检索到的数据。进口sqlContext.隐式._val sqlContext=新建org.apache.spark网站.sql.SQLContext(sc)case类UserData(用户_id:String,name:String,age:Int,category:String)val kv_bucket_name=新命名空间("测试数据")val riakRdd=桑椹[用户数据](kv_bucket_name).queryAll()值df=里亚克尔德托夫()数据框。在哪里(df("age")>=50.选择("id","name")数据框groupBy("类别")。计数Riak TS满足Spark SQL作为一个分布式、无主控、高可用性和线性可伸缩的NoSQL数据存储,Riak TS增加了许多类似SQL的功能。它包括一个表的DDL(是的,CREATE TABLE…),具有命名的属性和数据类型、主键(用于本地索引和集群)、带有过滤器和聚合的SQL查询语言的子集等等(正如我们所说,还添加了其他SQL命令)。为NoSQL数据库添加SQL支持本身并不是一件小事,我们很乐意利用SQL功能,将它们映射到众所周知的Spark结构,比如DataFrames和sparksql。Riak Spark Connector在Spark worker之间自动划分SQL查询。riakts还支持key/value功能,它没有模式,因此我们使用Spark RDDs与key/value(KV)bucket集成。将设备元数据、配置信息和配置文件存储在key/value bucket中非常方便和高效。val ts_table_name="输入表"值df=sc.riakTSTable公司(ts_table_名称).sql(s"SELECT*FROM$ts_table_name其中time>=$FROM和time=CAST($from AS TIMESTAMP)和time