本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:
服务端使用asyncore, 收到文件后保存到本地。
客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。
重点:
1. 使用structs打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。
2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。
上代码:
服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
# receive file from client and store them into file use asyncore.#
#/usr/bin/python
#coding: utf-8
import asyncore
import socket
from socket import errno
import logging
import time
import sys
import struct
import os
import fcntl
import threading
from rrd_graph import MakeGraph
try :
import rrdtool
except (ImportError, ImportWarnning):
print "Hope this information can help you:"
print "Can not find pyinotify module in sys path, just run [apt-get install python-rrdtool] in ubuntu."
sys.exit( 1 )
class RequestHandler(asyncore.dispatcher):
def __init__( self , sock, map = None , chunk_size = 1024 ):
self .logger = logging.getLogger( '%s-%s' % ( self .__class__.__name__, str (sock.getsockname())))
self .chunk_size = chunk_size
asyncore.dispatcher.__init__( self ,sock, map )
self .data_to_write = list ()
def readable( self ):
#self.logger.debug("readable() called.")
return True
def writable( self ):
response = ( not self .connected) or len ( self .data_to_write)
#self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write)))
return response
def handle_write( self ):
data = self .data_to_write.pop()
#self.logger.debug("handle_write()->%s size: %s",data.rstrip('\r\n'),len(data))
sent = self .send(data[: self .chunk_size])
if sent < len (data):
remaining = data[sent:]
self .data_to_write.append(remaining)
def handle_read( self ):
self .writen_size = 0
nagios_perfdata = '../perfdata'
head_packet_format = "!LL128s128sL"
head_packet_size = struct.calcsize(head_packet_format)
data = self .recv(head_packet_size)
if not data:
return
filepath_len, filename_len, filepath,filename, filesize = struct.unpack(head_packet_format,data)
filepath = os.path.join(nagios_perfdata, filepath[:filepath_len])
filename = filename[:filename_len]
self .logger.debug( "update file: %s" % filepath + '/' + filename)
try :
if not os.path.exists(filepath):
os.makedirs(filepath)
except OSError:
pass
self .fd = open (os.path.join(filepath,filename), 'w' )
#self.fd = open(filename,'w')
if filesize > self .chunk_size:
times = filesize / self .chunk_size
first_part_size = times * self .chunk_size
second_part_size = filesize % self .chunk_size
while 1 :
try :
data = self .recv( self .chunk_size)
#self.logger.debug("handle_read()->%s size.",len(data))
except socket.error,e:
if e.args[ 0 ] = = errno.EWOULDBLOCK:
print "EWOULDBLOCK"
time.sleep( 1 )
else :
#self.logger.debug("Error happend while receive data: %s" % e)
break
else :
self .fd.write(data)
self .fd.flush()
self .writen_size + = len (data)
if self .writen_size = = first_part_size:
break
#receive the packet at last
while 1 :
try :
data = self .recv(second_part_size)
#self.logger.debug("handle_read()->%s size.",len(data))
except socket.error,e:
if e.args[ 0 ] = = errno.EWOULDBLOCK:
print "EWOULDBLOCK"
time.sleep( 1 )
else :
#self.logger.debug("Error happend while receive data: %s" % e)
break
else :
self .fd.write(data)
self .fd.flush()
self .writen_size + = len (data)
if len (data) = = second_part_size:
break
elif filesize < = self .chunk_size:
while 1 :
try :
data = self .recv(filesize)
#self.logger.debug("handle_read()->%s size.",len(data))
except socket.error,e:
if e.args[ 0 ] = = errno.EWOULDBLOCK:
print "EWOULDBLOCK"
time.sleep( 1 )
else :
#self.logger.debug("Error happend while receive data: %s" % e)
break
else :
self .fd.write(data)
self .fd.flush()
self .writen_size + = len (data)
if len (data) = = filesize:
break
self .logger.debug( "File size: %s" % self .writen_size)
class SyncServer(asyncore.dispatcher):
def __init__( self ,host,port):
asyncore.dispatcher.__init__( self )
self .debug = True
self .logger = logging.getLogger( self .__class__.__name__)
self .create_socket(socket.AF_INET,socket.SOCK_STREAM)
self .set_reuse_addr()
self .bind((host,port))
self .listen( 2000 )
def handle_accept( self ):
client_socket = self .accept()
if client_socket is None :
pass
else :
sock, addr = client_socket
#self.logger.debug("Incoming connection from %s" % repr(addr))
handler = RequestHandler(sock = sock)
class RunServer(threading.Thread):
def __init__( self ):
super (RunServer, self ).__init__()
self .daemon = False
def run( self ):
server = SyncServer('', 9999 )
asyncore.loop(use_poll = True )
def StartServer():
logging.basicConfig(level = logging.DEBUG,
format = '%(name)s: %(message)s' ,
)
RunServer().start()
#MakeGraph().start()
if __name__ = = '__main__' :
StartServer()
|
客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# monitor path with inotify(python module), and send them to remote server.#
# use sendfile(2) instead of send function in socket, if we have python-sendfile installed.#
import socket
import time
import os
import sys
import struct
import threading
import Queue
try :
import pyinotify
except (ImportError, ImportWarnning):
print "Hope this information can help you:"
print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu."
sys.exit( 1 )
try :
from sendfile import sendfile
except (ImportError,ImportWarnning):
pass
filetype_filter = [ ".rrd" , ".xml" ]
def check_filetype(pathname):
for suffix_name in filetype_filter:
if pathname[ - 4 :] = = suffix_name:
return True
try :
end_string = pathname.rsplit( '.' )[ - 1 :][ 0 ]
end_int = int (end_string)
except :
pass
else :
# means pathname endwith digit
return False
class sync_file(threading.Thread):
def __init__( self , addr, events_queue):
super (sync_file, self ).__init__()
self .daemon = False
self .queue = events_queue
self .addr = addr
self .chunk_size = 1024
def run( self ):
while 1 :
event = self .queue.get()
if check_filetype(event.pathname):
print time.asctime(),event.maskname, event.pathname
filepath = event.path.split( '/' )[ - 1 :][ 0 ]
filename = event.name
filesize = os.stat(os.path.join(event.path, filename)).st_size
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
filepath_len = len (filepath)
filename_len = len (filename)
sock.connect( self .addr)
offset = 0
data = struct.pack( "!LL128s128sL" ,filepath_len, filename_len, filepath,filename,filesize)
fd = open (event.pathname, 'rb' )
sock.sendall(data)
if "sendfile" in sys.modules:
# print "use sendfile(2)"
while 1 :
sent = sendfile(sock.fileno(), fd.fileno(), offset, self .chunk_size)
if sent = = 0 :
break
offset + = sent
else :
# print "use original send function"
while 1 :
data = fd.read( self .chunk_size)
if not data: break
sock.send(data)
sock.close()
fd.close()
class EventHandler(pyinotify.ProcessEvent):
def __init__( self , events_queue):
super (EventHandler, self ).__init__()
self .events_queue = events_queue
def my_init( self ):
pass
def process_IN_CLOSE_WRITE( self ,event):
self .events_queue.put(event)
def process_IN_MOVED_TO( self ,event):
self .events_queue.put(event)
def start_notify(path, mask, sync_server):
events_queue = Queue.Queue()
sync_thread_pool = list ()
for i in range ( 500 ):
sync_thread_pool.append(sync_file(sync_server, events_queue))
for i in sync_thread_pool:
i.start()
wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm,EventHandler(events_queue))
wdd = wm.add_watch(path,mask,rec = True )
notifier.loop()
def do_notify():
perfdata_path = '/var/lib/pnp4nagios/perfdata'
mask = pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO
sync_server = ( '127.0.0.1' , 9999 )
start_notify(perfdata_path,mask,sync_server)
if __name__ = = '__main__' :
do_notify()
|
python监视线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
#!/usr/bin/python
import threading
import time
class Monitor(threading.Thread):
def __init__( self , * args, * * kwargs):
super (Monitor, self ).__init__()
self .daemon = False
self .args = args
self .kwargs = kwargs
self .pool_list = []
def run( self ):
print self .args
print self .kwargs
for name,value in self .kwargs.items():
obj = value[ 0 ]
temp = {}
temp[name] = obj
self .pool_list.append(temp)
while 1 :
print self .pool_list
for name,value in self .kwargs.items():
obj = value[ 0 ]
parameters = value[ 1 :]
died_threads = self .cal_died_thread( self .pool_list,name)
print "died_threads" , died_threads
if died_threads > 0 :
for i in range (died_threads):
print "start %s thread..." % name
t = obj[ 0 ].__class__( * parameters)
t.start()
self .add_to_pool_list(t,name)
else :
break
time.sleep( 0.5 )
def cal_died_thread( self ,pool_list,name):
i = 0
for item in self .pool_list:
for k,v in item.items():
if name = = k:
lists = v
for t in lists:
if not t.isAlive():
self .remove_from_pool_list(t)
i + = 1
return i
def add_to_pool_list( self ,obj,name):
for item in self .pool_list:
for k,v in item.items():
if name = = k:
v.append(obj)
def remove_from_pool_list( self , obj):
for item in self .pool_list:
for k,v in item.items():
try :
v.remove(obj)
except :
pass
else :
return
|
使用方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
rrds_queue = Queue.Queue()
make_rrds_pool = []
for i in range ( 5 ):
make_rrds_pool.append(MakeRrds(rrds_queue))
for i in make_rrds_pool:
i.start()
make_graph_pool = []
for i in range ( 5 ):
make_graph_pool.append(MakeGraph(rrds_queue))
for i in make_graph_pool:
i.start()
monitor = Monitor(make_rrds_pool = (make_rrds_pool, rrds_queue), \
make_graph_pool = (make_graph_pool, rrds_queue))
monitor.start()
|
解析:
1. 接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。
2. 每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。
3. 如果没有线程死去,则什么也不做。
从外部调用Django模块
1
2
3
4
5
6
7
8
|
import os
import sys
sys.path.insert( 0 , '/data/cloud_manage' )
from django.core.management import setup_environ
import settings
setup_environ(settings)
from common.monitor import Monitor
from django.db import connection, transaction
|
前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.
这样不仅可以调用django自身的模块,还能调用project本身的东西。
希望本文所述对大家的Python程序设计有所帮助。