Python任务调度模块 – APScheduler

时间:2022-05-29 07:46:10

APScheduler是一个Python定时任务框架,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务、并以daemon方式运行应用。

在APScheduler中有四个组件:
触发器(trigger)包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。简单说就是应该说明一个任务应该在什么时候执行。
作业存储(job store)存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。
执行器(executor)处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
调度器(scheduler)任务控制器:通过配置executor、jobstore、trigger,使用线程池(ThreadPoolExecutor默认值20)或进程池(ProcessPoolExecutor 默认值5)并且默认最多3个(max_instances)任务实例同时运行,实现对job的增删改查等调度控制

你需要选择合适的调度器,这取决于你的应用环境和你使用APScheduler的目的。通常最常用的两个:
- BlockingScheduler:当调度器是你应用中唯一要运行的东西时使用。
- BackgroundScheduler:当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行。

作业存储

支持4中作业存储,分别是:MemoryJobStore(存储在内存中)、sqlalchemy(关系型数据库)、mongodb(文档数据库)、redis(内存型键值对数据库)

三种作业触发方式

  • date:固定日期触发器:任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建
  • interval:时间间隔触发器,每个一定时间间隔执行一次。
  • cron:cron风格的任务触发。

简单使用

import datetime
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def my_job1():
print(datetime.datetime.now())
print('-'*10+'my_job1'+'-'*10)

def my_job2(text):
print(datetime.datetime.now())
print('-' * 10 + text + '-' * 10)

sched = BlockingScheduler() # 创建调度器
sched.add_job(my_job1, 'interval', seconds=5,id='my_job1') # 每隔5秒执行一次
sched.add_job(my_job2, 'date', run_date=datetime.datetime(2016,9,17,20,12,6), args=['text'],id='my_job2') # 执行时间点执行
sched.start()

使用关系型数据库存储作业

# 会在数据库中生成apscheduler_jobs表,如果每次启动都执行add_job函数,则会在数据库中存放多个相同的作业,比如my_job1会存在多条记录,就会被反复调用,相当于并行执行了。
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
# an SQLAlchemyJobStore named “default” (using SQLite)
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# a ThreadPoolExecutor named “default”, with a worker count of 20
# a ProcessPoolExecutor named “processpool”, with a worker count of 5
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# 使用sqlite存储作业
sqlitedb = r"E:\stock_py\stock_py\tools\database\sqlitedb.db"
url = r'sqlite:///%s' %sqlitedb

# 使用mysql存储作业
# url = 'mysql://hr:123456@localhost/stock'

# 其他的存储方式可以参考:
# http://apscheduler.readthedocs.io/en/latest/modules/jobstores/sqlalchemy.html#apscheduler.jobstores.sqlalchemy.SQLAlchemyJobStore
# http://docs.sqlalchemy.org/en/latest/core/engines.html?highlight=create_engine#database-urls


def my_job1():
print(datetime.datetime.now())
print('-'*10+'my_job1'+'-'*10)

def my_job2(text):
print(datetime.datetime.now())
print('-' * 10 + text + '-' * 10)

jobstores = {
'default': SQLAlchemyJobStore(url=url)
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}

sched = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
sched.add_job(my_job1,trigger='interval', seconds=3,id='my_job1')
sched.add_job(my_job2,trigger='interval', seconds=3,args=['my_job2'],id='my_job2')
sched.start()

参考:

http://apscheduler.readthedocs.io/en/latest/userguide.html
http://docs.sqlalchemy.org/en/latest/core/engines.html?highlight=create_engine#database-urls
http://debugo.com/apscheduler/
http://blog.csdn.net/caiguoxiong0101/article/details/50364236