在Python 3中安排重复事件

时间:2021-08-21 14:32:48

I'm trying to schedule a repeating event to run every minute in Python 3.

我正在尝试安排重复事件在Python 3中每分钟运行一次。

I've seen class sched.scheduler but I'm wondering if there's another way to do it. I've heard mentions I could use multiple threads for this, which I wouldn't mind doing.

我见过类sched.scheduler,但我想知道是否有另一种方法可以做到这一点。我听说提到我可以使用多个线程,我不介意这样做。

I'm basically requesting some JSON and then parsing it; its value changes over time.

我基本上要求一些JSON然后解析它;它的价值随时间而变化。

To use sched.scheduler I have to create a loop to request it to schedule the even to run for one hour:

要使用sched.scheduler,我必须创建一个循环来请求它安排even运行一小时:

scheduler = sched.scheduler(time.time, time.sleep)

# Schedule the event. THIS IS UGLY!
for i in range(60):
    scheduler.enter(3600 * i, 1, query_rate_limit, ())

scheduler.run()

What other ways to do this are there?

有什么其他方法可以做到这一点?

10 个解决方案

#1


35  

You could use threading.Timer, but that also schedules a one-off event, similarly to the .enter method of scheduler objects.

您可以使用threading.Timer,但它也会调度一次性事件,类似于调度程序对象的.enter方法。

The normal pattern (in any language) to transform a one-off scheduler into a periodic scheduler is to have each event re-schedule itself at the specified interval. For example, with sched, I would not use a loop like you're doing, but rather something like:

将一次性调度程序转换为周期性调度程序的正常模式(以任何语言)是使每个事件以指定的间隔重新调度自身。例如,使用sched,我不会像你一样使用循环,而是像:

def periodic(scheduler, interval, action, actionargs=()):
    scheduler.enter(interval, 1, periodic,
                    (scheduler, interval, action, actionargs))
    action(*actionargs)

and initiate the whole "forever periodic schedule" with a call

并通过电话启动整个“永久定期时间表”

periodic(scheduler, 3600, query_rate_limit)

Or, I could use threading.Timer instead of scheduler.enter, but the pattern's quite similar.

或者,我可以使用threading.Timer而不是scheduler.enter,但模式非常相似。

If you need a more refined variation (e.g., stop the periodic rescheduling at a given time or upon certain conditions), that's not too hard to accomodate with a few extra parameters.

如果您需要更精细的变体(例如,在给定时间或某些条件下停止定期重新安排),那么使用一些额外参数就不会太难。

#2


18  

My humble take on the subject:

我对这个问题的谦虚态度:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.function   = function
        self.interval   = interval
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Usage:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Features:

  • Standard library only, no external dependencies
  • 仅限标准库,无外部依赖项

  • Uses the pattern suggested by Alex Martnelli
  • 使用Alex Martnelli建议的模式

  • start() and stop() are safe to call multiple times even if the timer has already started/stopped
  • 即使计时器已经启动/停止,start()和stop()也可以安全地多次调用

  • function to be called can have positional and named arguments
  • 要调用的函数可以有位置和命名参数

  • You can change interval anytime, it will be effective after next run. Same for args, kwargs and even function!
  • 您可以随时更改间隔,它将在下次运行后生效。 args,kwargs甚至功能相同!

#3


13  

You could use schedule. It works on Python 2.7 and 3.3 and is rather lightweight:

你可以使用时间表。它适用于Python 2.7和3.3,并且非常轻量级:

import schedule
import time

def job():
   print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)

while 1:
   schedule.run_pending()
   time.sleep(1)

#4


6  

You could use the Advanced Python Scheduler. It even has a cron-like interface.

您可以使用Advanced Python Scheduler。它甚至有一个类似cron的界面。

#5


5  

Use Celery.

from celery.task import PeriodicTask
from datetime import timedelta


class ProcessClicksTask(PeriodicTask):
    run_every = timedelta(minutes=30)

    def run(self, **kwargs):
        #do something

#6


5  

Based on MestreLion answer, it solve a little problem with multithreading:

基于MestreLion的答案,它解决了多线程的一个小问题:

from threading import Timer, Lock


class Periodic(object):
    """
    A periodic task running in threading.Timers
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._lock = Lock()
        self._timer = None
        self.function = function
        self.interval = interval
        self.args = args
        self.kwargs = kwargs
        self._stopped = True
        if kwargs.pop('autostart', True):
            self.start()

    def start(self, from_run=False):
        self._lock.acquire()
        if from_run or self._stopped:
            self._stopped = False
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self._lock.release()

    def _run(self):
        self.start(from_run=True)
        self.function(*self.args, **self.kwargs)

    def stop(self):
        self._lock.acquire()
        self._stopped = True
        self._timer.cancel()
        self._lock.release()

#7


1  

Based on Alex Martelli's answer, I have implemented decorator version which is more easier to integrated.

基于Alex Martelli的回答,我已经实现了更容易集成的装饰器版本。

import sched
import time
import datetime
from functools import wraps
from threading import Thread


def async(func):
    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target=func, args=args, kwargs=kwargs)
        func_hl.start()
        return func_hl
    return async_func


def schedule(interval):
    def decorator(func):
        def periodic(scheduler, interval, action, actionargs=()):
            scheduler.enter(interval, 1, periodic,
                            (scheduler, interval, action, actionargs))
            action(*actionargs)

        @wraps(func)
        def wrap(*args, **kwargs):
            scheduler = sched.scheduler(time.time, time.sleep)
            periodic(scheduler, interval, func)
            scheduler.run()
        return wrap
    return decorator


@async
@schedule(1)
def periodic_event():
    print(datetime.datetime.now())


if __name__ == '__main__':
    print('start')
    periodic_event()
    print('end')

#8


1  

Here's a quick and dirty non-blocking loop with Thread:

这是一个使用Thread的快速而脏的非阻塞循环:

#!/usr/bin/env python3
import threading,time

def worker():
    print(time.time())
    time.sleep(5)
    t = threading.Thread(target=worker)
    t.start()


threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")

There's nothing particularly special, the worker creates a new thread of itself with a delay. Might not be most efficient, but simple enough. northtree's answer would be the way to go if you need more sophisticated solution.

没有什么特别的,工人创造了一个延迟的新线程。可能不是最有效率,但足够简单。如果您需要更复杂的解决方案,northtree的答案将是您的选择。

And based on this, we can do the same, just with Timer:

基于此,我们可以做同样的事情,只需使用Timer:

#!/usr/bin/env python3
import threading,time

def hello():
    t = threading.Timer(10.0, hello)
    t.start()
    print( "hello, world",time.time() )

t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How's it going?",time.time())

#9


0  

See my sample

看我的样本

import sched, time

def myTask(m,n):
  print n+' '+m

def periodic_queue(interval,func,args=(),priority=1):
  s = sched.scheduler(time.time, time.sleep)
  periodic_task(s,interval,func,args,priority)
  s.run()

def periodic_task(scheduler,interval,func,args,priority):
  func(*args)
  scheduler.enter(interval,priority,periodic_task,
                   (scheduler,interval,func,args,priority))

periodic_queue(1,myTask,('world','hello'))

#10


-2  

task-scheduler is an in-process scheduler to arrange and run the task periodically according to YAML config file. Please refer to https://github.com/tzutalin/task-scheduler

task-scheduler是一个进程内调度程序,用于根据YAML配置文件定期安排和运行任务。请参阅https://github.com/tzutalin/task-scheduler

#1


35  

You could use threading.Timer, but that also schedules a one-off event, similarly to the .enter method of scheduler objects.

您可以使用threading.Timer,但它也会调度一次性事件,类似于调度程序对象的.enter方法。

The normal pattern (in any language) to transform a one-off scheduler into a periodic scheduler is to have each event re-schedule itself at the specified interval. For example, with sched, I would not use a loop like you're doing, but rather something like:

将一次性调度程序转换为周期性调度程序的正常模式(以任何语言)是使每个事件以指定的间隔重新调度自身。例如,使用sched,我不会像你一样使用循环,而是像:

def periodic(scheduler, interval, action, actionargs=()):
    scheduler.enter(interval, 1, periodic,
                    (scheduler, interval, action, actionargs))
    action(*actionargs)

and initiate the whole "forever periodic schedule" with a call

并通过电话启动整个“永久定期时间表”

periodic(scheduler, 3600, query_rate_limit)

Or, I could use threading.Timer instead of scheduler.enter, but the pattern's quite similar.

或者,我可以使用threading.Timer而不是scheduler.enter,但模式非常相似。

If you need a more refined variation (e.g., stop the periodic rescheduling at a given time or upon certain conditions), that's not too hard to accomodate with a few extra parameters.

如果您需要更精细的变体(例如,在给定时间或某些条件下停止定期重新安排),那么使用一些额外参数就不会太难。

#2


18  

My humble take on the subject:

我对这个问题的谦虚态度:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.function   = function
        self.interval   = interval
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Usage:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Features:

  • Standard library only, no external dependencies
  • 仅限标准库,无外部依赖项

  • Uses the pattern suggested by Alex Martnelli
  • 使用Alex Martnelli建议的模式

  • start() and stop() are safe to call multiple times even if the timer has already started/stopped
  • 即使计时器已经启动/停止,start()和stop()也可以安全地多次调用

  • function to be called can have positional and named arguments
  • 要调用的函数可以有位置和命名参数

  • You can change interval anytime, it will be effective after next run. Same for args, kwargs and even function!
  • 您可以随时更改间隔,它将在下次运行后生效。 args,kwargs甚至功能相同!

#3


13  

You could use schedule. It works on Python 2.7 and 3.3 and is rather lightweight:

你可以使用时间表。它适用于Python 2.7和3.3,并且非常轻量级:

import schedule
import time

def job():
   print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)

while 1:
   schedule.run_pending()
   time.sleep(1)

#4


6  

You could use the Advanced Python Scheduler. It even has a cron-like interface.

您可以使用Advanced Python Scheduler。它甚至有一个类似cron的界面。

#5


5  

Use Celery.

from celery.task import PeriodicTask
from datetime import timedelta


class ProcessClicksTask(PeriodicTask):
    run_every = timedelta(minutes=30)

    def run(self, **kwargs):
        #do something

#6


5  

Based on MestreLion answer, it solve a little problem with multithreading:

基于MestreLion的答案,它解决了多线程的一个小问题:

from threading import Timer, Lock


class Periodic(object):
    """
    A periodic task running in threading.Timers
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._lock = Lock()
        self._timer = None
        self.function = function
        self.interval = interval
        self.args = args
        self.kwargs = kwargs
        self._stopped = True
        if kwargs.pop('autostart', True):
            self.start()

    def start(self, from_run=False):
        self._lock.acquire()
        if from_run or self._stopped:
            self._stopped = False
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self._lock.release()

    def _run(self):
        self.start(from_run=True)
        self.function(*self.args, **self.kwargs)

    def stop(self):
        self._lock.acquire()
        self._stopped = True
        self._timer.cancel()
        self._lock.release()

#7


1  

Based on Alex Martelli's answer, I have implemented decorator version which is more easier to integrated.

基于Alex Martelli的回答,我已经实现了更容易集成的装饰器版本。

import sched
import time
import datetime
from functools import wraps
from threading import Thread


def async(func):
    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target=func, args=args, kwargs=kwargs)
        func_hl.start()
        return func_hl
    return async_func


def schedule(interval):
    def decorator(func):
        def periodic(scheduler, interval, action, actionargs=()):
            scheduler.enter(interval, 1, periodic,
                            (scheduler, interval, action, actionargs))
            action(*actionargs)

        @wraps(func)
        def wrap(*args, **kwargs):
            scheduler = sched.scheduler(time.time, time.sleep)
            periodic(scheduler, interval, func)
            scheduler.run()
        return wrap
    return decorator


@async
@schedule(1)
def periodic_event():
    print(datetime.datetime.now())


if __name__ == '__main__':
    print('start')
    periodic_event()
    print('end')

#8


1  

Here's a quick and dirty non-blocking loop with Thread:

这是一个使用Thread的快速而脏的非阻塞循环:

#!/usr/bin/env python3
import threading,time

def worker():
    print(time.time())
    time.sleep(5)
    t = threading.Thread(target=worker)
    t.start()


threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")

There's nothing particularly special, the worker creates a new thread of itself with a delay. Might not be most efficient, but simple enough. northtree's answer would be the way to go if you need more sophisticated solution.

没有什么特别的,工人创造了一个延迟的新线程。可能不是最有效率,但足够简单。如果您需要更复杂的解决方案,northtree的答案将是您的选择。

And based on this, we can do the same, just with Timer:

基于此,我们可以做同样的事情,只需使用Timer:

#!/usr/bin/env python3
import threading,time

def hello():
    t = threading.Timer(10.0, hello)
    t.start()
    print( "hello, world",time.time() )

t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How's it going?",time.time())

#9


0  

See my sample

看我的样本

import sched, time

def myTask(m,n):
  print n+' '+m

def periodic_queue(interval,func,args=(),priority=1):
  s = sched.scheduler(time.time, time.sleep)
  periodic_task(s,interval,func,args,priority)
  s.run()

def periodic_task(scheduler,interval,func,args,priority):
  func(*args)
  scheduler.enter(interval,priority,periodic_task,
                   (scheduler,interval,func,args,priority))

periodic_queue(1,myTask,('world','hello'))

#10


-2  

task-scheduler is an in-process scheduler to arrange and run the task periodically according to YAML config file. Please refer to https://github.com/tzutalin/task-scheduler

task-scheduler是一个进程内调度程序,用于根据YAML配置文件定期安排和运行任务。请参阅https://github.com/tzutalin/task-scheduler