Airflow 入门及使用

什么是 Airflow?

Airflow 是一个使用 python 语言编写的 data pipeline 调度和监控工作流的平台。 Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具, 不需要知道业务数据的具体内容,设置任务的依赖关系即可实现任务调度。

这个平台拥有和 Hive、Presto、MySQL、HDFS、Postgres 等数据源之间交互的能力,并且提供了钩子(hook)使其拥有很好地扩展性。 除了使用命令行,该工具还提供了一个 WebUI 可以可视化的查看依赖关系、监控进度、触发任务等。

Airflow 的架构

在一个可扩展的生产环境中,Airflow 含有以下组件:

  • 元数据库:这个数据库存储有关任务状态的信息。
  • 调度器:Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。
  • 执行器:Executor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。 其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。
  • Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定。
                                       

Airflow 解决哪些问题

通常,在一个运维系统,数据分析系统,或测试系统等大型系统中,我们会有各种各样的依赖需求。包括但不限于:

  • 时间依赖:任务需要等待某一个时间点触发。
  • 外部系统依赖:任务依赖外部系统需要调用接口去访问。
  • 任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响。 资源环境依赖:任务消耗资源非常多, 或者只能在特定的机器上执行。

crontab 可以很好地处理定时执行任务的需求,但仅能管理时间上的依赖。

Airflow 是一种 WMS,即:它将任务以及它们的依赖看作代码,按照那些计划规范任务执行,并在实际工作进程之间分发需执行的任务。 Airflow 提供了一个用于显示当前活动任务和过去任务状态的优秀 UI,并允许用户手动管理任务的执行和状态。

Airflow 中的工作流是具有方向性依赖的任务集合。 具体说就是 Airflow 的核心概念 DAG(有向无环图) —— 来表现工作流。 dag中的每个节点都是一个任务,dag中的边表示的是任务之间的依赖(强制为有向无环,因此不会出现循环依赖,从而导致无限执行循环)。

Airflow 在 ETL 上的实践

ETL,是英文 Extract,Transform,Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。 ETL 一词较常用在数据仓库,Airflow 在解决 ETL 任务各种依赖问题上的能力恰恰是我们所需要的。

在现阶段的实践中,我们使用 Airflow 来同步各个数据源数据到数仓,同时定时执行一些批处理任务及带有数据依赖、资源依赖关系的计算脚本。

本文立意于科普介绍,故在后面的用例中只介绍了 BashOperatorPythonOperator 这俩个最为易用且在我们日常使用中最为常见的 operator。

Airflow 同时也具有不错的集群扩展能力,可使用 CeleryExecuter 以及多个 Pool 来提高任务并发度。

Airflow 在 CeleryExecuter 下可以使用不同的用户启动 Worker,不同的 Worker 监听不同的 Queue,这样可以解决用户权限依赖问题。 Worker 也可以启动在多个不同的机器上,解决机器依赖的问题。

Airflow 可以为任意一个 Task 指定一个抽象的 Pool,每个 Pool 可以指定一个 Slot 数。 每当一个 Task 启动时,就占用一个 Slot,当 Slot 数占满时,其余的任务就处于等待状态。这样就解决了资源依赖问题。

Airflow 安装及初始化

假设:你已经安装好了 python 及配置好了其包管理工具 pip

pip install apache-airflow

# 初始化数据库
airflow initdb

# 上面的命令默认在家目录下创建 airflow 文件夹和相关配置文件
# 也可以使用以下命令来指定目录
export AIRFLOW_HOME={yourpath}/airflow

# 配置数据库
# vim airflow/airflow.cfg
# 修改 sql_alchemy_conn

# 守护进程运行webserver, 默认端口为8080,也可以通过`-p`来指定端口
airflow webserver -D  

# 守护进程运行调度器     
airflow scheduler -D

定义第一个DAG

在 AIRFLOW_HOME 目录下新建 dags 文件夹,后面的所有 dag 文件都要存储在这个目录。

新建 demo.py,语句含义见注释

from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator


def default_options():
    default_args = {
        \'owner\': \'airflow\',  # 拥有者名称
        \'start_date\': days_ago(1),  # 第一次开始执行的时间,为 UTC 时间(注意不要设置为当前时间)
        \'retries\': 1,  # 失败重试次数
        \'retry_delay\': timedelta(seconds=5)  # 失败重试间隔
    }
    return default_args


# 定义DAG
def test1(dag):
    t = "echo \'hallo world\'"
    # operator 支持多种类型, 这里使用 BashOperator
    task = BashOperator(
        task_id=\'test1\',  # task_id
        bash_command=t,  # 指定要执行的命令
        dag=dag  # 指定归属的dag
    )
    return task


def hello_world_1():
    current_time = str(datetime.today())
    print(\'hello world at {}\'.format(current_time))


def test2(dag):
    # PythonOperator
    task = PythonOperator(
        task_id=\'test2\',
        python_callable=hello_world_1,  # 指定要执行的函数
        dag=dag)
    return task


def test3(dag):
    # DummyOperator
    task = DummyOperator(
        task_id=\'test3\',
        dag=dag)
    return task


with DAG(
        \'test_task\',  # dag_id
        default_args=default_options(),  # 指定默认参数
        schedule_interval="@once"  # 执行周期
) as d:
    task1 = test1(d)
    task2 = test2(d)
    task3 = test3(d)
    task1 >> task2 >> task3  # 指定执行顺序

写完后执行 python $AIRFLOW_HOME/dags/demo.py 检查是否有错误,如果命令行没有报错,就表示没问题。

Web UI

打开 localhost:8080

主视图

Airflow 的 WebUI 是其任务调度可视化的体现,可以再这个 WebUI 上监控几乎所有任务调度运行的实时及历史数据。 一些命令如 TriggerClear 均可以在 WebUI 上完成;一些全局参数也可以在主页面导航栏 Admin 下配置。

点击 dag_name, 进入任务预览:

任务图视

 

 

 
任务树视图

 

 

其他常用命令

# 测试任务,格式:airflow test dag_id task_id execution_time
airflow test test_task test1 2019-09-10

# 查看生效的 dags
airflow list_dags -sd $AIRFLOW_HOME/dags

# 开始运行任务(同web界面点trigger按钮)
airflow trigger_dag test_task    

# 暂停任务
airflow pause dag_id      

# 取消暂停,等同于在web管理界面打开off按钮
airflow unpause dag_id     

# 查看task列表
airflow list_tasks dag_id  查看task列表

# 清空任务状态
airflow clear dag_id       

# 运行task
airflow run dag_id task_id execution_date

Airflow 核心原理分析

概念及发展

  • JOB:最上层的工作。分为 SchedulerJob、BackfillJob 和 LocalTaskJob。SchedulerJob 由 Scheduler 创建,BackfillJob 由 Backfill 创建,LocalTaskJob 由前面两种 Job 创建。
  • DAG:有向无环图,用来表示工作流。
  • DAG Run:工作流实例,表示某个工作流的一次运行(状态)。
  • Task:任务,工作流的基本组成部分。
  • TaskInstance:任务实例,表示某个任务的一次运行(状态)。

在早期版本 Airflow 中,DAG 执行主要有两种完全独立的执行途径:SchedulerJob 和 BackfillJob。在一次较大的重构中增加了 DagRun 方式,以跟踪 DAG 的执行状态。

 

 

 

结构关

DagRun 执行流程描述

DagRuns 表示某个时间点 DAG 的状态(也称为 DagInstances)。 要运行 DAG 或管理 DAG 的执行,必须首先创建一个 DagRun 实例。 但是仅创建 DagRun 不足以实际运行 DAG(就像创建 TaskInstance 与实际运行任务并不一样)。 因此需要一种机制来实现上述流程

刷新 dags

  • 收集新的 DagRuns
  • 执行 DagRuns(包括更新 DagRuns 的状态为成功或失败)
  • 唤醒 executor/心跳检查

Scheduler 的调度逻辑

调度器实际上就是一个 airflow.jobs.SchedulerJob 实例 job 持续运行 run 方法。job.run() 在开始时将自身的信息加入到 job 表中,并维护状态和心跳,预期能够正常结束,将结束时间也更新到表中。 但是实际上往往因为异常中断,导致结束时间为空。不管是如何进行的退出,SchedulerJob 退出时会关闭所有子进程。

这里主要介绍下 Scheduler 的调度逻辑:

  • 遍历 dags 路径下的所有 dag 文件, 启动一定数量的进程(进程池),并且给每个进程指派一个 dag 文件。 每个 DagFileProcessor 解析分配给它的dag文件,并根据解析结果在 DB中创建 DagRuns 和 TaskInstance。
  • 在 scheduler_loop 中,检查与活动 DagRun 关联的 TaskInstance 的状态,解析 TaskInstance 之间的任何依赖,标识需要被执行的 TaskInstance,然后将它们添加至 executor 队列,将新排列的 TaskInstance 状态更新为QUEUED状态。
  • 每个可用的 executor 从队列中取一个 TaskInstance,然后开始执行它,将此 TaskInstance 的数据库记录更新为SCHEDULED
  • 当一个 TaskInstance 完成运行,关联的 executor 就会报告到队列并更新数据库中的 TaskInstance 的状态(例如“完成”、“失败”等)。
  • 一旦所有的dag处理完毕后,就会进行下一轮循环处理。这里还有一个细节就是上一轮的某个dag的处理时间可能很长,导致到下一轮处理的时候这个dag还没有处理完成。 Airflow 的处理逻辑是在这一轮不为这个dag创建进程,这样就不会阻塞进程去处理其余dag。

Scheduler 模块代码结构

DagFileProcessor 在子进程中解析 DAG 定义文件。对于发现的 DAG,检查 DagRun 和 TaskInstance 的状态。如果有 TaskInstance 可以运行,将状态标记为 SCHEDULED。 为每个 dag 文件分配一个进程,同时在 DagFileProcessorManager 中保存有 dag 和 processor的映射表。在 dag 没有被任何 processor 处理的时候,才会给它创建新的处理进程。

DagFileProcessorManager 控制 DagFileProcessors 如何启动。它追踪哪些文件应该被处理并且确保一旦有一个 DagFileProcessor 完成解析,下一个 dag 文件应该得到处理。并且控制 DagFileProcessors 的数量。

SchedulerJob 通过 agent 获取 manager 的 DAG 定义文件解析结果,并且将 SCHEDULED 状态的 TaskInstance 发送给executor执行。

DagFileProcessorAgent 作为一个采集代理,scheduler可以借助 agent 获取 manager 获取到的 DAG 解析结果,并且可以控制 manager 的行为。

核心类分析

1.Dag

method

  • following_schedule() 计算当前dag的下一次调度时间
  • previous_schedule() 计算当前dag的上一次调度时间
  • get_dagrun() 返回给定执行日期的dagrun(如果存在)
  • create_dagrun() 创建一个包括与此dag相关任务的dagrun
  • ckear() 清除指定日期范围内与当前dag相关的一组任务实例
  • run() 实例化为 BackfillJob 同时调用job.run()

2.DagRun

model

ID_PREFIX = \'scheduled__\'
ID_FORMAT_PREFIX = ID_PREFIX + \'{0}\'

id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
execution_date = Column(UtcDateTime, default=timezone.utcnow)
start_date = Column(UtcDateTime, default=timezone.utcnow)
end_date = Column(UtcDateTime)
_state = Column(\'state\', String(50), default=State.RUNNING)
run_id = Column(String(ID_LEN))
external_trigger = Column(Boolean, default=True)
conf = Column(PickleType)

method

  • get_dag() 返回与当前 DagRun 相关的 Dag
  • get_task_instances() 返回与当前 DagRun 的所有 TaskInstances
  • update_state() 根据 TaskInstances 的状态确定 DagRun 的总体状态
  • get_latest_runs() 返回每个 Dag 的最新一次 DagRun

3.TaskInstance

model

__tablename__ = "task_instance"

task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(UtcDateTime, primary_key=True)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
duration = Column(Float)
state = Column(String(20))
_try_number = Column(\'try_number\', Integer, default=0)
max_tries = Column(Integer)
hostname = Column(String(1000))
unixname = Column(String(1000))
job_id = Column(Integer)
pool = Column(String(50), nullable=False)
queue = Column(String(256))
priority_weight = Column(Integer)
operator = Column(String(1000))
queued_dttm = Column(UtcDateTime)
pid = Column(Integer)
executor_config = Column(PickleType(pickler=dill))

method

  • get_dagrun() 返回当前 TaskInstance 的 DagRun
  • run() TaskInstance run
  • get_template_context() 通过 Jinja2 模板获取上下文
  • xcom_push() 创建一个 XCom 可用于task发送参数
  • xcom_pull() 创建一个 XCom 可用于task接收参数

4.SchedulerJob

def _execute(self):
    """
    The actual scheduler loop. The main steps in the loop are:
        #. Harvest DAG parsing results through DagFileProcessorAgent
        #. Find and queue executable tasks
            #. Change task instance state in DB
            #. Queue tasks in executor
        #. Heartbeat executor
            #. Execute queued tasks in executor ake_aware(execution_date,
                                                     self.task.dag.timezone)
    """
    self.processor_agent = DagFileProcessorAgent()  # 通过检查当前processor数量来控制进程个数
    self.executor.start()

    # Start after resetting orphaned tasks to avoid stressing out DB.
    self.processor_agent.start()  # 在解析dag文件时,只会对最近修改过的文件进行解析
    execute_start_time = timezone.utcnow()

    # For the execute duration, parse and schedule DAGs
    while (timezone.utcnow() - execute_start_time).total_seconds() < \
            self.run_duration or self.run_duration < 0:
        # Starting Loop...

        self.processor_agent.heartbeat()  # 控制 DagFileProcessor 解析 DAG 文件的速度

        # Harvesting DAG parsing results
        simple_dags = self.processor_agent.harvest_simple_dags()

        if len(simple_dags) > 0:
            self._execute_task_instances()
        ...

        # Call heartbeats
        self.executor.heartbeat()
        # heartbeat()中根据parallelism得出当前可用的slots数量,
        # 决定execute_async多少个task

        # Process events from the executor
        self._process_executor_events(simple_dag_bag)

        # Ran scheduling loop for all tasks done
        ...

    # Stop any processors
    self.processor_agent.terminate()

    # Verify that all files were processed, and if so, deactivate DAGs that
    # haven\'t been touched by the scheduler as they likely have been
    # deleted.
    ...

    self.executor.end()

method

  • create_dag_run() 根据调度周期检查是否需要为 DAG 创建新的 DagRun。如果已调度,则返回 DagRun,否则返回 None
  • process_file() 解析 DAG 定义文件
  • _execute_task_instances() 尝试执行调度器调度过的 TaskInstances TaskInstances in the executor.
  • reduce_in_chunks() 用来进行小的分批处理

总结

本文在第一部分着重介绍了 Airflow 的理念、使用场景及其一般架构。 提供了相对简单易懂的安装及操作命令,并附带了一个使用案例用来介绍代码如何编排以及 WebUI 的使用。

在第二部分开篇介绍了 Airflow 任务创建、调度和管理的一些基础概念,以及 Airflow 版本迭代的一些重要变化。 Airflow 目前还是处于快速开发中,当前版本有很多遗留问题,版本升级也不是向后兼容的,变动很大。

Scheduler 毫无疑问是整个 Airflow 的核心模块,逻辑结构复杂。 本文从 Scheduler 模块的主要逻辑入手,分析了控制循环和代码结构,重点分析了从 dag.py 代码文件到可调度执行的 TaskInstances 所经历的阶段; 以及介绍了并发控制的实现和性能优化。

最后结合源码介绍了 Airflow 核心类的模型定义和主要方法,以了解各个类所扮演的角色及其实现的功能。

 

参考 : https://zhuanlan.zhihu.com/p/90282578?utm_source=ZHShareTargetIDMore