云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

谷歌云_网站建设的要点_三重好礼

小七 141 0

将Apache Airflow与Databricks集成

这篇博文是我们一系列关于Databricks平台、基础设施管理、集成、工具、监控和供应的内部工程博客的一部分。今天,我们很高兴地宣布在apacheflow(一个流行的开源工作流调度器)中集成了本机Databricks。这篇博客文章说明了如何设置气流并使用它来触发Databricks作业。Databricks的统一分析平台(UAP)的一个非常流行的特性是能够将数据科学笔记本直接转换成可以定期运行的生产作业。虽然此功能将从探索性数据科学到生产数据工程的工作流程统一起来,但某些数据工程作业可能包含复杂的依赖项,这些依赖项很难在笔记本中捕获。为了支持这些复杂的用例,我们提供了restapi,这样基于笔记本和库的作业可以由外部系统触发。其中,客户最常用的调度程序之一是气流。我们很高兴与大家分享,我们还扩展了气流以支持开箱即用的数据块。气流基础知识Airflow是一个具有依赖关系管理的通用工作流调度器。除了能够安排周期性作业外,flow还允许您表达数据管道中不同阶段之间的明确依赖关系。每个ETL管道都表示为任务的有向无环图(DAG)(不要误认为是Spark自己的DAG调度器和任务)。依赖关系通过其边编码到DAG中-对于任何给定的边,只有上游任务成功完成时才调度下游任务。例如,在示例中,只有在任务A成功完成后,才会触发下面的DAG、任务B和任务C。当任务B和任务C都成功完成时,任务D将被触发。flow中的任务是"operator"类的实例,并作为小Python脚本实现。由于它们只是Python脚本,flow中的操作员可以执行许多任务:他们可以在成功之前轮询某些先决条件是否为真(也称为传感器)、直接执行ETL或触发外部系统(如数据块)。有关气流的更多信息,请查看其文档。气流中的本地数据块集成我们实现了一个叫做DatabricksSubmitRunOperator的气流操作符,使得气流和数据块之间的集成更加平滑。通过这个操作符,我们可以命中Databricks Runs Submit API端点,它可以在外部触发一次jar、python脚本或笔记本的运行。在发出提交运行的初始请求后,操作员将继续轮询运行结果。成功完成后,操作员将返回,允许下游任务运行。我们已经将DatabricksSubmitRunOperator贡献给了开源Airflow项目。但是,在气流1.9.0释放之前,集成不会被切割为释放分支。在此之前,要使用这个操作符,您可以安装Databricks的flow fork,它实际上是应用了DatabricksSubmitRunOperator补丁的flow版本1.8.1。pip安装--升级"git+git://github.com/databricks/incubator flow.git@1.8.1-db1\egg=阿帕奇气流[databricks]"气流与数据块教程在本教程中,我们将设置一个在本地计算机上运行的toy flow 1.8.1部署,还将部署一个示例DAG,它将触发在数据块中的运行。我们要做的第一件事是初始化sqlite数据库。Airflow将使用它来跟踪其他元数据。在生产气流部署中,您需要编辑配置以将气流指向MySQL或Postgres数据库,但对于我们的示例,我们只使用默认的sqlite数据库。要执行初始化运行:气流初始数据库SQLite数据库和气流部署的默认配置将在~/Airflow中初始化。在下一步中,我们将编写一个DAG,它运行两个具有一个线性依赖关系的Databricks作业。第一个Databricks作业将触发位于/Users的笔记本/flow@example.com/PrepareData,第二个将运行位于dbfs:/lib/etl-0.1.jar的jar。从一英里高的角度来看,脚本DAG基本上构造了两个DatabricksSubmitRunOperator任务,然后在末尾用set_dowstream方法设置依赖关系。代码的框架版本如下所示:notebook_task=数据库提交操作员(task_id='notebook_task',…)spark_jar_task=数据库提交操作员(task_id='spark_jar_task',…)笔记本_task.set_下游(spark_jar_任务)实际上,我们还需要填写一些其他细节来获得一个有效的DAG文件。第一步是设置一些默认参数,这些参数将应用于DAG中的每个任务。参数={"所有者":"气流",'电子邮件':['flow@example.com'],"取决于过去":错误,'开始日期':airflow.utils.日期.天前(2)}这里两个有趣的论点是取决于过去和开始日期。如果depends_-on-u-past为true,则表示除非任务的前一个实例成功完成,否则不应触发任务。start_date参数决定何时调度第一个任务实例。DAG脚本的下一部分实际上实例化了DAG。dag=dag(dag_id='example\u databricks_operator',默认的_args=args,计划时间间隔='@每日')在这个DAG中,我们给它一个惟一的ID,附加我们之前声明的默认参数,并给它一个每日计划。接下来,我们将指定运行任务的集群的规范。新建群集={"spark_版本":"2.1.0-db3-scala2.11",'节点类型'u id':'r3.xlarge',"aws_属性":{"可用性":"按需"},"工人人数":8人}此规范的架构与Runs Submit端点的新集群字段匹配。对于您的示例DAG,您可能希望减少worker的数量或将实例大小更改为更小的值。最后,我们将实例化DatabricksSubmitRunOperator并将其注册到DAG中。笔记本任务参数={"new_cluster":新_cluster,"笔记本任务":{'笔记本路径':'/Users/flow@example.com/PrepareData',},}#使用JSON参数初始化运算符的示例。notebook_task=数据库提交操作员(task_id='notebook_task',dag=dag,json=笔记本任务参数)在这段代码中,JSON参数接受与Runs Submit端点匹配的python字典。为了在这个任务的下游添加另一个任务,我们再次实例化DatabricksSubmitRunOperator,并在notebook_task operator实例上使用特殊的set_dowstream方法来注册依赖关系。#使用DatabricksSubmitRunOperator的命名参数的示例#初始化运算符。spark_jar_task=数据库提交操作员(task_id='spark_jar_task',dag=dag,new_cluster=新_cluster,spark_jar_任务={'主类别名称':'com.example.ProcessData'},图书馆=[{'jar':'dbfs:/lib/etl-0.1.jar'}])笔记本_task.set_下游(spark_jar_任务)此任务运行位于dbfs:/lib/etl-0.1.jar的jar。注意,在notebook_任务中,我们使用JSON参数指定submit run端点的完整规范,而在spark_jar_任务中,我们将submit run端点的顶级键平铺为DatabricksSubmitRunOperator的参数。虽然实例化操作符的两种方法是等效的,但后一种方法不允许使用任何新的顶级字段,如spark_python_task或spark_submit_task。有关DatabricksSubmitRunOperator的完整API的更多详细信息,请参阅此处的文档。现在我们有了DAG,要将其安装到Airflow中,请在~/Airflow中创建一个名为~/Airflow/dags的目录,然后将DAG复制到该目录中。在这一点上,气流应该能够拾取DAG。$气流列表[10:27:13][2017-07-06 10:27:23868]{uu init_uu.py:57}信息-使用executor sequential executor【2017-07-06 10:27:24238】{型号。py:168}信息-从/Users/andrew/flow/dags填充DagBag-------------------------------------------------------------------达吉斯-------------------------------------------------------------------示例_bash_运算符示例_branch_dop_operator_v3分支运算符示例示例_databricks_运算符我们还可以在webui中可视化DAG。要启动它,请运行airflow Web服务器并连接到本地主机:8080。点击"example_databricks_operator",您将看到您的DAG的许多可视化效果。示例如下:此时,细心的观察者可能还注意到,我们没有在DAG中的任何地方指定Databricks碎片的主机名、用户名和密码等信息。为此,我们使用flow的connection原语,它允许我们从DAG引用数据库中存储的凭证。默认情况下,所有DatabricksSubmitRunOperator将databricks_conn_id参数设置为"databricks_default",因此对于DAG,我们必须添加一个id为"databricks_default"的连接最简单的方法是通过webui。点击顶部的"管理",然后在下拉列表中点击"连接"将显示您当前的所有连接。对于我们的用例,我们将为"databricks_default"添加一个连接。最后的连接应该如下所示:现在我们已经为DAG准备好了一切,现在是时候测试每个任务了。为了完成我们要运行的notebook_任务,气流测试示例_databricks_operator notebook_task 2017-07-01,对于spark_jar_任务,我们将运行flow test example_databricks_operator spark_jar_task 2017-07-01。要按计划运行DAG,可以使用命令flow scheduler调用调度程序守护进程。如果一切顺利,在启动调度程序之后,您应该能够看到DAG的回填运行开始在webui中运行。下一步行动总之,这篇博客文章提供了一个简单的例子来设置与Databricks的气流集成。它展示了Databrick