python第三方库系列之十三--定时任务apscheduler库

时间:2022-01-25 07:48:50
        APScheduler是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统,写python还是要比java舒服多了。

1. 定时任务例子

        APScheduler是进程内的调度器,可以定时触发具体的函数,并且可以访问应用的所有变量和函数。在web应用中通过APScheduler实现定时任务是很方便的。下面看例子:

from apscheduler.scheduler import Scheduler  

schedudler = Scheduler(daemonic = False)

@schedudler.cron_schedule(second='*', day_of_week='0-4', hour='9-12,13-15')
def quote_send_sh_job():
print 'a simple cron job start at', datetime.datetime.now()

schedudler.start()

        上面通过装饰器定义了cron job,可以通过函数scheduler.add_cron_job添加,用装饰器更方便。Scheduler构造函数中传入daemonic参数,表示执行线程是非守护的,在Schduler的文档中推荐使用非守护线程:(Jobs are always executed in non-daemonic threads.  )

        定时任务的三种方式:

        (1)simple date-based scheduling(定时任务,时间固定,执行一次)

from datetime import date
from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

# example: 需求:在 2013-1-4 13:14:21 打印 i love you</span>
def my_job(text):
print text

# Store the job in a variable in case we want to cancel it
# 方法的第一个参数是需要执行的方法名,第二个参数是时间,第三个参数是需要执行的方法的参数列表
job = sched.add_date_job(my_job, '2013-01-04 13:14:21', ['i love you'])</span>

        (2)Interval-based scheduling(每隔多长时间执行一次)

# example: 需求:每隔一个小时打印一次hello world</span>
job = sched.add_interval_job(my_job,hour=1,['hellow world'])

        (3)cron-style scheduling(定时循环执行,比如每个月的几号,或者每周几,或者一年中的第几周执行)

# 没有设置时分秒默认为0,example: 需求: 每周一,三,五打印hello world(周日是0,周六是6)
job = sched.add_cron_job(my_job,day-of-week='0,2,4',['hellow world'])

        更多参数挪步官网:https://apscheduler.readthedocs.org/en/v2.1.0/modules/scheduler.html

        在添加job时还有一个比较重要的参数max_instances,指定一个job的并发实例数,默认值是1。默认情况下,如果一个job准备执行,但是该job的前一个实例尚未执行完,则后一个job会失败,可以通过这个参数来改变这种情况。

2. 存储任务执行信息

        APScheduler提供了jobstore用于存储job的执行信息,默认使用的是RAMJobStore,还提供了SQLAlchemyJobStore、ShelveJobStore和MongoDBJobStore。APScheduler允许同时使用多个jobstore,通过别名(alias)区分,在添加job时需要指定具体的jobstore的别名,否则使用的是别名是default的jobstore,即RAMJobStore。下面以MongoDBJobStore举例说明。

import pymongo  
from apscheduler.scheduler import Scheduler
from apscheduler.jobstores.mongodb_store import MongoDBJobStore
import time

sched = Scheduler(daemonic = False)

mongo = pymongo.Connection(host='127.0.0.1', port=27017)
store = MongoDBJobStore(connection=mongo)
sched.add_jobstore(store, 'mongo') # 别名是mongo

@sched.cron_schedule(second='*', day_of_week='0-4', hour='9-12,13-15', jobstore='mongo') # 向别名为mongo的jobstore添加job
def job():
print 'a job'
time.sleep(1)

sched.start()

        注意start必须在添加job动作之后调用,否则会抛错。默认会把job信息保存在apscheduler数据库下的jobs表:

> db.jobs.findOne()  
{
"_id" : ObjectId("502202d1443c1557fa8b8d66"),
"runs" : 20,
"name" : "job",
"misfire_grace_time" : 1,
"coalesce" : true,
"args" : BinData(0,"gAJdcQEu"),
"next_run_time" : ISODate("2012-08-08T14:10:46Z"),
"max_instances" : 1,
"max_runs" : null,
"trigger" : BinData(0,"xxx..."),
"func_ref" : "__main__:job",
"kwargs" : BinData(0,"gAJ9cQEu")
}

        上面就是存储的具体信息。
3.异常处理

        当job抛出异常时,APScheduler会默默的把他吞掉,不提供任何提示,这不是一种好的实践,我们必须知晓程序的任何差错。APScheduler提供注册listener,可以监听一些事件,包括:job抛出异常、job没有来得及执行等。

        看下面的例子,监听异常和miss事件,这里用logging模块打印日志,logger.exception()可以打印出异常堆栈信息。

def err_listener(ev):  
err_logger = logging.getLogger('schedErrJob')
if ev.exception:
err_logger.exception('%s error.', str(ev.job))
else:
err_logger.info('%s miss', str(ev.job))

schedudler.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR | apscheduler.events.EVENT_JOB_MISSED)

        事件的属性包括:

job – the job instance in question
scheduled_run_time – the time when the job was scheduled to be run
retval – the return value of the successfully executed job
exception – the exception raised by the job
traceback – the traceback object associated with the exception

        最后,需要注意一点当job不以daemon模式运行时,并且APScheduler也不是daemon的,那么在关闭脚本时,Ctrl + C是不奏效的,必须kill才可以。可以通过命令实现关闭脚本:

ps axu | grep {脚本名} | grep -v grep | awk '{print $2;}' | xargs kill