Django—每x秒运行一个函数

时间:2022-09-04 01:07:14

I'm working on a Django app. I have an API endpoint, which if requested, must carry out a function that must be repeated a few times (until a certain condition is true). How I'm dealing with it right now is -

我正在开发Django应用程序。我有一个API端点,如果需要,它必须执行一个必须重复几次的函数(直到某个条件为真)。我现在该怎么处理呢

def shut_down(request):
  # Do some stuff
  while True:
    result = some_fn()
    if result:
      break
    time.sleep(2)

  return True

While I know that this is a terrible approach and that I shouldn't be blocking for 2 seconds, I can't figure out how to get around it.
This works, after say a wait of 4 seconds. But I'd like something that keeps the loop running in the background, and stop once some_fn returns True. (Also, it is certain that some_fn will return True)

EDIT -
Reading Oz123's response gave me an idea which seems to work. Here's what I did -

虽然我知道这是一种糟糕的方法,我不应该阻塞2秒,但我不知道如何绕过它。这是可行的,在等了4秒之后。但是我想要一些东西来保持循环在后台运行,并在some_fn返回True时停止。(而且,可以肯定的是,some_fn将返回True)编辑——阅读Oz123的回复给了我一个似乎行得通的想法。这是我所做的

def shut_down(params):
    # Do some stuff
    # Offload the blocking job to a new thread

    t = threading.Thread(target=some_fn, args=(id, ), kwargs={})
    t.setDaemon(True)
    t.start()

    return True

def some_fn(id):
    while True:
        # Do the job, get result in res
        # If the job is done, return. Or sleep the thread for 2 seconds before trying again.

        if res:
            return
        else:
            time.sleep(2)

This does the job for me. It's simple but I don't know how efficient multithreading is in conjunction with Django.
If anyone can point out pitfalls of this, criticism is appreciated.

这对我很合适。它很简单,但是我不知道与Django结合使用多线程有多有效。如果有人能指出其中的缺陷,批评是值得赞赏的。

2 个解决方案

#1


8  

For many small projects celery is overkill. For those projects you can use schedule, it's very easy to use.

对许多小项目来说,芹菜是过量的。对于那些你可以使用日程安排的项目,它非常容易使用。

With this library you can make any function execute a task periodically:

使用这个库,您可以使任何函数定期执行任务:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

while True:
    schedule.run_pending()
    time.sleep(1) 

The example runs in a blocking manner, but if you look in the FAQ, you will find that you can also run tasks in a parallel thread, such that you are not blocking, and remove the task once not needed anymore:

这个示例以一种阻塞的方式运行,但是如果您查看FAQ,您会发现您也可以在一个并行线程中运行任务,这样您就不会阻塞了,并且一旦不再需要该任务,就删除该任务:

from schedule import Scheduler

def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """

    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):

        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.setDaemon(True)
    continuous_thread.start()
    return cease_continuous_run


Scheduler.run_continuously = run_continuously

Here is an example for usage in a class method:

下面是一个在类方法中使用的示例:

    def foo(self):
        ...
        if some_condition():
           return schedule.CancelJob  # a job can dequeue it

    # can be put in __enter__ or __init__
    self._job_stop = self.scheduler.run_continuously()

    logger.debug("doing foo"...)
    self.foo() # call foo
    self.scheduler.every(5).seconds.do(
        self.foo) # schedule foo for running every 5 seconds

    ...
    # later on foo is not needed any more:
    self._job_stop.set()

    ...

    def __exit__(self, exec_type, exc_value, traceback):
        # if the jobs are not stop, you can stop them
        self._job_stop.set()

#2


1  

I recommend you use Celery's task management. You can refer this to set up this app (package if you're from javaScript background).

我建议你使用芹菜的任务管理。您可以将此引用来设置此应用程序(如果您来自javaScript后台,请使用包)。

Once set, you can alter the code to:

一旦设置,您可以将代码更改为:

@app.task
def check_shut_down():
    if not some_fun():
        # add task that'll run again after 2 secs
        check_shut_down.delay((), countdown=3)
    else:
        # task completed; do something to notify yourself
        return True

#1


8  

For many small projects celery is overkill. For those projects you can use schedule, it's very easy to use.

对许多小项目来说,芹菜是过量的。对于那些你可以使用日程安排的项目,它非常容易使用。

With this library you can make any function execute a task periodically:

使用这个库,您可以使任何函数定期执行任务:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

while True:
    schedule.run_pending()
    time.sleep(1) 

The example runs in a blocking manner, but if you look in the FAQ, you will find that you can also run tasks in a parallel thread, such that you are not blocking, and remove the task once not needed anymore:

这个示例以一种阻塞的方式运行,但是如果您查看FAQ,您会发现您也可以在一个并行线程中运行任务,这样您就不会阻塞了,并且一旦不再需要该任务,就删除该任务:

from schedule import Scheduler

def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """

    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):

        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.setDaemon(True)
    continuous_thread.start()
    return cease_continuous_run


Scheduler.run_continuously = run_continuously

Here is an example for usage in a class method:

下面是一个在类方法中使用的示例:

    def foo(self):
        ...
        if some_condition():
           return schedule.CancelJob  # a job can dequeue it

    # can be put in __enter__ or __init__
    self._job_stop = self.scheduler.run_continuously()

    logger.debug("doing foo"...)
    self.foo() # call foo
    self.scheduler.every(5).seconds.do(
        self.foo) # schedule foo for running every 5 seconds

    ...
    # later on foo is not needed any more:
    self._job_stop.set()

    ...

    def __exit__(self, exec_type, exc_value, traceback):
        # if the jobs are not stop, you can stop them
        self._job_stop.set()

#2


1  

I recommend you use Celery's task management. You can refer this to set up this app (package if you're from javaScript background).

我建议你使用芹菜的任务管理。您可以将此引用来设置此应用程序(如果您来自javaScript后台,请使用包)。

Once set, you can alter the code to:

一旦设置,您可以将代码更改为:

@app.task
def check_shut_down():
    if not some_fun():
        # add task that'll run again after 2 secs
        check_shut_down.delay((), countdown=3)
    else:
        # task completed; do something to notify yourself
        return True