云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

服务器_阿里巴巴云服务器_促销

小七 141 0

使用apachespark进行可伸缩的近实时S3访问日志分析™ 三角洲湖

最初的博客来自Zalando的高级数据工程师Viacheslav Inozemtsev,经许可复制。介绍许多组织使用aws3作为其数据的主要存储基础结构。此外,通过使用apachespark™ 在Databricks上,他们经常对数据进行转换,并将优化后的结果保存回S3进行进一步分析。当数据的大小和处理量达到一定的规模时,常常需要观察数据访问模式。常见的问题包括(但不限于):哪些数据集使用得最多?访问新数据和过去数据的比率是多少?在不影响用户性能的情况下,将数据集移动到更便宜的存储类的速度有多快?等。在Zalando,自从数据和计算在过去几年成为我们的商品以来,我们一直面临这个问题。我们的200多个工程团队几乎都定期执行分析、报告或机器学习,这意味着他们都从中央数据湖读取数据。使这些数据具有可观测性的主要动机是通过删除未使用的数据和减少生成这些数据的管道的资源使用来降低存储和处理的成本。另一个驱动因素是了解我们的工程团队是否需要查询历史数据,或者他们是否只对数据的最新状态感兴趣。为了回答这些类型的问题,S3提供了一个有用的特性——S3服务器访问日志。启用后,它会不断地转储观察到的bucket中的每个读写访问的日志。几乎立即出现的问题,尤其是在更高的范围内,这些日志是以相对较小的文本文件的形式出现的,其格式类似于apacheweb服务器的日志。为了查询这些日志,我们利用了apachespark的功能™ 在数据块上结构化流媒体,并构建了一个构建Delta-Lake表的流管道。这些表(对于每个观察到的bucket)包含结构良好的S3访问日志数据,这些数据是分区的,可以根据需要进行排序,因此,可以对公司数据的访问模式进行扩展和高效的分析。这使我们能够回答前面提到的问题和更多问题。在这篇博客文章中,我们将描述我们在Zalando中设计的生产架构,并详细展示如何自己部署这样一个管道。解决方案在开始之前,让我们做两个限定条件。第一个注意是关于为什么我们选择三角洲湖,而不是平原拼花或任何其他形式。正如您将看到的,为了解决所描述的问题,我们将使用Spark结构化流媒体创建一个连续的应用程序。在这种情况下,三角洲湖的特性将给我们带来以下好处:ACID事务:如果写入操作仍在进行或失败,则表的使用者没有损坏/不一致的读取操作,在S3上留下部分结果。更多信息也可以在DivingDeltaLake:解包事务日志中找到。模式强制:元数据由表控制;如果Spark作业的代码中存在错误,或者日志的格式发生了更改,则不可能破坏模式。更多信息可在《潜入三角洲湖:模式实施与进化》一书中找到。模式演化:另一方面,如果日志格式发生了变化,我们可以有目的地通过添加新字段来扩展模式。更多信息可在《潜入三角洲湖:模式实施与进化》一书中找到。开放格式:普通拼花格式的所有优点都适用于读者,例如谓词下推、列投影等。统一的批处理和流式源汇:有机会将下游的Spark结构化流作业链接起来,以生成基于新内容的聚合第二点是关于数据湖的客户正在读取的数据集。在大多数情况下,上述数据集包括两类:1)来自BI数据库的数据仓库表的快照;2)来自公司中央事件总线的连续事件流。这意味着首先有两种类型的数据写入模式-分别是每天一次的完整快照和连续附加流。在这两种情况下,我们都假设最后一天生成的数据最常被消耗。对于快照,我们还知道当前快照和过去版本之间很少进行比较,例如一年前的快照。我们知道当必须处理某个事件数据流的整个月甚至一年的历史数据时的用例。这给了我们一个寻找什么的想法,这就是所描述的管道应该帮助我们证明或反驳我们的假设。现在让我们来深入了解这个管道实现的技术细节。我们在当前阶段拥有的唯一实体是s3bucket。我们的目标是分析这个bucket的读写访问中出现了什么模式。为了让您了解我们将要展示的内容,在下面的图表中,您可以看到代表管道最终状态的最终体系结构。它描述的流程如下:AWS持续监控S3 bucket数据桶它将原始文本日志写入目标S3 bucket raw logs bucket对于每个创建的对象,都会向SQS queue new log objects队列发送一个事件通知每隔一个小时,数据库就会启动一次Spark作业Spark job从队列读取所有新消息Spark job从raw logs bucket读取所有对象(在来自队列的消息中描述)Spark job以append模式将新数据写入Delta logs bucket S3 bucket中的Delta Lake表(也可以选择执行OPTIMIZE和VACUUM,或者以自动优化模式运行)可以查询这个Delta-Lake表来分析访问模式 管理设置首先,我们将执行配置S3服务器访问日志记录和创建SQS队列的管理设置。配置S3服务器访问日志记录首先,您需要为数据桶配置S3服务器访问日志记录。要存储原始日志,首先需要创建一个额外的bucket,我们称之为raw logs bucket。然后可以通过UI或者API来配置日志,假设我们将目标前缀指定为data bucket logs/,这样我们就可以使用这个bucket来处理多个数据桶的S3访问日志。完成后–一旦有人对数据存储桶执行请求,原始日志就会开始出现在原始日志存储桶中。包含日志的对象的数量和大小将取决于请求的强度。如下表所示,我们在三个不同的桶中体验了三种不同的模式。您可以看到,数据的速度可能会非常不同,这意味着您在处理这些不同的数据源时必须考虑到这一点。创建SQS队列现在,当创建日志时,您可以开始考虑如何使用Spark读取日志以生成所需的Delta-Lake表。因为S3日志是以append-only模式编写的—只创建新对象,而没有对象被修改或删除—这是利用Databricks创建的S3-SQS Spark reader的完美案例。要使用它,首先需要创建一个SQS队列。我们建议将邮件保留期设置为7天,并将默认可见性超时设置为5分钟。从我们的经验来看,这些都是很好的缺省值,也与sparks3-SQS阅读器的默认值相匹配。让我们用名称new log objects queue引用队列。现在需要配置队列的策略,以允许从原始日志存储桶向队列发送消息。要实现这一点,您可以在UI中队列的Permissions选项卡中直接编辑它,或者通过API进行编辑。声明应该是这样的:{"Effect":"允许","委托人":"*","操作":SQS:发送消息","资源":arn:aws公司:sqs:{REGION}:{MAIN_ACCOUNT_ID}:新日志对象队列","条件":{"ArnEquals":{"aws:SourceArn公司": "arn:aws公司:s3:::原始日志存储桶"}}}配置S3事件通知现在,您已经准备好连接raw logs bucket和new log objects queue,这样对于每个新对象都有一条消息发送到队列。为此,您可以在UI中或通过API配置S3事件通知。JSON的配置如下所示:{"队列配置":[{"Id":"原始日志","QueueArn":arn:aws公司:sqs:{REGION}:{MAIN_ACCOUNT_ID}:新日志对象队列","事件":s3:ObjectCreated:*"]}]}操作设置在本节中,我们将执行必要的集群配置,包括创建IAM角色并准备集群配置。创建IAM角色为了能够运行Spark作业,您需要创建两个IAM角色–一个用于作业(集群角色),另一个用于访问S3(假定角色)。您需要另外假设一个单独的S3角色的原因是集群及其集群角色位于Databricks EC2实例和角色的专用AWS帐户中,而raw logs bucket位于原始源bucket所在的AWS帐户中。因为每个日志对象都是由Amazon角色编写的,这意味着集群角色无权根据日志对象的ACL读取任何日志。您可以通过使用带有AssumeRole策略的IAM角色跨帐户安全访问S3存储桶来了解更多信息。集群角色,这里称为集群角色,应该在专门用于数据链的AWS帐户中创建,并且应该具有以下两个策略:{"Version":"2012-10-17","声明":[{"行动":["sqs:接收消息","sqs:删除消息","sqs:GetQueueAttributes"],"资源":arn:aws公司:sqs:{REGION}:{DATABRICKS_ACCOUNT_ID}:新日志对象队列"],"Effect":"允许"}]}和{