云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

金山云_服务器系统备份_返现

小七 141 0

客座博客:禅与apachespark的艺术与Cassandra的维护

这是我们在DataStax的朋友发来的客座帖子。阿帕奇·卡桑德拉™ 是一个完全分布式、高度可扩展的数据库,允许用户创建始终处于运行状态并可以处理大量数据的在线应用程序。阿帕奇火花™ 是一个处理引擎,它使Hadoop集群中的应用程序在内存中的运行速度提高了100倍,在磁盘上运行时甚至提高了10倍。因此,这两种技术找到彼此,对存储在高性能事务数据库中的实时、操作数据进行极快的分析,这只是时间问题。这篇博客文章将详细介绍Spark的内部工作原理,以及如何利用Spark和Cassandra之间的交互来塑造应用程序,并回答Cassandra+Spark主题中最常见的问题。Spark架构基础Spark以4个进程为中心,我们可以使用jps命令在运行的系统上查看它们。9: 57:59/~jps#javaps,列出所有正在运行的Java进程15687 DseSparkMaster#Spark Master(可并入DSeDemon)22232 DseSparkWorker#Spark Worker22652粗粒度执行器后端#Spark Executor22653日元22415 SparkSubmit火花驱动程序(您的应用程序)Spark主JVM与Hadoop作业跟踪器类似,它不需要大量的RAM,因为它只需要将工作分发到集群。Spark Worker JVM与Hadoop任务跟踪器类似,它也不需要大量的RAM,因为它的主要职责是启动执行器进程。在DSE/StandAlone模式下,Worker将只为每个应用程序启动一个executor JVM,但这并不意味着您将只在计算机上使用1个内核。单个执行器JVM最多将使用worker上可用的"max cores"(spark.cores.max). 只有多个worker才可能有多个执行器JVM(dse4.7特性)火花执行器JVM火花性能最重要的部分。基本上,这将是一组只处理RDD任务的进程。CPU要求分配给这个执行器的每个核心将能够一次处理一个任务。这意味着一个由Spark定义的具有20个核心的集群将能够同时处理20个任务。如果正在处理的RDD少于20个分区,那么集群将无法充分利用。下面将有更多关于如何生成任务/分区的详细信息,但是一般来说,任务的数量应该大于核心的数量。您可以在工作线程上设置比物理核心更多的可用核心。这对I/O绑定的任务有好处,但在运行Cassandra的节点上过度订阅cpu可能是个坏主意。如果订阅过多,系统将依靠操作系统来决定卡桑德拉何时响应请求或发出心跳。有一些有趣的想法可以通过使用cgroups来限制集群资源来缓解这种情况,但是我对这些策略了解得不够多,无法推荐它们。RAM要求现在让我们想象一下,在这个集群中,我们有4个物理节点,每个节点上有5个核心。这意味着每台机器都将有一个执行器JVM(很可能命名为dcarsegrainedexecutorbackend)。这个JVM的堆将在执行器之间共享,并具有与任何其他基于JVM的应用程序相同的警告。大堆将导致较长的垃圾收集时间,并且非常大的堆是站不住脚的。在Spark这样的批处理应用程序中,大小限制不太重要,因为1秒的stop-The-world GC在30分钟的任务中并不意味着太多。Databricks在堆中的建议大小为55gb。在设置executor JVM大小时,请记住您正在从C*和操作系统中占用内存。一定要留出足够的空间让C*运行,并让页面缓存帮助C*操作。RDD存储分数最大的部分是缓存,它将用于将RDD分区保存在内存中。因为实际上从Cassandra中检索数据对大多数用户来说都是一个瓶颈,因为大多数应用程序都会从C*中提取大量数据,然后在内存中对其进行处理。默认值为heap的60%,并设置为(火花存储记忆提取). 请随意调整这一点,但要记住堆的其他两部分。注意:根据Spark文档,这个部分应该与JVM中旧的一代大小大致相同。应用程序代码和随机存储Spark shuffle是通过shuffle管理服务组织和执行的,该服务使用洗牌储存执行器的一部分,在将数据写入文件并通过网络传送之前实际移动数据(并对其进行排序)。任何需要完全排序、groupBy或join的操作都将触发完全洗牌,因此减少此设置时请小心(火花。洗牌。记忆提取)从默认值0.2开始。堆的剩余部分用于应用程序代码,可以根据应用程序所需的代码和jar进行缩放。每个worker上必须至少有请求数量的可用RAM,才能为Spark应用程序创建一个执行器。在异构集群中,这意味着如果希望任务在所有机器上运行,那么执行器内存不能大于最小的工作线程。网络这些部件之间将建立以下连接:驾驶员主人工人驱动程序执行器这里要解决的关键问题之一是驱动程序和执行器之间的连接。通常情况下,如果作业开始但随后过早终止,并且由于超时的未来而导致"意外的……异常",那么这种通信就会出现问题。大多数网络连接问题是因为最后一个链接(驱动程序和执行器之间)没有成功建立。记住,这意味着不仅驱动程序必须能够与执行器通信,而且执行器必须能够与驱动程序连接。如果您有困难,请确保Spark config选项spark.driver.host火花点燃-默认值.conf)匹配运行驱动程序应用程序的计算机上的可访问IP地址。在某些情况下,我们发现使用IP地址是有效的,而有一个可解析的域名却不行。RDD的解剖这是我们快速深入的地方。让我们开始讨论RDD的基础是什么。RDD有几个主要组件:依赖关系图这将详细说明在成功执行当前RDD之前必须计算哪些RDD。对于来自任何地方的RDD来说这可能是空的(比如sc.砂锅)或者一长串的操作和依赖关系(比如rdd.map.filter.随机播放.join.map).可以使用toDebugString查看任何RDD的图形scala>打印(sc.砂锅("test","tab").toDebugString)(1) 卡桑德拉德[3]在RDD卡桑德拉德。斯卡拉:48scala>打印(sc.并行化(1到10).map(*2).map(*2).map(*2).toDebugString(6) MappedRDD[7]在地图上的位置:60|MappedRDD[6]在地图上的位置:60|MappedRDD[5]在地图上的位置:60|并行集合RDD[4]at parallelize at:60隔墙描述RDD如何分区以及描述每个分区属性的关联元数据。每个分区都应该被认为是由RDD表示的一个离散的数据块。计算方法compute方法获取一段分区元数据(和任务上下文),并对该分区执行某种操作,返回迭代器。这是在RDD上调用操作时将执行的lazy方法。例如,在CassandraRDD中,该方法读取每个分区的元数据以获取Cassandra令牌范围,并返回一个迭代器,该迭代器从该范围生成C*数据。另一方面,MapRDD使用分区从以前的RDD检索迭代器,然后将给定的函数应用于该迭代器。(有关更多信息,请参阅视频Cassandra连接器如何读取数据。)首选定位法一种方法,它描述计算特定分区的首选位置。这个位置是由RDD定义的,但大多数RDD类型将其委托给链中的前一个RDD。在所有情况下,如果由于计算的分区已经存在而对分区进行了检查,那么这将被忽略。在cassandradd中,此方法使用来自自定义分区类的信息来查看哪个节点实际包含分区中指定的范围。请注意,这是一个"首选"而不是"保证"的位置。分区是否流式传输到另一个节点或本地计算取决于火花。地点。等等参数。此参数可以设置为0,以强制只在本地节点上计算所有分区。在RDD上执行操作时,将分析依赖树并将其分离为独立的子树。每个独立子树都成为一个阶段。所有阶段都会被处理,直到结果可以提供给用户。缩小依赖关系图在Spark中执行的许多操作只需要以前的RDD中的一个分区(例如:map、flatMap、keyBy)。因为在从这些转换中的一个生成的RDD中计算一个新的分区只需要一个以前的分区,所以我们可以快速而适当地构建它们。在Spark中,这些是最有效、最可靠的操作。因为每个新分区只依赖于一个过去的分区,所以可以独立重试,不需要网络操作。 另一方面,有些转换需要在集群中移动数据,因为它们需要所有先前RDD分区的知识才能工作。诸如shuffles、groupBy、join和sort等转换都需要一个shuffle,因此依赖于以前所有的RDD分区来完成它们的工作。您应该尝试将这些转换保持在最小值,并将它们尽可能向下推(在您正在进行的任何筛选之后)。缓存这些昂贵操作的结果也是一个好主意,这样对它的任何进一步引用都不需要重新计算整个依赖树。RDD操作链中的操作应该在哪里安置