本文实例讲述了Python模拟简单电梯调度算法。分享给大家供大家参考,具体如下:
经常在公司坐电梯,由于楼层较高,是双联装的电梯,但是经常等电梯很久,经常有人骂写电梯调度算法的。回来闲来无事,自己尝试写了一个简单的。
场景很简单,每一层电梯口只有一个按钮,不区分上下,当有人按下这个键后,电梯会过来停在此层,这个人可以进去,并选择自己想去的层。电梯的调度策略也很简单,在一次向上的过程中,如果有人在下面按了键,电梯并不直接向下,而是运行到此次向上的最顶层,然后再下次向下运行的过程中去服务这个请求。
elevator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
import time
from myque import myque
class elevator:
def __init__( self ,layers):
self .building_layers = layers
self .direction = 'up'
self .cur_layer = 1
self .up_queue = myque()
self .down_queue = myque( True )
self .switcher = 'open'
def stop( self ):
self .switcher = 'stop'
def push_button( self ,layer,direction = None ):
if self .cur_layer>layer:
self .down_queue.insert(layer)
elif self .cur_layer<layer:
self .up_queue.insert(layer)
else :
if self .direction = = 'up' :
self .down_queue.insert(layer)
else :
self .up_queue.insert(layer)
def handle_queue( self ,direction):
self .direction = direction
if direction = = 'up' :
inc = 1
else :
inc = - 1
que = getattr ( self , direction + '_queue' )
while que.length():
while self .cur_layer ! = que.front():
print '/nelevator in ' , self .cur_layer
time.sleep( 1 )
self .cur_layer + = inc
print '/nelevator arrives at ' , self .cur_layer
que.pop_front()
def run( self ):
while self .switcher = = 'open' :
if self .up_queue.empty() and self .down_queue.empty():
"""elevator now is waiting, stop at a layer"""
time.sleep( 1 )
continue
"""go up"""
self .handle_queue( 'up' )
"""go down"""
self .handle_queue( 'down' )
|
myque.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import threading
class myque:
def __init__( self ,reverse = False ):
self .mode = reverse
self .buf = []
self .lock = threading.Lock()
def insert( self , object ):
self .lock.acquire()
self .buf.append( object )
self .buf.sort(reverse = self .mode)
self .lock.release()
def front( self ):
return self .buf[ 0 ]
def pop_front( self ):
self .lock.acquire()
self .buf.pop( 0 )
self .lock.release()
def length( self ):
self .lock.acquire()
size = len ( self .buf)
self .lock.release()
return size
def empty( self ):
self .lock.acquire()
size = len ( self .buf)
self .lock.release()
return size = = 0
|
deploy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import threading
from elevator import elevator
def init_elevator(building_layers):
e = elevator(building_layers)
t = threading.Thread(target = e.run)
t.setDaemon( True )
t.start()
return (e,t)
def main():
myelevator,ctl_thread = init_elevator( 17 )
while True :
str = raw_input ( "Input valid layer :" )
try :
layer = int ( str )
except Exception:
if str = = 'quit' :
myelevator.stop()
ctl_thread.join()
break
else :
print 'invalid input' , str
continue
if layer not in range ( 1 ,myelevator.building_layers + 1 ):
continue
myelevator.push_button(layer)
if __name__ = = '__main__' :
main()
|
运行结果如下:
如果扩展的话,很容易将各层的按钮扩展为带上下指示的。如果有机会可以扩展为多联装电梯,并将调度算法做的更加智能,可以根据历史数据和时间进行动态调整。
希望本文所述对大家Python程序设计有所帮助。
原文链接:https://blog.csdn.net/zzulp/article/details/5681482