airfkow的queue(队列)的使用

时间:2024-02-21 20:38:08

Airflow本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的执行器就有四种选择:

SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试

LocalExecutor:多进程本地执行任务

CeleryExecutor:分布式调度,生产常用

DaskExecutor :动态任务调度,主要用于数据分析

在当前项目使用CeleryExecutor作为执行器。

celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,比如redis或者rabbitmq,当前项目使用的是rabbitmq,系统整体结构如下所示:

其中:

turing为外部系统

GDags服务帮助拼接成dag

master节点webui管理dags、日志等信息

scheduler负责调度,只支持单节点

worker负责执行具体dag中的task, worker支持多节点

在整个调度系统中,节点之间的传递介质是消息,而消息的本质内容是执行脚本的命令,也就是说,工作节点的dag文件必须和master节点的dag文件保持一致,不然任务的执行会出问题。

队列的使用前提就是选择CeleryExecutor执行器

以下如内容来自airflow官网

使用CeleryExecutor时,可以指定将任务发送到的Celery队列。队列是BaseOperator的属性,因此可以将任何任务分配给任何队列。在airflow.cfg的celery-> default_queue中定义了环境的默认队列。这定义了未指定任务时分配给其的队列,以及Airflow的worker在启动时侦听的队列。

worker可以听一个或多个任务队列。启动工作程序时(使用命令airflow worker),可以指定一组用逗号分隔的队列名称(例如airflow worker -q spark)。然后,此worker将仅拾取连接到指定队列的任务。

如果从资源的角度(例如非常轻巧的任务,其中一个worker可以毫无问题地执行数千个任务)或从环境的角度(您需要一个在Spark集群中运行的worker)来需要专门的worker,这可能很有用。本身,因为它需要非常具体的环境和安全权限)。http://airflow.apache.org/docs/stable/concepts.html#queues

 

 

airflow中的队列严格来说不叫Queues,叫"lebal"更为合适。在operator中,可以设置queue参数如queue=spark,然后在启动worker时:airflow worker -q spark,那么该worker只会执行spark任务。相当于节点标签。

实践测试

测试地址 http://118.178.129.100:8888/admin/

测试DAG:cc_ods.test

 

根据官网内容对code进行修改

# -*- coding: utf8 -*-
import pprint
from datetime import timedelta, datetime
from os import sys, path

import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

reload(sys)
sys.setdefaultencoding(\'utf-8\')
sys.path.append(path.dirname(path.abspath(__file__)))
sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))

from utils import basic_util, send_email, sms_service, env_conf, alarm_util
import hallo

pp = pprint.PrettyPrinter(indent=4)

str_today = (datetime.today()).strftime(\'%Y-%m-%d\')

## Define the DAG object
default_args = {
    \'owner\': \'yifan.hu\',
    \'depends_on_past\': True,
    \'start_date\': airflow.utils.dates.days_ago(2),
    \'retries\': 4,
    \'retry_delay\': timedelta(minutes=1),
    \'priority_weight\': 10,
    \'execution_timeout\': timedelta(minutes=360),
    \'queue\': \'test_spark\'
}

dag_id = basic_util.get_dag_id(path.abspath(__file__))


dag = DAG(dag_id, default_args=default_args, schedule_interval=\'0 0 * * *\', concurrency=2)


# 元数据系统生成创建表的sql文件
t_meta_create_table_statements = PythonOperator(
    task_id=\'meta_create_table_statements\',
    python_callable=hallo.test_task1,
    default_args=default_args,
    dag=dag
)

在py文件中的default_args添加了\'queue\'\'test_spark\',给这个任务指定了queue

然后重新调起这个任务

 

在修改work上的队列标志之前,这个任务一直显示queued,因为没有任何一个work的队列标签是test_spark,所以,没有work可以去执行它,

然后在启动worker的脚本中添加了 -q test_spark

 

修改完成后重启这个节点上的worker(这里我修改的是节点test31)

重启worker以后,任务立刻执行完毕,可以看到执行的节点就是test31(多次重复实验均是在test31上进行)

 

 

在官网中又一段话是

worker可以听一个或多个任务队列。启动工作程序时(使用命令airflow worker),可以指定一组用逗号分隔的队列名称(例如airflow worker -q spark)。然后,此worker将仅拾取连接到指定队列的任务。

也就是说我们在启动这个节点上的worker的时候,加一个参数 -q test_spark,但是在airflow.cfg中不做修改,被标志test_spark的任务将会在这个节点上执行,并且在元数据中,queue列也会显示test_spark,而其他没有定义队列的task,将会按照airflow.cfg中default_queue参数配置进行,也就是说并不是这个worker打了test_spark的标签就只能执行带有test_spark队列标签的任务,只是这样的任务一定会在这个worker执行,其他task会按照airflow.cfg中定义的进行。

下图是元数据所示

 

airflow.cfg中default_queue参数配置