关键概念,安装和DAG实际示例
> Photo by Şahin Yeşilyaprak on Unsplash
Airflow是用于自动化和安排任务和工作流程的工具。 如果您想以数据科学家,数据分析师或数据工程师的身份高效工作,那么拥有一个可以自动执行要定期重复的流程的工具至关重要。 从提取,转换和加载常规分析报告的数据到自动重新训练机器学习模型,这可以是任何事情。
Airflow使您可以轻松地自动化主要由Python和SQL编写的简单到复杂的流程,并具有丰富的Web UI来可视化,监视和修复可能出现的任何问题。
下面的文章是对该工具的完整介绍。 我已经包含了从虚拟环境中安装到以简单易懂的步骤运行第一个dag的所有内容。
我将本教程分为6个部分,以使其易于理解,以便您可以跳过已经熟悉的部分。 包括以下步骤:
· 基本气流概念。
· 如何在虚拟环境中设置Airflow安装。
· 运行Airflow Web UI和调度程序。
· 常见的CLI命令列表。
· Web UI导览。
· 创建一个真实的示例DAG。
在讨论Airflow的安装和使用之前,我将简要介绍该工具至关重要的几个概念。
该工具的核心是DAG(有向无环图)的概念。 DAG是您要在工作流中运行的一系列任务。 这可能包括通过SQL查询提取数据,使用Python执行一些计算,然后将转换后的数据加载到新表中。 在Airflow中,每个步骤都将作为DAG中的单独任务编写。
通过Airflow,您还可以指定任务之间的关系,任何依赖关系(例如,在运行任务之前已将数据加载到表中)以及应按顺序运行任务。
DAG用Python编写,并另存为.py文件。 该工具广泛使用DAG_ID来协调DAG的运行。
我们具体说明了DAG应何时通过execute_date自动运行。 DAG按照指定的时间表运行(由CRON表达式定义),该时间表可以是每天,每周,每分钟或几乎任何其他时间间隔
Operator将要在每个任务中执行的操作封装在DAG中。 Airflow有大量的内置操作员,它们可以执行特定任务,其中一些特定于平台。 此外,可以创建自己的自定义运算符。
我将为您提供在隔离的Pipenv环境中进行Airflow设置的个人设置。 如果您使用其他虚拟环境工具,则步骤可能会有所不同。 此设置的大部分灵感来自于出色的Stackoverflow线程。
对您的Airflow项目使用版本控制是一个好主意,因此第一步是在Github上创建一个存储库。 我叫我airflow_sandbox。 使用git clone" git web url"创建到本地环境的存储库克隆后。
从终端导航到目录,例如 cd / path / to / my_airflow_directory。
进入正确的目录后,我们将安装pipenv环境以及特定版本的Python,Airflow本身和Flask,这是运行Airflow所必需的依赖项。 为了使一切正常工作,最好为所有安装指定特定版本。
pipenv install --python=3.7 Flask==1.0.3 Apache-airflow==1.10.3
气流需要在本地系统上运行一个称为AIRFLOW_HOME的位置。 如果未指定,则默认为您的路线目录。 我更喜欢通过在.env文件中指定它来在我正在工作的项目目录的路径中设置Airflow。 为此,只需运行以下命令。
echo "AIRFLOW_HOME=${PWD}/airflow" >> .env
接下来,我们初始化pipenv环境。
pipenv shell
气流需要运行数据库后端。 默认设置为此使用SQLite数据库,这对于学习和实验来说是很好的选择。 如果您想建立自己的数据库后端,那么气流文档会提供很好的指导。 初始化数据库类型。
airflow initdb
最后,我们创建一个目录来存储我们的dag。
mkdir -p ${AIRFLOW_HOME}/dags/
就是说初始基本设置完成了。 您现在应该具有一个如下所示的项目结构。
Airflow具有出色的Web UI,您可以在其中查看和监视您的问题。 要启动Web服务器以查看UI,只需运行以下CLI命令。 默认情况下,Airflow将使用端口8080,因为我已经使用该端口运行我指定8081的其他端口。
airflow webserver -p 8081
我们还需要启动调度程序。
airflow scheduler
现在,如果我们导航到http:// localhost:8081 / admin /?showPaused = True。 我们将看到以下屏幕。
Airflow有一些显示在用户界面中的示例dag。 一旦开始创建自己的窗口,您可以通过单击屏幕底部的"隐藏暂停的DAG"来隐藏它们。
让我们使用这些示例dag来浏览一些常见的Airflow CLI命令。
让我们从教程dag中运行sleep任务。
airflow run tutorial sleep 2020-05-31
我们可以在教程DAG中列出任务。
bash-3.2$ airflow list_tasks tutorial
暂停此DAG。
airflow pause tutorial
取消暂停教程。
airflow unpause tutorial
回填(在过去的日期执行任务)。 指定dag_id(教程),开始日期(-s)和结束日期(-e)。
airflow backfill tutorial -s 2020-05-28 -e 2020-05-30
有关CLI命令的完整列表,请参见文档中的此页面。
我们可以从Web UI监视,检查和运行任务。 如果返回到Web服务器,我们可以看到我们在教程DAG上运行的CLI命令的效果。 为了便于查看,我隐藏了暂停的dag。
我们可以通过多种方法来检查DAGS的运行情况。
如果我们选择树状视图。
我们可以轻松查看哪些任务已运行,正在运行或已失败。
我们还可以通过单击小方块从此处运行,清除或标记特定任务。
如果单击"渲染"按钮,我们可以查看已运行的代码或命令。
通过"代码"视图,我们可以查看组成DAG的代码。
Graph View是一种很好的方式可视化任务的排序或相关方式。
Web UI中的另一个重要区域是Admin。 在这里,您可以定义与其他平台(如数据库)的连接,并定义可重用的变量。
我将在此处尝试提供一个接近实际的DAG示例,以说明至少一种使用Airflow的方法,并介绍随之而来的一些复杂性。
我将编写一个Airflow DAG,它首先检查BigQuery公共数据集中感兴趣日期的数据是否存在,然后按日程将数据加载到我自己的私有项目中的表中。
BigQuery有一个免费的使用层,该层允许您每月查询1TB数据,因此,如果您想自己尝试一下,则可以以零成本进行。
为了同时使用Airflow和BigQuery,我们需要首先完成一些附加的设置步骤。
为了能够通过Airflow查询和加载BigQuery中的数据,我们需要首先授予Airflow所需的安全权限。
为此,您需要在google Cloud Platform上创建一个服务帐户。 这有点像创建一个有权访问您的帐户的用户,但其目的是允许其他平台访问。
首先,从Google Cloud Console导航到服务帐户。 然后单击创建服务帐户按钮。
接下来填写显示的表格。
在下一页上,您需要选择要授予的访问级别。 我已选择所有资源的编辑器,因为这是我的个人帐户,并且此处没有存储任何敏感数据。 如果我更担心潜在的安全问题,那么我将授予更多的细化权限。
接下来,您需要创建一个私钥,可以通过选择创建密钥来完成。 选择JSON,因为这是Airflow所需要的。 私钥将被下载到您需要安全存储的本地系统。
现在,我们需要返回Airflow Web UI并使用此JSON文件的输出更新bigquery_default连接。 您还需要添加一个默认的项目ID,如下所示。
我们还需要在Google Pipenv环境中安装一些Google Cloud依赖项。 我已经安装了以下内容。
pipenv install google-cloud-storage httplib2 google-api-python-client google-cloud-bigquery pandas_gbq
以下是将执行上述步骤的DAG的代码。 应将其另存为.py文件在我们之前创建的dags目录中。
DAG的顶部是必需的进口。 Airflow提供了一系列运营商,可在Google Cloud Platform上执行大多数功能。 我已经导入了BigQueryOperator(用于运行查询和加载数据)和BigQueryCheckOperator(用于检查特定日期的数据是否存在)。
在DAG的下一部分中,我们定义dag_args,然后创建DAG,该DAG提供诸如dag_id,start_date和应该多长时间运行一次任务等信息。 Airflow使用CRON表达式定义时间表,有关这些表达式的更多信息,请访问此页面。
然后,我们将每个步骤定义为一项任务,我将其定义为变量t1和t2。 这些任务均在工作流程中执行特定步骤。 这些命令的运行顺序位于DAG的最底部。
现在,我们可以转到Web UI并运行DAG。
如果我们转到BigQuery控制台,我们还将看到Airflow已创建并加载了数据的表。
本文旨在作为一个完整的介绍,让您开始使用Airflow创建第一个DAG并开始运行。 有关更详细的使用指南,请在此处找到Airflow文档。
可以在此Github存储库中找到本文详细介绍的完整项目的链接。
谢谢阅读!
如果您想加入,我会每月发送一次通讯,请通过此链接注册。 期待成为您学习之旅的一部分!
(本文翻译自Rebecca Vickery的文章《A Complete Introduction to Apache Airflow》,参考:https://towardsdatascience.com/a-complete-introduction-to-apache-airflow-b7e238a33df)