云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

对象存储_excel云服务器_新注册优惠

小七 141 0

AWS数据管道与数据块的集成:用apachespark构建ETL管道

这是一系列关于将数据块与常用软件包集成的博客之一。请参阅末尾的"下一步是什么"部分,阅读本系列中的其他内容,其中包括如何使用AWS Lambda、Kinesis、flow等等。Databricks是一个在云中完全管理的apachespark数据平台,旨在为数据工程师、数据科学家或任何有兴趣使用Spark进行数据处理的人提供方便的实时数据探索和部署生产作业。AWS数据管道是一个web服务,它帮助以指定的间隔在不同的AWS计算和存储服务之间可靠地处理和移动数据。AWS数据管道帮助用户轻松创建容错、可重复和高可用性的复杂数据处理工作负载。Databricks本机部署到我们用户的AWS VPC中,并且与AWS生态系统中的每个工具兼容。在这个博客中,我将演示如何使用Databricks和AWS数据管道构建ETL管道。如何使用数据管道和数据块Databricks restapi支持对Databricks的编程访问,而不是通过webui。它可以自动创建和运行作业,将工作流产品化,等等。它还允许我们通过根据其他AWS服务中的事件触发一个动作来集成数据管道和数据块。使用AWS数据管道,您可以通过定义以下内容来创建管道:包含您的数据的"数据源"。为了确保在执行活动之前数据可用,AWS数据管道允许您选择性地创建称为前提条件的数据可用性检查。这些检查将反复尝试验证数据可用性,并将阻止任何依赖活动的执行,直到前提条件成功为止。例如启动业务逻辑或"数据"活动。可以使用AWS数据管道对象ShellCommandActivity调用Linux curl命令来触发对Databricks的restapi调用。业务逻辑执行的"时间表"。在过去调度开始时间的情况下,数据管道会反作用于任务。它还允许您通过支持资源和相关活动的不同计划周期来最大限度地提高资源的效率。例如,如果调度周期允许,可以重用相同的资源。下面是一个使用Databricks设置数据管道以定期处理日志文件的示例。AWS数据管道用于编排此管道,方法是检测每日文件何时准备好进行处理,并为检测每日作业的输出并发送最终电子邮件通知设置"前提条件"。日志处理示例设置数据管道:图1:ETL自动化:1)数据从Web服务器InputDataNode进入S3,2)触发事件并通过ShellCommandActivity调用Databricks 3)Databricks处理日志文件并写出Parquet数据,OutputDataNode,4)作为上一步的结果,发送一次SNS通知。该管道有几个步骤:1输入前提条件检查输入数据是否存在:s3:///input/2输入数据节点将输入配置为下一步的源,下一步是调用restapi的shell脚本。三。调用Databricks REST API调用ShellCommandActivity运算符使用文件输入和输出参数调用Databricks REST API(为了说明本博客中的要点,我们使用下面的命令;对于您的工作负载,有许多方法可以维护安全性):curl-xpost-u:https://。cloud.databricks.com/api/2.0/jobs/run now-d'{"job_id":,"notebook_params":{"inputPath":"s3a://:@/input/sample_logs/part-00001","outputPath":"s3a://:@/output/sample_log_parquet"}'图2:Databricks中被调用的作业的屏幕截图。请注意,此作业不是在Databricks中调度的,而是由数据管道触发的。4Databricks操作Databricks操作包括读取输入日志文件、创建模式并将其转换为Parquet:val inputPath=getArgument("inputPath","default")case类ApacheAccessLog(ipAddress:String,clientId:String,userId:String,日期时间:String,方法:字符串,终结点:字符串,协议:字符串,responseCode:Int,contentSize:Long){}val Pattern=""^(\S+(\S+)(\S+)\[([\w:/]+\S[+\-]\d{4})\]"(\S+(\S+)(\d{3})(\d+)"。rdef parseLogLine(log:String):ApacheAccessLog={值res=Pattern.findFirstMatchIn模式(日志)如果(res.isEmpty公司) {抛出新的RuntimeException("无法分析日志行:"+log)}值m=资源获取ApacheAccessLog(m组(1)、m组(2)、m组(3)、m组(4),m、 组(5),m组(6),m组(7),m组(8),toInt,m组(9).toLong)}val访问日志=(sc.TEXT文件(输入路径).map(parseLogLine)).toDF().cache()//"accessLogs"数据帧的附加处理val outPath=getArgument("outputPath","default")accessLogs.write.format("parquet").mode("overwrite").save(outPath)5输出前提条件检查输出数据是否存在:s3:///output/sample_log_拼花地板6发送亚马逊SNS警报最后一步,当作业成功时,发出Amazon SNS警报。您可以通过多种方式订阅SNS消息,例如电子邮件或文本通知推送。下一步是什么我们希望这个简单的例子展示如何使用Amazon数据管道和Databricks API来解决数据处理问题。数据管道与AWS中的各种存储层集成,使用ShellCommandActivity可以与Databricks REST API集成,参数参数可以动态地传递到Databricks的笔记本或库中。要试用Databricks,请注册免费试用或与我们联系。阅读本系列中的其他博客,了解如何将数据块与现有体系结构集成:集成Apache Airflow和Databricks利用AWS-Kinesis、RDS和Databricks优化油气资产使用awslambda和数据块实现ETL自动化和ML模型服务工具书类AWS数据管道AWS数据管道开发人员指南AWS数据管道前提条件AWS数据管道外壳命令活动AWS数据管道Amazon SNS通知亚马逊简单通知(SNS)Databricks REST API免费试用Databricks。今天就开始吧