本人在做项目的时候遇到一个问题:
某个函数需要在每个小时的 3 分钟时候被执行一次,我希望我 15:45 启动程序,过了18 分钟在 16:03 这个函数被执行一次,下一次过 60 分钟在 17:03 再次被执行,下一次 18:03,以此类推。
以下是我基于 Timer 做的再封装实现了此功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# -*- coding: utf-8 -*-
# ==================================================
# 对 Timer 做以下再封装的目的是:当某个功能需要每隔一段时间被
# 执行一次的时候,不需要在回调函数里对 Timer 做重新安装启动
# ==================================================
__author__ = 'liujiaxing'
from threading import Timer
from datetime import datetime
class MyTimer( object ):
def __init__( self , start_time, interval, callback_proc, args = None , kwargs = None ):
self .__timer = None
self .__start_time = start_time
self .__interval = interval
self .__callback_pro = callback_proc
self .__args = args if args is not None else []
self .__kwargs = kwargs if kwargs is not None else {}
def exec_callback( self , args = None , kwargs = None ):
self .__callback_pro( * self .__args, * * self .__kwargs )
self .__timer = Timer( self .__interval, self .exec_callback )
self .__timer.start()
def start( self ):
interval = self .__interval - ( datetime.now().timestamp() - self .__start_time.timestamp() )
print ( interval )
self .__timer = Timer( interval, self .exec_callback )
self .__timer.start()
def cancel( self ):
self .__timer.cancel()
self .__timer = None
class AA:
def hello( self , name, age ):
print ( "[%s]\thello %s: %d\n" % ( datetime.now().strftime( "%Y%m%d %H:%M:%S" ), name, age ) )
if __name__ = = "__main__" :
aa = AA()
start = datetime.now().replace( minute = 3 , second = 0 , microsecond = 0 )
tmr = MyTimer( start, 60 * 60 , aa.hello, [ "owenliu" , 18 ] )
tmr.start()
tmr.cancel()
|
以上这篇Python 实现某个功能每隔一段时间被执行一次的功能方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/u010649766/article/details/79446798