airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。
airflow 将workflow编排为tasks组成的DAGs,调度器在一组workers上按照指定的依赖关系执行tasks。同时,
airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警
系统
测试运行环境使用docker
基本安装
- docker安装
使用别人已经构建好的 puckel/docker-airflow
- 或者使用pip 安装
pip install apache-airflow
简单测试&&运行
- docker-compose
local 运行:
version: '2.1'
services:
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
ports:
- "5432:5432"
webserver:
image: puckel/docker-airflow:1.10.0-2
depends_on:
- postgres
environment:
- LOAD_EX=n
- EXECUTOR=Local
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
Celery 运行:
version: '2.1'
services:
redis:
image: 'redis:3.2.7'
# command: redis-server --requirepass redispass
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
# Uncomment these lines to persist data on the local filesystem.
# - PGDATA=/var/lib/postgresql/data/pgdata
# volumes:
# - ./pgdata:/var/lib/postgresql/data/pgdata
webserver:
image: puckel/docker-airflow:1.10.0-2
restart: always
depends_on:
- postgres
- redis
environment:
- LOAD_EX=n
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Celery
# - POSTGRES_USER=airflow
# - POSTGRES_PASSWORD=airflow
# - POSTGRES_DB=airflow
# - REDIS_PASSWORD=redispass
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
flower:
image: puckel/docker-airflow:1.10.0-2
restart: always
depends_on:
- redis
environment:
- EXECUTOR=Celery
# - REDIS_PASSWORD=redispass
ports:
- "5555:5555"
command: flower
scheduler:
image: puckel/docker-airflow:1.10.0-2
restart: always
depends_on:
- webserver
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
environment:
- LOAD_EX=n
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Celery
# - POSTGRES_USER=airflow
# - POSTGRES_PASSWORD=airflow
# - POSTGRES_DB=airflow
# - REDIS_PASSWORD=redispass
command: scheduler
worker:
image: puckel/docker-airflow:1.10.0-2
restart: always
depends_on:
- scheduler
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
environment:
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Celery
# - POSTGRES_USER=airflow
# - POSTGRES_PASSWORD=airflow
# - POSTGRES_DB=airflow
# - REDIS_PASSWORD=redispass
command: worker
- 简单flow
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2015, 6, 1),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)
t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id="templated",
bash_command=templated_command,
params={"my_param": "Parameter I passed in"},
dag=dag,
)
t2.set_upstream(t1)
t3.set_upstream(t1)
说明
任务的运行是从2015 6.1 开始,运行次数有点多可以进行修改
运行
- 效果
参考资料
https://www.jianshu.com/p/76794553effc
https://hub.docker.com/r/puckel/docker-airflow/
https://github.com/rongfengliang/airflow-docker-compose-demo