云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

数据库_赠送企业邮箱_促销

小七 141 0

介绍Spark的Redshift数据源

这是Axiomine大数据架构师/数据科学家sameerwadkar的客座博客。apachespark1.2引入了sparksqldatasourcesapi,为与各种结构化数据源集成提供了一种可插入的机制。Spark用户可以从各种源读取数据,比如配置单元表、JSON文件、columnar Parquet表和许多其他源。第三方数据源也可通过spark获得-软件包.org. 这篇文章讨论了一个新的Spark数据源来访问Amazon Redshift服务,Spark的Redshift数据源是一个由Databricks维护的包,来自SwiftKey和其他公司的社区贡献。在Spark引入Redshift数据源之前,Spark的JDBC数据源是Spark用户从Redshift读取数据的唯一方式。当运行返回少量行(100行的顺序)的查询时,这种方法已经足够了,但是在处理大规模数据时,它太慢了。原因是JDBC提供了一种基于ResultSet的方法,在这种方法中,可以在单个线程中以小批量方式检索行。此外,只有当数据需要在Redshift数据库中的表之间移动时,才可以使用JDBC在Redshift中存储大数据集。基于JDBC的INSERT/UPDATE查询只适用于Redshift表的小更新。对于希望从Redshift加载或存储大量数据的用户来说,JDBC在性能和吞吐量方面还有很多需要改进的地方。使用此软件包,可以通过自动化一组手动步骤来简化与Redshift服务的集成,否则将需要这些步骤将大量数据移入和移出Redshift。为了理解它是如何做到的,让我们看看如何将Redshift数据库中的大型数据集与其他数据源中的数据集集成起来。我们还将探讨这个包如何扩展Redshift和Spark用户的可能性范围。传统上,数据必须从HDFS移动到Redshift进行分析。然而,这个软件包将允许Redshift与存储在S3、配置单元表、CSV或HDFS上的Parquet文件中的数据无缝地交互操作。这将简化ETL管道,并允许用户对系统的逻辑和统一视图进行操作。从红移读取假设您要在Spark中处理整个表(或返回大量行的查询),并将其与来自另一个大型数据源(如配置单元)的数据集组合。将Redshift表(查询)数据加载到符合架构的DataFrame实例中的命令集是:val jdbcURL="jdbc:红移://测试-红移.czac2vcs84ci.us东-。redshift.amazonaws.com:5439/testredshift?PgJfgjw8gjw8gjzw8pzUSER=密码val tempS3Dir="s3n://spark redshift testing/temp/"val销售数据=sqlContext.read.格式("com.databricks.spark网站.redshift").option("url",jdbcURL)//提供JDBC url.option("tempdir",tempS3Dir)//用户提供一个临时的S3文件夹.option("dbtable","sales")//或使用.option("query","select*from sales").加载()上面的命令为Redshift表(query)提供了一个DataFrame实例。用户只需要提供jdbcurl、这个包卸载Redshift数据的临时S3文件夹以及表或查询的名称。DataFrame实例可以注册为Spark中的临时表,可以直接对其执行查询。salesDF.registerEmptable销售数据("红移销售")val newSalesDF=sqlContext.sql("从销售中选择计数(*)从红移中选择"使用SQL命令行界面(CLI)可以获得相同的结果,如下所示:>从eu redshift创建临时表sales使用com.databricks.spark网站.红移选项(dbtable"销售",tempdir's3n://spark redshift testing/temp/',网址'jdbc:红移://测试-红移.czac2vcs84ci.us东-。redshift.amazonaws.com:5439/testredshift?user=redshift&password=W9P3GC42GJYFpGxQtaCBitxPszAc8iZFW');>选择count(*)FROM sales\u FROM_redshift;请注意,我们如何将检索到的Redshift表注册为Spark中的\u Redshift临时表sales_,并使用以下命令直接对其执行查询:选择count(*)FROM sales\u FROM_redshift;在幕后,这个包执行Redshift UNLOAD命令(使用JDBC),该命令将Redshift表与用户提供的临时S3 bucket并行复制。接下来,它使用hadoopinputformat API并行读取这些S3文件,并将其映射到一个RDD实例。最后,它将使用JDBC元数据检索功能检索到的表(或查询)的模式应用于前一步中生成的RDD,以创建一个DataFrame实例。Spark的Redshift数据源无法自动清理它在S3中创建的临时文件。因此,我们建议您使用带有对象生命周期配置的专用临时S3存储桶,以确保在指定的过期时间后自动删除临时文件。向红移写信Spark数据源API是一个功能强大的ETL工具。大数据系统中的一个常见用例是从一个系统中获取大规模数据,以分布式方式对其应用转换,然后将其存储回另一个系统中。例如,典型的做法是从HDFS中的配置单元表中获取数据,然后将这些表复制到Redshift中以允许交互处理。这个包非常适合这个用例。假设源于配置单元的事务表在Spark环境中可用,并且需要复制到相应的Redshift table Redshift_事务。以下命令可实现此目标:sqlContext.sql("从交易中选择*").写入.格式("com.databricks.spark网站.redshift").option("url",jdbcURL).选项("tempdir",tempS3Dir).option("dbtable","redshift_transaction").模式(保存模式。覆盖).save()使用SQL CLI可以获得如下相同的结果:创建表redshift\u事务使用com.databricks.spark网站.红移选项(dbtable"redshift_事务",tempdir's3n://spark redshift testing/temp/',网址'jdbc:红移://测试-红移.czac2vcs84ci.us东-。redshift.amazonaws.com:5439/testredshift?user=redshift&password=W9P3GC42GJYFpGxQtaCBitxPszAc8iZFW')作为SELECT*FROM transaction; 注意模式(保存模式。覆盖)在上面的Scala代码中。这表示Spark的Redshift数据源将覆盖表(如果存在)。默认情况下(只有SQL CLI模式下可用)如果表已经存在,则此包将抛出错误(SaveMode.errorif存在)还有一个保存模式。追加模式,该模式在表不存在时创建表,如果表存在,则追加到表。最后一种模式是保存模式。忽略如果表不存在,则创建该表;如果表已存在,则忽略整个命令。在幕后,Spark的Redshift数据源将首先使用JDBC在Redshift中创建表。然后,它将源数据帧(在我们的示例中是一个配置单元表)实例封装的分区RDD复制到临时S3文件夹中。最后,它执行Redshift COPY命令,该命令对新创建的Redshift表执行S3文件夹内容的高性能分布式复制。与其他数据源集成通过这个包读取的数据会自动转换为DataFrame对象,这是Spark对大型数据集的主要抽象。这促进了数据源之间的互操作性,因为类型会自动转换为Spark的标准表示(例如StringType、DecimalType)。例如,Redshift用户可以将Redshift表与存储在S3中的数据、配置单元表、CSV或存储在HDFS上的Parquet文件连接起来。这种灵活性对于具有涉及多个源的复杂数据管道的用户非常重要。为Spark使用红移数据源我们在这篇博客文章中的目标是介绍这个包,并概述它如何将Redshift集成到Spark的统一数据处理平台中。要尝试这些新功能,请立即下载Spark 1.5或注册使用Databricks进行为期14天的免费试用。我们还提供了一个非常详细的教程。本教程将引导您完成创建示例Redshift数据库的过程。然后,它将演示如何通过本地开发环境中的这个包与Redshift交互。免费试用Databricks。今天就开始吧