云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

域名交易_绝地求生美国服务器_便宜的

小七 141 0

最前沿:火花,拼花和S3

最前沿:Spark、Parquet和S3 Arnon Rotem Gal Oz 2015年8月10日,Spark正在成为Map/Reduce的领先替代品,原因包括不同的Hadoop发行版广泛采用,在单个平台上结合了批处理和流媒体,以及不断增长的机器学习集成库(两者都是包括算法以及与机器学习语言(即R和Python)的集成。在AppsFlyer,我们使用Spark作为ETL(提取、转换和加载)和分析的主要框架已有一段时间了。最近的一个例子是我们最近发布的新版retention report,它利用Spark利用ETL(主要是数据清理)和分析(实现完全点击欺诈检测的垫脚石)对多个数据流(每天>1TB)进行处理,以生成报告。一我们在本报告中引入的主要变化是在序列文件上构建以使用拼花地板文件。Parquet是一种列式数据格式,这可能是目前存储长期大数据以进行分析的最佳选择(除非您在Hive上投入了大量资金,Orc是更合适的格式)。Parquet与Sequence文件的优势在于性能和压缩,同时又不丧失大数据工具(Spark、Hive、Drill、Tajo、Presto等)的广泛支持。我们的大数据基础设施的一个相对独特的方面是我们不使用Hadoop(这可能是另一篇文章的主题)。我们使用Mesos作为资源管理器而不是YARN,我们使用amazons3代替HDFS作为分布式存储解决方案。HDFS比S3有几个优点,但是在AWS上运行长时间运行的HDFS集群与使用S3相比,其成本/收益是压倒性的,而S3更适合使用S3,拼花和S3对我们提出了几个挑战,这篇文章将列出主要的挑战和我们提出的解决方案它们。镶木地板&SparkParquet和Spark似乎已经有一段爱恨关系了。一方面,Spark文档将Parquet吹捧为大数据分析的最佳格式之一(it is),另一方面,Spark中对Parquet的支持是不完整的,使用起来很烦人。事情确实朝着正确的方向发展,但仍有一些怪癖和陷阱需要警惕为了。到从积极的方面开始,Spark和拼花地板的整合在过去的几个月里取得了长足的进步。以前,为了能够将现有的数据转换成拼花地板,人们不得不跳出圈套。在Spark中引入数据帧使这个过程简单得多。当DataFrame API支持输入格式时,例如,输入是JSON(内置)或Avro(尚未内置在Spark中,但您可以使用库来读取),转换为Parquet只需在一侧读取输入格式并将其作为Parquet保存在另一侧。以Scala中的以下代码片段为例:val inputPath="../data/json"val outputPath="../data/parquet"val数据=sqlContext.read.json(输入路径)日期.书写.拼花(输出路径)查看原始json格式-拼花地板托管❤ 通过GitHub即使在处理模式不是数据一部分的格式时,转换过程也非常简单,因为Spark允许您以编程方式指定模式。Spark文档非常简单,包含Scala、Java和Python中的示例。此外,用其他语言定义模式并不复杂。例如,在这里(AppsFlyer),我们使用Clojure作为我们的主要开发语言,所以我们开发了两个helper函数来实现这一点。下面的示例代码提供了详细信息:第一件事是从我们拥有的任何结构中提取数据并指定我们喜欢的模式。下面的代码获取一个事件记录,并从中提取各种数据点到格式为[:column_name value optional_data_type]的向量中。数据类型是可选的,因为除非另有指定,否则它被假定为字符串。(defn记录生成器【事件记录】(让。。原始设备参数(提取事件记录"原始设备参数")结果[。。。[:operator(获取原始设备参数"operator")][:model(获取原始设备参数"model")]...[:启动_counter counter DataTypes/LongType]]]结果)查看原始架构_样品.clj托管❤ 通过GitHub下一步是使用上述结构来提取模式并转换为数据帧行:(defn extract dataframe模式[记录](let[字段(reduce(fn[lst schema line](let[k(第一个架构行)(3)数据行类型(=t)架构类型(最后一行)(conj lst(数据类型/createStructField(name k)t NULLABLE)))[]rec)arr(阵列列表。字段)](数据类型/createStructType arr)))(定义为行[记录](let[值(对象数组(reduce(fn[lst v](conj lst v))[]rec))](行工厂/创建值)))查看原始转换.clj托管❤ 通过GitHub最后,我们在RDD上应用这些函数,将其转换为数据帧并另存为Parquet:(让。。模式(trans/extract dataframe schema(record builder nil))..rdd(spark/map record builder我们有的一些rdd)行(spark/map trans/as rows rdd)数据帧(spark/create data frame sql context rows schema)](spark/save parquert数据帧输出路径:覆盖)查看原始rdd\U至_拼花地板.clj托管❤ 通过GitHub如前所述,镶木地板和Spark的发展前景看好,但道路还不明朗。我们遇到的一些问题包括:关键版本1.4版本中的一个bug,其中写入Parquet文件时的竞态条件会导致作业中的大量数据丢失(这个bug在版本1.4.1中已修复–因此,如果您昨天使用的是Spark 1.4和Parquet upgrade!)Filter pushdown optimization,默认情况下关闭,因为Spark仍然使用Parquet 1.6.0rc3–尽管1.6.0已经发布了一段时间(看来Spark 1.5将使用Parquet 1.7.0,因此问题将得到解决)Spark中不支持Parquet,相反,Spark依赖Hadoop对拼花格式的支持—这不是这本身就是一个问题,但对我们来说,当我们尝试将Spark和Parquet与S3一起使用时,它导致了主要的性能问题,在下一节Parquet中,Spark&S3Amazon S3(简单存储服务)是一个相对便宜的对象存储解决方案。"一个应用程序对另一个应用程序所做的"最终一致性"的主要缺点是"一个文件对另一个应用程序所做的改变是显而易见的"。(如果您使用的是Amazon的EMR,那么可以使用EMRFS的"一致视图"来克服这个问题。)但是,如果您理解了这个限制,S3仍然是一个可行的输入和输出源,至少对于批处理来说是这样乔布斯。就像上面提到过,Spark没有原生的S3实现,它依赖Hadoop类来抽象对Parquet的数据访问。Hadoop为S3:S3块文件系统提供了3个文件系统客户机(格式为"S3://…"的URI模式)似乎不能与SparkS3本机文件系统("s3n://…"URIs)一起使用——下载支持hadoop2.*的Spark发行版,如果您想使用它(tl;dr–你没有)s3a–一个s3n的替代品,它消除了s3n的一些限制和问题。下载Spark和hadoop2.6并一直使用这个版本当我们使用Spark 1.3时,我们在尝试使用S3时遇到了很多问题,所以我们开始使用s3n–这在很大程度上起到了作用,i、 我们有一些作业正在运行并完成,但很多作业都失败了,出现了各种读取超时和主机未知异常。看看工作中的任务,情况就更糟了,高比例的失败迫使我们把超时和重试次数增加到荒谬的水平。当我们转到Spark 1.4.1时,我们又尝试了s3a,这次我们让它开始工作了。我们要做的第一件事就是把两者都设置好spark.executor.extraClassPath以及spark.executor.extraDriverPath指出awsjavasdk和hadoopawsjar,因为这两个显然都没有出现在"sparkswithhadoop2.6"构建中。很自然,我们使用了这些文件的2.6版本,但是后来我们遇到了这个小问题。hadoop2.6aws实现有一个bug,导致它以意想不到的方式分割S3文件(例如,运行了400个文件的作业,完成了1800万个任务),幸运的是,将hadoopawsjar替换为Spark的2.7.0版本解决了这个问题,并且使用s3a前缀可以正常工作(a