Python 在不依赖第三方库的前提下,对于定时器的实现并不是很完美,但是这不意味着我们无法实现。
阅读了网上的一些资料,得出一些结论,顺手写了一个基类的定时器(Python3)
BaseTimer:
1 # -*- coding: utf-8 -*-
2
3
4 from abc import ABCMeta, abstractmethod
5 import time
6 import threading
7
8 class BaseTimer(threading.Thread):
9 """
10 基础定时器抽象类,可以随意继承本类并且实现exec抽象方法即可定时产生任务
11 """
12 __metaclass__ = ABCMeta
13 def __init__(self,howtime=1.0,enduring=True):
14 """
15 howtime 每次定时的时间
16 enduring 是否是一个持久性的任务,用这个可以控制开关
17 """
18
19 self.enduring = enduring
20 self.howtime = howtime
21 threading.Thread.__init__(self)
22
23 def run(self):
24 time.sleep(self.howtime) #至少执行一次 任务
25 self.exec()
26 while self.enduring: #是否持久,或者是否进行运行
27 time.sleep(self.howtime)
28 self.exec() #每次开始执行任务
29
30 @abstractmethod
31 def exec(self):
32 """抽象方法,子类实现"""
33 pass
34
35 def destroy(self):
36 """销毁自身"""
37 self.enduring = False
38 del self
39
40 def stop(self):
41 self.enduring = False
42
43 def restart(self):
44 self.enduring = True
45
46 def get_status(self):
47 return self.enduring
48
如何使用?
我们来建立一个新的任务,这个任务是过一段时间就输出:
1 class TimerMask(BaseTimer):
2 """定时任务类,不同的业务逻辑"""
3 def __init__(self,howtime=1.0,enduring=True):
4 BaseTimer.__init__(self,howtime,enduring)
5 self.ws=0
6
7 def exec(self):
8 print("HelloWorld!")
9 self.ws = self.ws + 1 #这里是过0.8秒输出一次
10 if self.ws >5:
11 self.destroy()
加入如下:
1 if __name__ == "__main__":
2 Q_w = 0
3 w = TimerMask(howtime=0.8)
4 print("-")
5 w.start()
6 #这里线程输出这些,做其他事情的
7 while True:
8 time.sleep(0.4) #0.4秒
9 print("- ",Q_w," - WMask:",w)
10 Q_w += 1
11 pass
输出:
于是你可能会想问,那岂不是每个不同的行为都要写一个继承了BaseTimer的类来做事呢,其实不然,你可以写个函数调用的TimerFunc类:
1 class TimerFunc(BaseTimer):
2 """可传递任何函数的定时任务类"""
3 def __init__(self,func,howtime=1.0,enduring=True):
4 BaseTimer.__init__(self,howtime,enduring)
5 self.func = func
6
7 def exec(self):
8 self.func() #调用函数
9
10 #使用方法:
11 def doing():
12 print("Hello")
13
14 w = TimerFunc(doing)
15 w.start()
输出:"Hello",并且会每隔1秒执行一次
是不是觉得可以做一堆事情了?你可以*发挥,继承BaseTimer类
1 w = TimerFunc(doing,5,False) #这样就可以定义延迟5秒使用了~
2 w.start()
在搜索资料的时候,找到了网上大部分的实现方法,其实都差不多,感兴趣你可以看看:
1 import threading ,timeView Code
2 from time import sleep, ctime
3 class Timer(threading.Thread):
4 """
5 very simple but useless timer.
6 """
7 def __init__(self, seconds):
8 self.runTime = seconds
9 threading.Thread.__init__(self)
10 def run(self):
11 time.sleep(self.runTime)
12 print ("Buzzzz!! Time's up!")
13
14 class CountDownTimer(Timer):
15 """
16 a timer that can counts down the seconds.
17 """
18 def run(self):
19 counter = self.runTime
20 for sec in range(self.runTime):
21 print (counter)
22 time.sleep(1.0)
23 counter -= 1
24 print ("Done")
25
26 class CountDownExec(CountDownTimer):
27 """
28 a timer that execute an action at the end of the timer run.
29 """
30 def __init__(self, seconds, action, args=[]):
31 self.args = args
32 self.action = action
33 CountDownTimer.__init__(self, seconds)
34 def run(self):
35 CountDownTimer.run(self)
36 self.action(self.args)
37
38 def myAction(args=[]):
39 print ("Performing my action with args:")
40 print (args)
41
42 if __name__ == "__main__":
43 t = CountDownExec(3, myAction, ["hello", "world"])
44 t.start()
45 print("2333")
1 '''View Code
2 使用sched模块实现的timer,sched模块不是循环的,一次调度被执行后就Over了,如果想再执行,可以使用while循环的方式不停的调用该方法
3 Created on 2013-7-31
4
5 @author: Eric
6 '''
7 import time, sched
8
9 #被调度触发的函数
10 def event_func(msg):
11 print("Current Time:", time.strftime("%y-%m-%d %H:%M:%S"), 'msg:', msg)
12
13 def run_function():
14 #初始化sched模块的scheduler类
15 s = sched.scheduler(time.time, time.sleep)
16 #设置一个调度,因为time.sleep()的时间是一秒,所以timer的间隔时间就是sleep的时间,加上enter的第一个参数
17 s.enter(0, 2, event_func, ("Timer event.",))
18 s.run()
19
20 def timer1():
21 while True:
22 #sched模块不是循环的,一次调度被执行后就Over了,如果想再执行,可以使用while循环的方式不停的调用该方法
23 time.sleep(1)
24 run_function()
25
26 if __name__ == "__main__":
27 timer1()
感谢耐心阅读,希望对你有帮助。