1. 定时任务例子
APScheduler是进程内的调度器,可以定时触发具体的函数,并且可以访问应用的所有变量和函数。在web应用中通过APScheduler实现定时任务是很方便的。下面看例子:
from apscheduler.scheduler import Scheduler
schedudler = Scheduler(daemonic = False)
@schedudler.cron_schedule(second='*', day_of_week='0-4', hour='9-12,13-15')
def quote_send_sh_job():
print 'a simple cron job start at', datetime.datetime.now()
schedudler.start()
上面通过装饰器定义了cron job,可以通过函数scheduler.add_cron_job添加,用装饰器更方便。Scheduler构造函数中传入daemonic参数,表示执行线程是非守护的,在Schduler的文档中推荐使用非守护线程:(Jobs are always executed in non-daemonic threads. )
定时任务的三种方式:
(1)simple date-based scheduling(定时任务,时间固定,执行一次)
from datetime import date
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.start()
# example: 需求:在 2013-1-4 13:14:21 打印 i love you</span>
def my_job(text):
print text
# Store the job in a variable in case we want to cancel it
# 方法的第一个参数是需要执行的方法名,第二个参数是时间,第三个参数是需要执行的方法的参数列表
job = sched.add_date_job(my_job, '2013-01-04 13:14:21', ['i love you'])</span>
(2)Interval-based scheduling(每隔多长时间执行一次)
# example: 需求:每隔一个小时打印一次hello world</span>
job = sched.add_interval_job(my_job,hour=1,['hellow world'])
(3)cron-style scheduling(定时循环执行,比如每个月的几号,或者每周几,或者一年中的第几周执行)
# 没有设置时分秒默认为0,example: 需求: 每周一,三,五打印hello world(周日是0,周六是6)
job = sched.add_cron_job(my_job,day-of-week='0,2,4',['hellow world'])
更多参数挪步官网:https://apscheduler.readthedocs.org/en/v2.1.0/modules/scheduler.html
在添加job时还有一个比较重要的参数max_instances,指定一个job的并发实例数,默认值是1。默认情况下,如果一个job准备执行,但是该job的前一个实例尚未执行完,则后一个job会失败,可以通过这个参数来改变这种情况。
2. 存储任务执行信息
APScheduler提供了jobstore用于存储job的执行信息,默认使用的是RAMJobStore,还提供了SQLAlchemyJobStore、ShelveJobStore和MongoDBJobStore。APScheduler允许同时使用多个jobstore,通过别名(alias)区分,在添加job时需要指定具体的jobstore的别名,否则使用的是别名是default的jobstore,即RAMJobStore。下面以MongoDBJobStore举例说明。
import pymongo
from apscheduler.scheduler import Scheduler
from apscheduler.jobstores.mongodb_store import MongoDBJobStore
import time
sched = Scheduler(daemonic = False)
mongo = pymongo.Connection(host='127.0.0.1', port=27017)
store = MongoDBJobStore(connection=mongo)
sched.add_jobstore(store, 'mongo') # 别名是mongo
@sched.cron_schedule(second='*', day_of_week='0-4', hour='9-12,13-15', jobstore='mongo') # 向别名为mongo的jobstore添加job
def job():
print 'a job'
time.sleep(1)
sched.start()
注意start必须在添加job动作之后调用,否则会抛错。默认会把job信息保存在apscheduler数据库下的jobs表:
> db.jobs.findOne()上面就是存储的具体信息。
{
"_id" : ObjectId("502202d1443c1557fa8b8d66"),
"runs" : 20,
"name" : "job",
"misfire_grace_time" : 1,
"coalesce" : true,
"args" : BinData(0,"gAJdcQEu"),
"next_run_time" : ISODate("2012-08-08T14:10:46Z"),
"max_instances" : 1,
"max_runs" : null,
"trigger" : BinData(0,"xxx..."),
"func_ref" : "__main__:job",
"kwargs" : BinData(0,"gAJ9cQEu")
}
3.异常处理
当job抛出异常时,APScheduler会默默的把他吞掉,不提供任何提示,这不是一种好的实践,我们必须知晓程序的任何差错。APScheduler提供注册listener,可以监听一些事件,包括:job抛出异常、job没有来得及执行等。
看下面的例子,监听异常和miss事件,这里用logging模块打印日志,logger.exception()可以打印出异常堆栈信息。
def err_listener(ev):
err_logger = logging.getLogger('schedErrJob')
if ev.exception:
err_logger.exception('%s error.', str(ev.job))
else:
err_logger.info('%s miss', str(ev.job))
schedudler.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR | apscheduler.events.EVENT_JOB_MISSED)
事件的属性包括:
job – the job instance in question
scheduled_run_time – the time when the job was scheduled to be run
retval – the return value of the successfully executed job
exception – the exception raised by the job
traceback – the traceback object associated with the exception
最后,需要注意一点当job不以daemon模式运行时,并且APScheduler也不是daemon的,那么在关闭脚本时,Ctrl + C是不奏效的,必须kill才可以。可以通过命令实现关闭脚本:
ps axu | grep {脚本名} | grep -v grep | awk '{print $2;}' | xargs kill