Python实现定时任务的三种方案——schedule、APScheduler、Celery

时间:2024-06-02 15:33:09

schedule

schedule是一个轻量级的Python库,用于定期执行任务,即定时任务调度。它提供了一种简单直观的方式来自定义任务执行的时间规则,而无需复杂的线程或进程管理知识。schedule适用于那些需要在后台定期执行某些功能的Python应用程序,比如数据抓取、定时发送邮件、系统维护脚本等场景。

pip install schedule

简单示例

import schedule
import time
def job():
    print("I'm working...")
#每隔1秒执行一次job函数
schedule.every(1).seconds.do(job) 
#每隔10分钟执行一次job函数
schedule.every(10).minutes.do(job)
#每小时的整点执行job函数
schedule.every().hour.do(job)
#每天的14:30分执行job函数
schedule.every().day.at("14:30").do(job)
#随机地在每5到10分钟之间选择一个时间点执行job函数
schedule.every(5).to(10).minutes.do(job)
#每周一执行job函数
schedule.every().monday.do(job)
#每周三的13:15分执行job函数
schedule.every().wednesday.at("13:15").do(job)
#每个小时的第17分钟执行job函数
schedule.every().minute.at(":17").do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

装饰器:通过 @repeat() 装饰静态方法

import time
from schedule import every, repeat, run_pending
@repeat(every().second)
def job():
    print('working...')
while True:
    run_pending()
    time.sleep(1)

传递参数

import schedule
def greet(name):
    print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
while True:
    schedule.run_pending()



@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):
    print('Hello', planet)
while True:
    run_pending()

取消任务

import schedule
i = 0
def some_task():
    global i
    i += 1
    print(i)
    if i == 10:
        schedule.cancel_job(job)
        print('cancel job')
        exit(0)
job = schedule.every().second.do(some_task)
while True:
    schedule.run_pending()

运行一次任务

import time
import schedule
def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob
schedule.every().minute.at(':34').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)

根据标签检索任务

# 检索所有任务:schedule.get_jobs()
import schedule
def greet(name):
    print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)

根据标签取消任务

# 取消所有任务:schedule.clear()
import schedule
def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
    schedule.run_pending()

运行任务到某时间

import schedule
from datetime import datetime, timedelta, time
def job():
    print('working...')
schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止
while True:
    schedule.run_pending()

马上运行所有任务

import schedule
def job():
    print('working...')
def job1():
    print('Hello...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任务间延迟3秒

并发运行:使用 Python 内置队列实现:

import threading
import time
import schedule
def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
    schedule.run_pending()
    time.sleep(1)

APScheduler

APScheduler 是一个功能强大的 Python 库,用于在后台调度作业,支持多种类型的触发器(如定时、间隔、日期和cron表达式),以及持久化作业存储和分布式执行。相较于schedule库,APScheduler 提供了更多的灵活性和企业级特性,适用于更复杂的调度需求。

APScheduler四个组成部分

1.调度器(Scheduler)

作用:调度器是APScheduler的核心,负责管理和驱动整个任务调度流程。它根据配置的时间规则(触发器)来决定何时执行哪些任务,并管理作业的添加、修改、删除以及执行。

2. 触发器(Triggers)

作用:触发器定义了任务执行的时间规则,即决定任务何时触发执行。APScheduler支持几种类型的触发器,包括:

  • IntervalTrigger:按照固定的时间间隔执行任务,如每隔5分钟执行一次。
  • DateTrigger:在指定的日期和时间点执行一次任务。
  • CronTrigger:类似于Unix Cron表达式,提供非常灵活的时间规则,可以精确到秒、分钟、小时、日、月、周几等。

3. 作业储存器(Job Stores)

作用:作业储存器用于持久化作业的状态信息,确保任务调度的可靠性。即使程序重启,也能恢复作业的执行状态。APScheduler支持多种作业储存方式,包括内存、SQL数据库(如SQLite、MySQL)、Redis等。

4. 执行器(Executors)

作用:执行器负责实际执行作业,它决定了任务在哪个线程或进程中运行。APScheduler提供了多种执行器,例如:

  • ThreadPoolExecutor:使用线程池来执行任务。
  • ProcessPoolExecutor:使用进程池来执行任务,适用于CPU密集型任务。
  • GeventExecutor 或 TornadoExecutor:为异步IO框架(如Gevent或Tornado)设计的执行器,适用于I/O密集型任务。

简单使用 

pip install apscheduler



from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
# 输出时间
def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# BlockingScheduler
sched = BlockingScheduler()  #创建调度器
sched.add_job(job, 'interval', seconds=1, id='my_job_id')
sched.start()

add_job部分源码

参数说明:

  • id:指定作业的唯一ID
  • name:指定作业的名字
  • trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间, 满足时将会执行
  • executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的 执行器,执行job指定的函数
  • max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行
  • next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间
  • misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
  • coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
  • func:Job执行的函数
  • args:Job执行函数需要的位置参数
  • kwargs:Job执行函数需要的关键字参数

调度器、执行器、触发器

Scheduler 调度器

调度器是管理定时任务的

当使用BlockingScheduler时作为独立进程使用,会阻塞主线程
当使用BackgroundScheduler时会在后台运行,不会发生阻塞

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.start()  # 此处程序不会发生阻塞

scheduler = BlockingScheduler()
scheduler.start()  # 此处程序会发生阻塞

executors 执行器

执行器控制执行方式

在定时任务该执行时,ThreadPoolExecutor以线程方式执行任务,ProcessPoolExecutor以进程方式执行任务

# 方式1: 线程
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
    'default': ThreadPoolExecutor(20)   # 最多20个线程同时执行
}
scheduler = BackgroundScheduler(executors=executors)


# 方式2: 进程
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
    'default': ProcessPoolExecutor(3)  # 最多3个进程同时运行
}
scheduler = BackgroundScheduler(executors=executors)

Trigger 触发器

触发器控制的是什么时候会执行任务

目前APScheduler支持触发器:

  • 指定时间的DateTrigger
  • 指定间隔时间的IntervalTrigger
  • 像Linux的crontab一样的CronTrigger

触发器参数:date(作业只执行一次)

from datetime import date

# 在2024年6月11日00:00:00执行
sched.add_job(my_job, 'date', run_date=date(2024, 06, 11))

# 在2024年5月31日16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2024, 05, 31, 16, 30, 5))
sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05')

# 立即执行
sched.add_job(my_job, 'date')  
sched.start()

触发器参数:interval

interval间隔调度

  • weeks (int) – 间隔几周
  • days (int) – 间隔几天
  • hours (int) – 间隔几小时
  • minutes (int) – 间隔几分钟
  • seconds (int) – 间隔多少秒
  • start_date (datetime|str) – 开始日期
  • end_date (datetime|str) – 结束日期
  • timezone (datetime.tzinfo|str) – 时区
from datetime import datetime

# 每两小时执行一次
sched.add_job(job_function, 'interval', hours=2)

# 在2024年5月31日09:30:00 到2024年6月15日的时间内,每两小时执行一次
sched.add_job(job_function, 'interval', hours=2, start_date='2010-05-31 09:30:00', end_date='2024-06-15 11:00:00')

触发器参数:cron

cron调度

  • (int|str) 表示参数既可以是int类型,也可以是str类型
  • (datetime | str) 表示参数既可以是datetime类型,也可以是str类型
  • year (int|str) – 4-digit year -(表示四位数的年份,如2008年)
  • month (int|str) – month (1-12) -(表示取值范围为1-12月)
  • day (int|str) – day of the (1-31) -(表示取值范围为1-31日)
  • week (int|str) – ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))
  • day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) – (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)
  • hour (int|str) – hour (0-23) – (表示取值范围为0-23时)
  • minute (int|str) – minute (0-59) – (表示取值范围为0-59分)
  • second (int|str) – second (0-59) – (表示取值范围为0-59秒)
  • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示开始时间)
  • end_date (datetime|str) – latest possible date/time to trigger on (inclusive) – (表示结束时间)
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)

CronTrigger可用的表达式:

# 6-8,11-12月第三个周五 00:00, 01:00, 02:00, 03:00运行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每周一到周五运行 直到2024-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

 任务管理

添加、移除、暂停、恢复、停止

# 方式1: 通过对象
job = scheduler.add_job(myfunc, 'interval', minutes=2)  # 添加任务
job.remove()  # 移除任务
job.pause() # 暂停任务
job.resume()  # 恢复任务

# 方式2: 通过任务id
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  # 添加任务    
scheduler.remove_job('my_job_id')  # 移除任务
scheduler.pause_job('my_job_id')  # 暂停任务
scheduler.resume_job('my_job_id')  # 恢复任务

scheduler.shutdown()  # 停止任务

#修改任务
# 方式1: 通过对象
job.modify(max_instances=6, name='Alternate name')

# 方式2: 通过任务id
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

Celery

Celery 是一个基于Python的分布式任务队列和作业调度系统,专为处理大量实时的异步任务和定时任务而设计。它能够有效地将耗时的操作异步处理,从而提高应用程序的响应速度和吞吐量。Celery 架构包含三个核心组件,分别是消息中间件(Broker)、任务执行单元(Worker)和任务结果存储(Result Backend)。

Celery的三大组成

1. 消息中间件(Broker)

消息中间件是Celery的神经系统,它负责接收任务生产者产生的消息并将这些消息分发给任务执行单元(Worker)。Celery 不直接提供消息服务,而是需要与第三方消息中间件集成,常用的包括:

  • RabbitMQ:一个高可用的消息队列服务,支持AMQP协议,广泛应用于企业级系统。
  • Redis:一个高性能的键值存储系统,也可以作为简单的消息队列使用。
  • Amazon SQSGoogle Pub/Sub等云服务商提供的消息队列服务。

2. 任务执行单元(Worker)

Worker是任务的实际执行者,它们监听消息中间件上的任务队列,当有新的任务到达时,Worker会取出任务并执行。Worker可以部署在单个或多个服务器上,实现任务的分布式处理。Worker能够并行处理任务,提高系统处理能力。用户可以配置Worker的数量和运行Worker的机器,以达到最佳的性能和资源利用。

3. 任务结果存储(Result Backend)

Result Backend用于存储任务执行的结果,以便任务的发起方(生产者)可以查询任务执行的状态和获取结果。Celery 支持多种存储结果的方式,包括:

  • RabbitMQ / AMQP:直接使用消息中间件存储结果,但功能有限。
  • Redis:常用的选择,快速且灵活,适合存储短期结果。
  • Django ORMSQLAlchemy:将结果存储在关系数据库中,适合需要持久化存储的场景。
  • MemcachedMongoDB等其他数据存储服务。

安装

# celery 安装
pip install celery
# celery 监控 flower
pip install flower
pip install redis
pip install eventlet

简单任务

单任务

目录结构

#celery_task.py

import celery
import time

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
cel = celery.Celery('test', backend=backend, broker=broker)


@cel.task
def send_email(name):
    print("向%s发送邮件..." % name)
    time.sleep(5)
    print("向%s发送邮件完成" % name)
    return "ok"
#product.py
from celery_task import send_email
result = send_email.delay("yuan")
print(result.id)
result2 = send_email.delay("alex")
print(result2.id)

 接下来这一步很关键

#启动命令,进入该文件的目录下启动
celery --app=celery_task worker -P eventlet -l INFO

命令启动后,到product.py文件下运行代码,效果如下:

多任务

目录结构

#celery.py
from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
             include=['celery_tasks.task01',
                      'celery_tasks.task02'
                      ])

# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
#task01
import time
from celery_tasks.celery import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return "完成向%s发送邮件任务"%res
#task02
import time
from celery_tasks.celery import cel

@cel.task
def send_msg(res):
    time.sleep(5)
    return "完成向%s发送短信任务"%res
#product.py
from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg

# 立即告知celery去执行test_celery任务,并传入一个参数
result = send_email.delay('yuan')
print(result.id)
result = send_msg.delay('yuan')
print(result.id)

进入终端,在celery_tasks所在目录运行celery --app=celery_tasks worker -P eventlet -l INFO

 

定时任务

单任务

目录结构

#celery_task.py
import celery
import time
backend='redis://127.0.0.1:6379/1'  #结果存储
broker='redis://127.0.0.1:6379/2'  #消息中间件
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def send_email(name):
    print("向%s发送邮件..."%name)
    time.sleep(5)
    print("向%s发送邮件完成"%name)
    return "ok"

#product.py
from celery_task import send_email
from datetime import datetime

# 方式一
v1 = datetime(2020, 3, 11, 16, 19, 00)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
result = send_email.apply_async(args=["python", ], eta=v2)
print(result.id)

# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta

time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay  # 当前时间加10秒

# 使用apply_async并设定时间
result = send_email.apply_async(args=["Golang"], eta=task_time)
print(result.id)

在终端执行命令 

celery --app=celery_task worker -P eventlet -l INFO
celery -A celery_task beat# Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列

 执行完命令之后还要去运行product.py的代码 

多任务

目录结构

只需修改celery.py代码

#celery.py
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_tasks.task01',
    'celery_tasks.task02',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字随意命名
    'add-every-10-seconds': {
        # 执行tasks1下的test_celery函数
        'task': 'celery_tasks.task01.send_email',
        # 每隔2秒执行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=6),
        # 传递参数
        'args': ('你好',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_tasks.task01.send_email',
    #     每年4月11号,8点42分执行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': ('你好',)
    # },
}

在终端执行命令(不用执行product.py):

#分两个终端执行
celery --app=celery_tasks worker -P eventlet -l INFO
celery -A celery_tasks beat# Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列

任务定时执行: