基于Flask-APScheduler实现添加动态定时任务

时间:2024-03-12 19:26:00

阅读目录

一、apSheduler

二、Flask-APScheduler

三、动态定时任务

四、uwsgi部署注意事项

一、apSheduler

第一部分内容限于apSheduler3.0以下版本,以上版本可移步至 FastAPI+apSheduler动态定时任务

1. 引子(Introduction)

Advanced Python Scheduler (APScheduler) 是一个轻量级但功能强大的进程内任务调度器,允许您调度函数(或任何其他python可调用文件)在您选择的时间执行。

2. 特性(Features)

  1. 没有(硬)外部依赖性
  2. api线程安全
  3. 支持CPython、Jython、PyPy
  4. 可配置的调度机制(触发器):
    1. 类似cron调度
    2. 单次运行延迟调度(如UNIX“at”命令)
    3. 基于时间间隔(以指定的时间间隔运行)
  5. 支持多种存储空间
    1. RAM
    2. 基于文件的简单数据库
    3. SQLAlchem
    4. MongoDB
    5. Redis

3. 使用(Usage)

3.1 安装

  • pip install apscheduler

3.2 启动调度程序

from apscheduler.scheduler import Scheduler

sched = Scheduler()
sched.start()

3.3 调度job

3.3.1 简单日期调度job

在指定时间执行一次job。这是相当于UNIX“at”命令的进程内命令

from datetime import date
from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

# Define the function that is to be executed
def my_job(text):
    print text

# The job will be executed on November 6th, 2009
exec_date = date(2009, 11, 6)

# 添加一个job
job = sched.add_date_job(my_job, exec_date, [\'text\'])

更具体地安排时间

from datetime import datetime

# The job will be executed on November 6th, 2009 at 16:30:05
job = sched.add_date_job(my_job, datetime(2009, 11, 6, 16, 30, 5), [\'text\'])

甚至可以将日期指定为字符串文本

job = sched.add_date_job(my_job, \'2009-11-06 16:30:05\', [\'text\'])

# 支持微秒级别
job = sched.add_date_job(my_job, \'2009-11-06 16:30:05.720400\', [\'text\'])
3.3.2 基于时间间隔的调度job

job的执行在给定延迟后开始,或者在start_date(如果指定)开始,start_date参数可以作为date/datetime对象或字符串文本给出。

from datetime import datetime

from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

def job_function():
    print "Hello World"

# Schedule job_function to be called every two hours
sched.add_interval_job(job_function, hours=2)

# The same as before, but start after a certain time point
sched.add_interval_job(job_function, hours=2, start_date=\'2010-10-10 09:30\')

装饰语法

from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

# Schedule job_function to be called every two hours
@sched.interval_schedule(hours=2)
def job_function():
    print "Hello World"

如果需要取消对装饰功能的job,可以这样做

scheduler.unschedule_job(job_function.job)
3.3.3 cron调度job

与crontab表达式不同,您可以省略不需要的字段。大于最低有效明确定义字段的字段默认为,而较小的字段默认为其最小值,除了默认为。例如,如果仅指定day=1,minute=20,则作业将在每年每月的第一天以每小时20分钟的速度执行。下面的代码示例应该进一步说明这种行为。
省略字段默认为*

from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

def job_function():
    print "Hello World"

# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_cron_job(job_function, month=\'6-8,11-12\', day=\'3rd fri\', hour=\'0-3\')

# Schedule a backup to run once from Monday to Friday at 5:30 (am)
sched.add_cron_job(job_function, day_of_week=\'mon-fri\', hour=5, minute=30)

装饰语法

@sched.cron_schedule(day=\'last sun\')
def some_decorated_task():
    print "I am printed at 00:00:00 on the last Sunday of every month!"

如果需要取消对装饰功能的job,可以这样做

scheduler.unschedule_job(job_function.job)
3.3.4 使用自定义触发器调度

以上事例基于内置触发器调度job,如果需要使用自定义触发器调度需要使用add_job()方法

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
  
def aps_test(x):
    print(datetime.datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\'), x)    
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=(\'定时任务\',), trigger=\'cron\', second=\'*/5\')
scheduler.add_job(func=aps_test, args=(\'一次性任务\',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
scheduler.add_job(func=aps_test, args=(\'循环任务\',), trigger=\'interval\', seconds=3)

scheduler.start()

3.4 关闭调度器

sched.shutdown()

# 默认情况下,调度程序关闭其线程池,并等待直到所有当前正在执行的job完成。为了更快地退出,可以:
sched.shutdown(wait=False)

# 这仍然会关闭线程池,但不会等待任何正在运行的任务完成。此外,如果您给调度程序一个要在其他地方管理的线程池,您可能希望完全跳过线程池关闭:
sched.shutdown(shutdown_threadpool=False)

# 自动关闭调度程序的一个巧妙方法是为此使用atexit挂钩:
import atexit
sched = Scheduler(daemon=True)
atexit.register(lambda: sched.shutdown(wait=False))
# Proceed with starting the actual application

3.5 Job stores

如果没有指定stores存储位置,则将转到默认job存储 -> ramjobstore不提供持久化保存
其它存储stores:

ShelveJobStore
SQLAlchemyJobStore
MongoDBJobStore
RedisJobStore

通过配置选项或add_jobstore()方法添加作业存储。因此,以下是相等的:

config = {\'apscheduler.jobstores.file.class\': \'apscheduler.jobstores.shelve_store:ShelveJobStore\',
      \'apscheduler.jobstores.file.path\': \'/tmp/dbfile\'}
sched = Scheduler(config)

3.6 获取调度器列表

sched.print_jobs()

二、Flask-APScheduler

1. 引子(Introduction)

  • Flask-APScheduler 是Flask框架的一个扩展库,增加了Flask对apScheduler的支持

2. 特性(Features)

  1. 根据Flask配置加载调度器配置
  2. 根据Flask配置加载调度器job
  3. 允许指定调度程序将运行的主机名
  4. 提供REST API来管理调度job
  5. 为REST API提供认证

3. 安装(Installation)

pip install Flask-APScheduler

4. 使用(Usage)

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from flask import Flask
from flask_apscheduler import APScheduler

class Config(object):
    # 配置执行job
    JOBS = [
        {
            \'id\': \'job1\',
            \'func\': \'advanced:job1\',
            \'args\': (1, 2),
            \'trigger\': \'interval\',
            \'seconds\': 10
        }
    ]
    # 存储位置
    SCHEDULER_JOBSTORES = {
        \'default\': SQLAlchemyJobStore(url=\'sqlite://\')
    }
    # 线程池配置
    SCHEDULER_EXECUTORS = {
        \'default\': {\'type\': \'threadpool\', \'max_workers\': 20}
    }

    SCHEDULER_JOB_DEFAULTS = {
        \'coalesce\': False,
        \'max_instances\': 3
    }
    # 调度器开关
    SCHEDULER_API_ENABLED = True


def job1(a, b):
    print(str(a) + \' \' + str(b))
    
if __name__ == \'__main__\':
    app = Flask(__name__)
    app.config.from_object(Config())
    
    scheduler = APScheduler()
    # 注册app
    scheduler.init_app(app)
    scheduler.start()

    app.run()    

三、动态定时任务

  • Flask + flask_apscheduler实现一个类似Jenkins的定时任务的功能,前端设置crontab,后端可以创建,修改,暂停,移除,恢复一个执行任务
    文件目录
    |--app
    |----config.py      配置文件
    |----run_tasks.py   开启任务
    |----tasks.py       任务job
    |----apSheduler.py  提供接口函数
    |----extensions.py  flask扩展
    |----__init__.py    初始化文件
    |----views.py       业务代码
    |--manage.py        项目启动文件
    

1. config.py配置flask_apscheduler

class Config(object):
    # 开关
    SCHEDULER_API_ENABLED = True
    # 持久化配置
    SCHEDULER_JOBSTORES = {
            \'default\': SQLAlchemyJobStore(url=\'sqlite:///flask_context.db\')
        }
    SCHEDULER_EXECUTORS = {
        \'default\': {\'type\': \'threadpool\', \'max_workers\': 20}
    }

2. init.py创建app

from app.config import Config
from app.extensions import scheduler

# 创建app
def create_app(config=None, app_name=None, blueprints=None):
    app = Flask(app_name, static_folder=\'thanos/static\',
        template_folder=\'thanos/resource/report\')
    # 导入flask配置 -> 这里根据自己的项目导入配置就好哇
    # config = Config.get_config_from_host(app.name)
    app.config.from_object(config)
    # 初始化调度器配置    
    configure_scheduler(app)
        
def configure_scheduler(app):
    """Configure Scheduler"""
    scheduler.init_app(app)
    scheduler.start()
    # 加载任务,选择了第一次请求flask后端时加载,可以选择别的方式...
    @app.before_first_request   
    def load_tasks():
        # 开启任务
        from app import run_tasks

3. extensions.py实例化scheduler

from flask_apscheduler import APScheduler
scheduler = APScheduler()    

4. apSheduler.py提供调度器接口

"""此文件可以根据具体业务复杂化选择写或者直接调用原apscheduler接口"""
from flask import current_app
# from .extensions import scheduler  直接导入单例对象操作也行

class APScheduler(object):
"""调度器控制方法""" 
    def add_job(self, jobid, func, args, **kwargs):
        """
        添加任务
        :param args:  元祖 -> (1,2)
        :param jobstore:  存储位置
        :param trigger:
                        data ->  run_date   datetime表达式
                        cron ->  second/minute/day_of_week
                        interval ->  seconds 延迟时间
                        next_run_time ->  datetime.datetime.now() + datetime.timedelta(seconds=12))
        :return:
        """
        job_def = dict(kwargs)
        job_def[\'id\'] = jobid
        job_def[\'func\'] = func
        job_def[\'args\'] = args
        job_def = self.fix_job_def(job_def)
        self.remove_job(jobid)  # 删除原job
        current_app.apscheduler.scheduler.add_job(**job_def)

    def remove_job(self, jobid, jobstore=None):
        """删除任务"""
        current_app.apscheduler.remove_job(jobid, jobstore=jobstore)

    def resume_job(self, jobid, jobstore=None):
        """恢复任务"""
        current_app.apscheduler.resume_job(jobid, jobstore=jobstore)

    def pause_job(self, jobid, jobstore=None):
        """恢复任务"""
        current_app.apscheduler.pause_job(jobid, jobstore=jobstore)

    def fix_job_def(self, job_def):
        """维修job工程"""
        if job_def.get(\'trigger\') == \'date\':
            job_def[\'run_date\'] = job_def.get(\'run_date\') or None
        elif job_def.get(\'trigger\') == \'cron\':
            job_def[\'hour\'] = job_def.get(\'hour\') or "*"
            job_def[\'minute\'] = job_def.get(\'minute\') or "*"
            job_def[\'week\'] = job_def.get(\'week\') or "*"
            job_def[\'day\'] = job_def.get(\'day\') or "*"
            job_def[\'month\'] = job_def.get(\'month\') or "*"
        elif job_def.get(\'trigger\') == \'interval\':
            job_def[\'seconds\'] = job_def.get(\'seconds\') or "*"
        else:
            if job_def.get("andTri"):
                job_def[\'trigger\'] = AndTrigger([job_def.pop("andTri", None), ])
            # job_def[\'next_run_time\'] = job_def.get(\'next_run_time\') or None
        return job_def 

5. views.py 实现调度器接口

from app.apSheduler import APScheduler

# croniter库解析Linux cron格式的计划

# 以添加为例子 暂停 删除 恢复可以根据业务场景自己写接口
def add_crontab_task(self, params):
    """添加一个crontab任务"""
    try:
        self.crontab = params.get("crontab")
        self.id = params.get("id")
        self.task_id = params.get("task_id")
    except Exception as e:
        return False, str(e)
    # 记录数据库
    res = addSql()
    # 更新任务信息
    APScheduler().add_job(jobid=self.id, func=task_func,
                          args=(self.task_id,), andTri=CronTrigger.from_crontab(self.crontab))
    if res is False:
        return False, "数据库操作异常"
    return True, croniter(self.crontab, datetime.now()).get_next(datetime)

def get_next_execute_time(self, params):
    """获取下一次执行时间"""
    try:
        self.crontab = params.get("crontab")
    except Exception as e:
        return False, str(e)
    return True, str(croniter(self.crontab, datetime.now()).get_next(datetime))

6. tasks.py 任务job

def task_func(task_id):
    """业务逻辑"""
    # 发邮件、写诗、画画 -> 爱干啥干啥

7. run_tasks.py 开启任务调度大门

from .task import task_func
from apscheduler.triggers.cron import CronTrigger  # 可以很友好的支持添加一个crontab表达式

def run_task():
    # 查询数据库的crontab信息 -> 定时任务信息
    res = fetall("select * from crontab_table")
    # 遍历添加任务
    shche = APScheduler()
    for rs in res:
        shche.add_job(jobid=rs.get(id), func=task_func,
                  args=(rs.get(task_id)), andTri=CronTrigger.from_crontab(rs.get(crontab)))

# 最重要的       
run_task()   # 这样当__init__.py创建app时加载这个文件,就会执行添加历史任务啦!

8. manage.py 启动项目

from app import create_app
app = create_app()
app.run()

四、uwsgi部署注意事项

1. 常见问题及解决方案

1.1 线上部署uWSGI+APScheduler执行定时任务卡死

1.1.1问题分析:

APScheduler运行环境需要为多线程,uwsgi默认是one thread ,one process,需要在配置文件里面加上一条 enable-thread = true,也就是允许程序内部启动多线程。

1.1.2解决方案:
# uwsgi.ini文件追加以下配置
enable-threads = true
preload=True  #用--preload启动uWSGI,确保scheduler只在loader的时候创建一次
lazy-apps=true

1.2 定时任务多次执行的问题

1.2.1问题分析:

1.本地原因,错过了上次执行时间,下次会多次执行
2.线上部署的,如uWSGI部署,配置了processes>1导致加载了多此apscheduler(apscheduler当前没有任何进程间同步和信令方案)

1.2.3解决方案:

1. 本地多次执行可以在Flask启动方法中加use_reloader=False

app.run(host="0.0.0.0", port=8888, use_reloader=False)

2.线上linux可以借鉴下面的方法,网上借鉴的
在__init__.py文件中修改中configure_scheduler(),用全局锁确保scheduler只运行一次, 代码如下:

import atexit
import fcntl  # 只能用于linux
from .extensions import scheduler

def configure_scheduler(app):
    """Configure Scheduler"""
    f = open("scheduler.lock", "wb")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        scheduler.init_app(app)
        scheduler.start()
        # 加载任务
        @app.before_first_request
        def load_tasks():
            from thanos import run_tasks
    except:
        pass
    def unlock():
        fcntl.flock(f, fcntl.LOCK_UN)
        f.close()
    atexit.register(unlock)

init函数为flask项目初始化所调用,这里为scheduler模块的初始化部分。首先打开(或创建)一个scheduler.lock文件,并加上非阻塞互斥锁。成功后创建scheduler并启动。如果加文件锁失败,说明scheduler已经创建,就略过创建scheduler的部分。
最后注册一个退出事件,如果这个flask项目退出,则解锁并关闭scheduler.lock文件的锁。

3.官网推荐rpyc/grpc解决
可以查看 FastAPI+apSheduler动态定时任务