[Original] Ops Basics: Docker (5) Deploying Airflow with Docker

Date: 2022-03-30 16:51:51

Deployment setup: docker + airflow + mysql + LocalExecutor

Use the Airflow Docker image:

https://hub.docker.com/r/puckel/docker-airflow

Start it with the default sqlite + SequentialExecutor:

$ docker run -d -p 8080:8080 puckel/docker-airflow webserver

Copy airflow.cfg out of the container so it can be edited:

$ docker cp $container_id:/usr/local/airflow/airflow.cfg .

Try mounting the customized airflow.cfg into the container:

-v /usr/local/airflow/airflow.cfg:/usr/local/airflow/airflow.cfg

In that file, point sql_alchemy_conn at mysql and set executor = LocalExecutor.
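The two changes to airflow.cfg can also be scripted. The following is a minimal sketch, assuming both keys appear as "key = value" lines in the file. The helper name set_cfg_value is hypothetical, and the MySQL user, password, host, and database in the connection string are placeholders, not values from this setup. The demo runs on a throwaway file with made-up starting values rather than on the real airflow.cfg.

```shell
# Hypothetical helper (not part of Airflow or the image): rewrite a
# "key = value" line in an ini-style file, leaving other lines untouched.
set_cfg_value() {
  cfg_file="$1"; key="$2"; value="$3"
  tmp_file="$(mktemp)"
  # '|' is the sed delimiter so URLs containing '/' need no escaping;
  # the value itself must not contain '|', '&' or '\'.
  sed "s|^${key} *=.*|${key} = ${value}|" "$cfg_file" > "$tmp_file" &&
    cat "$tmp_file" > "$cfg_file"   # overwrite in place, keeping file permissions
  rm -f "$tmp_file"
}

# Demo on a throwaway file; the starting values are made up for illustration.
demo_cfg="$(mktemp)"
printf '%s\n' '[core]' \
  'executor = SequentialExecutor' \
  'sql_alchemy_conn = sqlite:////tmp/example.db' > "$demo_cfg"

set_cfg_value "$demo_cfg" executor LocalExecutor
# Placeholder credentials and host: replace them with real ones.
set_cfg_value "$demo_cfg" sql_alchemy_conn 'mysql://airflow:airflow@mysql-host:3306/airflow'

cat "$demo_cfg"
```

A mysql connection string only works if the image ships a MySQL driver for SQLAlchemy, which is worth checking before relying on it.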

The container still uses SequentialExecutor:

[2019-02-28 19:37:16,170] {{__init__.py:51}} INFO - Using executor SequentialExecutor
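To check which executor a container picked without scanning the whole log by eye, the relevant line can be filtered out. This is a small sketch; the helper name executor_from_log is hypothetical, and it assumes the log contains a line of the form shown above.

```shell
# Hypothetical helper: read log text on stdin and print the executor named
# in the last "Using executor ..." line (prints nothing if there is none).
executor_from_log() {
  sed -n 's/.*Using executor \([A-Za-z]*\).*/\1/p' | tail -n 1
}

# Demo with the log line shown above.
sample_line='[2019-02-28 19:37:16,170] {{__init__.py:51}} INFO - Using executor SequentialExecutor'
printf '%s\n' "$sample_line" | executor_from_log
```

Against a running container this would be used as: docker logs "$container_id" 2>&1 | executor_from_log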

Look at the Dockerfile (docker-airflow/Dockerfile):

ENTRYPOINT ["/entrypoint.sh"]
CMD ["webserver"] # set default arg for entrypoint

So the script that ultimately runs at startup is entrypoint.sh.

Look at entrypoint.sh (docker-airflow/script/entrypoint.sh):

: "${AIRFLOW__CORE__EXECUTOR:=${EXECUTOR:-Sequential}Executor}"

...

if [ "$AIRFLOW__CORE__EXECUTOR" != "SequentialExecutor" ]; then
  AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"
  AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"
  wait_for_port "Postgres" "$POSTGRES_HOST" "$POSTGRES_PORT"
fi

...

case "$1" in
  webserver)
    airflow initdb
    if [ "$AIRFLOW__CORE__EXECUTOR" = "LocalExecutor" ]; then
      # With the "Local" executor it should all run in one container.
      airflow scheduler &
    fi
    exec airflow webserver
    ;;

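1) Unless AIRFLOW__CORE__EXECUTOR is already set, it is built from the EXECUTOR environment variable (Sequential, Local, etc.), defaulting to SequentialExecutor;
2) If AIRFLOW__CORE__EXECUTOR is not SequentialExecutor, the script points the database connection at postgres and waits for it to come up (a hard dependency on postgres);
3) If the startup argument is webserver and AIRFLOW__CORE__EXECUTOR=LocalExecutor, the scheduler is started automatically.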
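The default-assignment line behind point 1 can be tried in isolation. The sketch below wraps that exact line in a hypothetical function, resolve_executor, which runs it in a subshell so the caller's environment stays untouched.

```shell
# Hypothetical wrapper: evaluate the default-assignment line from
# entrypoint.sh in a subshell, so the caller's environment is untouched.
# $1 = value for EXECUTOR (empty = unset)
# $2 = value for AIRFLOW__CORE__EXECUTOR (empty = unset)
resolve_executor() {
  (
    unset EXECUTOR AIRFLOW__CORE__EXECUTOR
    if [ -n "${1:-}" ]; then EXECUTOR="$1"; fi
    if [ -n "${2:-}" ]; then AIRFLOW__CORE__EXECUTOR="$2"; fi
    # ":=" assigns only when the variable is unset or empty;
    # ":-" supplies "Sequential" when EXECUTOR is unset or empty.
    : "${AIRFLOW__CORE__EXECUTOR:=${EXECUTOR:-Sequential}Executor}"
    printf '%s\n' "$AIRFLOW__CORE__EXECUTOR"
  )
}

resolve_executor ""                        # prints SequentialExecutor
resolve_executor "Local"                   # prints LocalExecutor
resolve_executor "Local" "CeleryExecutor"  # prints CeleryExecutor
```

The third call shows that an AIRFLOW__CORE__EXECUTOR value passed in explicitly wins over EXECUTOR.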

Due to Airflow’s automatic environment variable expansion, you can also set the env var AIRFLOW__CORE__* to temporarily overwrite airflow.cfg.
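The variable names follow the pattern AIRFLOW__{SECTION}__{KEY} in upper case. As a small illustration, the hypothetical helper below builds the name for a given airflow.cfg section and key; it reproduces the names that appear in entrypoint.sh.

```shell
# Hypothetical helper: map an airflow.cfg section and key to the name of
# the environment variable that overrides it.
airflow_env_name() {
  printf 'AIRFLOW__%s__%s\n' "$1" "$2" | tr '[:lower:]' '[:upper:]'
}

airflow_env_name core executor          # prints AIRFLOW__CORE__EXECUTOR
airflow_env_name core sql_alchemy_conn  # prints AIRFLOW__CORE__SQL_ALCHEMY_CONN
airflow_env_name celery result_backend  # prints AIRFLOW__CELERY__RESULT_BACKEND
```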

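Environment variables take priority over airflow.cfg, and entrypoint.sh sets AIRFLOW__CORE__EXECUTOR to SequentialExecutor whenever EXECUTOR is not given. So even with executor=LocalExecutor in airflow.cfg, SequentialExecutor is what actually gets used. Copy entrypoint.sh out of the container and modify it: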

$ docker cp $container_id:/entrypoint.sh .

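Comment out the following lines, so that the postgres settings no longer override the mysql connection from airflow.cfg: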

#if [ "$AIRFLOW__CORE__EXECUTOR" != "SequentialExecutor" ]; then
#  AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"
#  AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"
#  wait_for_port "Postgres" "$POSTGRES_HOST" "$POSTGRES_PORT"
#fi
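The same edit can be made with sed instead of by hand. This is only a sketch: the function name comment_out_postgres_block is hypothetical, and it assumes the block looks like the excerpt above, with no nested if inside it (the range ends at the first line consisting of fi). The demo runs on an abbreviated stand-in file, not on the real entrypoint.sh.

```shell
# Hypothetical helper: print a copy of the given script with the
# Postgres-only block commented out.
comment_out_postgres_block() {
  # Address range: from the "if" that tests for a non-Sequential executor
  # to the first following line that consists of "fi".
  sed '/^[[:space:]]*if \[ "\$AIRFLOW__CORE__EXECUTOR" != "SequentialExecutor" ]; then/,/^[[:space:]]*fi[[:space:]]*$/ s/^/#/' "$1"
}

# Demo on an abbreviated stand-in for entrypoint.sh.
sample="$(mktemp)"
cat > "$sample" <<'EOF'
: "${AIRFLOW__CORE__EXECUTOR:=${EXECUTOR:-Sequential}Executor}"
if [ "$AIRFLOW__CORE__EXECUTOR" != "SequentialExecutor" ]; then
  AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://$POSTGRES_USER@$POSTGRES_HOST/$POSTGRES_DB"
  wait_for_port "Postgres" "$POSTGRES_HOST" "$POSTGRES_PORT"
fi
case "$1" in
EOF

patched="$(comment_out_postgres_block "$sample")"
printf '%s\n' "$patched"
```

On the real file, the output would be redirected to a new file and reviewed before it replaces entrypoint.sh; the replacement has to keep its executable bit.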

Start command:

$ docker run -d -p 8080:8080 \
    -e EXECUTOR=Local \
    -v /usr/local/airflow/airflow.cfg:/usr/local/airflow/airflow.cfg \
    -v /usr/local/airflow/entrypoint.sh:/entrypoint.sh \
    -v /usr/local/airflow/dags:/usr/local/airflow/dags \
    -v /usr/local/airflow/logs:/usr/local/airflow/logs \
    puckel/docker-airflow webserver
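Before running the command it can help to verify the host paths being mounted: when the host side of a -v bind mount does not exist, Docker creates it as a directory, which silently breaks file mounts such as airflow.cfg and entrypoint.sh. The sketch below is one possible pre-flight check. The function name check_mount_sources is hypothetical, and requiring dags/ and logs/ to exist beforehand is a deliberately strict choice, not something Docker demands. The demo uses a throwaway directory laid out like /usr/local/airflow.

```shell
# Hypothetical pre-flight check for the host paths that get bind-mounted.
# $1 = host directory holding airflow.cfg, entrypoint.sh, dags/ and logs/
check_mount_sources() {
  base_dir="$1"
  status=0
  for name in airflow.cfg entrypoint.sh; do
    if [ ! -f "$base_dir/$name" ]; then
      echo "missing file: $base_dir/$name" >&2
      status=1
    fi
  done
  # The image executes /entrypoint.sh directly, so the mounted copy
  # must carry the executable bit.
  if [ -f "$base_dir/entrypoint.sh" ] && [ ! -x "$base_dir/entrypoint.sh" ]; then
    echo "not executable: $base_dir/entrypoint.sh" >&2
    status=1
  fi
  for name in dags logs; do
    if [ ! -d "$base_dir/$name" ]; then
      echo "missing directory: $base_dir/$name" >&2
      status=1
    fi
  done
  return "$status"
}

# Demo in a throwaway directory laid out like /usr/local/airflow.
demo_dir="$(mktemp -d)"
mkdir "$demo_dir/dags" "$demo_dir/logs"
touch "$demo_dir/airflow.cfg" "$demo_dir/entrypoint.sh"
chmod +x "$demo_dir/entrypoint.sh"
check_mount_sources "$demo_dir" && echo "ok to start"
```

In practice the check would be chained in front of the start command: check_mount_sources /usr/local/airflow && docker run ...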

This setup is a single node, but combined with mesos + hdfs nfs it can be made highly available and used in production.

References:
https://github.com/puckel/docker-airflow