timer
timer定时本质
threading.Timer源代码
class Timer(Thread):
"""Call a function after a specified number of seconds:
t = Timer(30.0, f, args=None, kwargs=None)
t.start()
t.cancel() # stop the timer's action if it's still waiting
"""
def __init__(self, interval, function, args=None, kwargs=None):
Thread.__init__(self)
self.interval = interval
self.function = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self.finished = Event()
def cancel(self):
"""Stop the timer if it hasn't finished yet."""
self.finished.set()
def run(self):
# 等待后时间为 false
self.finished.wait(self.interval)
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
# 重新设置为 true
self.finished.set()
逻辑:继承threading.Timer本质上是一个线程。通过事件wait
等待interval秒后,判定事件标志是否true,否则执行定时任务。最后,将时间设置为true。本质上是通过线程事件阻塞等待然后执行。也就是说只能执行一次。
改造为周期任务
默认想到的while True
。python生态中没有很多正在维护的包。一般都是一次性任务,周期性任务没有。
import threading
import time
cancel_tmr = False
def start():
# 具体任务执行内容
print("hello world")
def timer():
while True:
start()
time.sleep(1)
if __name__ == '__main__':
timer()
逻辑i是time睡多少秒,这个是有一定问题的。start
任务其实也是有耗时。总体算下来不是一个准确的周期性定时任务。
通过timer改造
from threading import Thread, Timer
class ReadThread(Thread):
def __init__(self, ip: str, port: int, slaves: [tuple, list], cts: dict):
super().__init__()
self.ip = ip
self.port = port
self.salves = slaves
self.cts = cts
self.conn = Connect(ip=ip, port=self.port)
def run(self):
print('执行')
def main():
Timer(5, main).start()
for ip, port, addr, ct in config.HOST_PORT_ADDR_CT:
thread = ReadThread(
ip=ip,
port=port,
slaves=addr,
cts=ct
)
thread.start()
if __name__ == '__main__':
main()
通过threading.Timer自己调用自己达到定时任务的效果。从功能实现上是没有问题的。但是,在多线程中就会遇到问题,会造成资源上的问题。具体还需要验证。通过print(threading.current_thread().getName())
每次定时任务后线程名称是递增的一个现象。每次,都需要创建一个新的线程,如果间隔比较短的情况下。可能会造成一些问题,具体问题待验证。
未来
cron倾向于从linux方向去执行定时任务,而非python方向去走。django-crontab也是很久没有维护了。python-crontab可以作为一个尝试。