Airflow
airflow是一个任务调度组件,主要是基于DAG(有向无环图)来定义整个工作流。他主要解决了crontab调度无法完成的任务依赖,网页服务,任务暂停等功能。并且airflow能够很好的支持python,spark,hive,k8s等
-
airflow架构
airflow包含以下组件:
元数据库(存储DAG)
执行器worker(负责执行task)
调度器sheduler(负责任务触发)
webServer(网页服务)
-
airflow任务调度原理
airflow多进程调度原理如下:
首先airflow会创建一个DagFileProcessor进程池遍历dags文件夹下所有的dag文件,每个进程处理一个dag文件,产生的结果是DAGRUNS(图的状态)与taskInstance(任务实例)放入数据库中,此时taskInstance标记为Queued。
同时,shedulerjob类对象会周期的检查数据库,会将标记为queued的taskInstance放入Executor队列中,并将数据库中该任务状态标记为Scheduled
每个可用的Executor会从Executor队列中取出一个TaskInstance执行,然后将数据库中的该任务状态标记为running
当一个taskInstance执行完毕,executor会将数据库中该任务的状态标记为(完成、失败、跳过等),按照图的任务依赖关系依次处理各任务
当一个进程处理完毕一个dag,它会为下一个dag重复上述过程。
当所有的dag文件处理完毕,airflow将进入下一个循环。若一个dag处理时间过长,在下一个循环的周期内进程池将忽略此dag,避免阻塞。
-
airflow的celery机制:
celeryExecutor主要是用于扩大worker规模的机制。为了使用此功能需要配置相关配件(RabbitMQ,REDIS,..),并且修改airflow的相关配置为CeleryExecutor.
1,webserver从workers中取任务执行日志
2,webserver从Dag文件中获取Dag结构
3,webserver从DB中获取任务状态
4,workers从DAGFiles中获取DAG结构和执行任务
5,workers从DB中获取和存储连接配置,变量和XCOM等信息
6,workers将任务状态保存在celery Result Backend中
7,workers将执行命令存储在celery Queue Brokers中
8,Scheduler将图状态和相关任务存储在DB中
9, Scheduler从DAGFiles中获取DAG结构和执行任务
10,Scheduler从Celery Result backend中获取已经完成任务的状态信息
11, Scheduler在Celery broker中放置将要执行的命令。
-
airflow的Dag创建
airflow的dag是通过python脚本创建,用来定义各任务之间的依赖。以创建下图的dag为例
首先需要引入airflow包中的DAG和BashOperator
然后创建参数字典集
args = { 'owner': 'Airflow', 'start_date': airflow.utils.dates.days_ago(2), }
‘owner'如果没有指明默认是'airflow'
然后创建dag对象
dag = DAG( dag_id='example_bash_operator', default_args=args, schedule_interval='0 0 * * *', dagrun_timeout=timedelta(minutes=60), )
dag_id是标识dag的唯一属性,default_args设置dag默认参数,'schedule_interval‘表示该dag的执行间隔,dagrun_timeout表示dag的超时设置,如果dag超过这个时间没有执行完毕,dag中未执行的任务将会被表示失败。
创建任务
run_this_last = DummyOperator( task_id='run_this_last', dag=dag, )
任务根据操作不同分为DummyOperator(),PythonOperator(),BashOperator()等等,用户还可以自定义operator().通过task_id,dag等参数进行实例化。下面是陆续定义了几个任务
# [START howto_operator_bash] run_this = BashOperator( task_id='run_after_loop', bash_command='echo 1', dag=dag, ) # [END howto_operator_bash]
run_this >> run_this_last
for i in range(3): task = BashOperator( task_id='runme_' + str(i), bash_command='echo "{{ task_instance_key_str }}" && sleep 1', dag=dag, ) task >> run_this # [START howto_operator_bash_template]
also_run_this = BashOperator( task_id='also_run_this', bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', dag=dag, ) # [END howto_operator_bash_template]
also_run_this >> run_this_last
各个任务之间通过<<,>>来定义各个任务之间的依赖,例如also_run_this>>run_this_last表示also_run_this执行完毕后将执行run_this_last,其等价于run_this_last<<also_run_this.
同时可以通过set_upstream和set_downstream来设定依赖,例如also_run_this.set_downstream(run_this_last),同样等价于run_this_last.set_upstream(also_run_this).
将上述创建的脚本放在airflow默认的dags文件夹下,airflow将会扫描到该dag自动执行,同时也可以通过命令行界面(cli)或者webUI将会手动触发该dag的执行。
-
airflow的webUI
airflow具有强大的web页面,支持任务的dag图查看,tree图查看,日志查看,源码查看,执行状态查看,任务暂停等操作。具体如下图所示:
K8S
k8s是一个容器调度的微服务架构,通过调度pod来实现任务的最优化执行。其分布架构是一个集群控制点与多个工作节点,主要解决的是pod与node之间资源匹配问题。其中Master节点负责整个集群管理工作为集群提供管理接口,
并监控和编排集群中的各个工作节点。各个工作节点是以pod形式运行容器,各个节点需要配置号运行容器所依赖的所有服务及资源。
k8s的调度
affinity/anti_affinity原则
在一些需要频繁交互或者公用资源的业务下,一些pod需要部署在同一节点或者相同一个域之内,但是另外一些业务出于安全和容灾的考虑需要分布部署在不同的几点或者区域,出于上述业务考虑引出了affinity和anti_affinity的部署规则。
node_affinity,在生成pod时,可以指明一些pod需要部署在具有某个标签的node上,例如,有些pod需要快速的IO操作,需要部署在ssd的硬盘上,这时如果node标签有disk_type:ssd,那么该pod就可以部署在该节点上,反之则不能部署成功。之后改进版本引入了硬需求与软需求概念,硬需求同上,软需求表示如果条件满足最好,如果不能满足也能创建成功。
pod的affinity/anti_affinity表示部署之前查看该节点或者该域是否存在某个标签的pod。这里的范围是通过pod的定义的拓扑域来确定的。
污点与容忍度
有些节点定义了污点,表示只有具有该类标签的pod才能被部署,否则无法部署
pod可以定义容忍度来与该node进行匹配。