1. 使用多个进程读取同一个文件：每次从中取出一行，并由这些进程共同写入同一个目标文件。
import multiprocessing
import time
# Demo input/output files on drive F:. Backslashes are escaped explicitly:
# 'f:\source.txt' only worked because "\s" happens not to be an escape
# sequence, and newer Python 3 versions warn about such invalid escapes.
FILE_SOURCE = 'f:\\source.txt'
FILE_TARGET = 'f:\\target.txt'
# Ids 1..998 -- the end value of range() is exclusive.
ID_LIST = range(1, 999)
def create_source(file_path=None, id_list=None):
    """Write the demo source file, one "<id> <value>" line per id.

    Each line is str(id), a single space, str(10 + id / 1000.0) and a
    newline -- for example id 500 produces the line "500 10.5".

    Args:
        file_path: destination path; defaults to the module-level FILE_SOURCE.
        id_list: iterable of integer ids; defaults to the module-level ID_LIST.
    """
    if file_path is None:
        file_path = FILE_SOURCE
    if id_list is None:
        id_list = ID_LIST
    # Collect the lines and join once instead of growing a string with +=
    # (quadratic in general); `with` closes the file even if writing fails.
    lines = [str(i) + ' ' + str(10 + float(i) / 1000) + '\n' for i in id_list]
    with open(file_path, 'w') as f:
        f.write(''.join(lines))
def get_info_from_file(file_path, id_in):
    """Return the line of file_path whose first space-separated field is id_in.

    Args:
        file_path: text file whose lines look like "<id> <value>".
        id_in: id to look up; compared with the first field as str(id_in).

    Returns:
        The matching line including its trailing newline, or None when no
        line matches. If several lines match, the last one is returned (the
        same result the earlier dict-based version gave, since later lines
        overwrote earlier ones).
    """
    index = str(id_in)
    found = None
    # Stream the file rather than building a dict of every line only to read
    # one key back; `with` closes the handle even if reading fails.
    with open(file_path, 'r') as f:
        for line in f:
            # split(' ', 1)[0] is the same first field as split(' ')[0].
            if line.split(' ', 1)[0] == index:
                found = line
    return found
def write_info_to_target(file_path, info):
    """Append the string info to file_path; do nothing when info is None.

    get_info_from_file returns None for an id that is not in the source file.
    Passing that None to f.write() would raise TypeError inside the worker,
    so a missing id is skipped instead. When info is None the target file is
    not opened (and therefore not created).
    """
    if info is None:
        return
    # NOTE(review): several worker processes append to this file without a
    # lock; whether each small append lands intact depends on the OS and
    # filesystem -- confirm on the target platform.
    with open(file_path, 'a') as f:
        f.write(info)
class MyTask(object):
    """Work item put on the task queue; `id` names the source line to copy."""

    def __init__(self, id_in):
        super(MyTask, self).__init__()
        # Workers read this attribute to decide which line to fetch.
        self.id = id_in
def worker(task_queue, name):
    """Process MyTask items from task_queue until a None sentinel arrives.

    For each task, look up the line with that id in FILE_SOURCE, append it to
    FILE_TARGET, then sleep 0.05 s to simulate extra per-task work.

    Args:
        task_queue: a multiprocessing.JoinableQueue holding MyTask objects,
            followed by one None per worker.
        name: label printed next to every processed id.
    """
    while True:
        next_task = task_queue.get()
        if next_task is None:
            # The sentinel must be acknowledged too, or task_queue.join()
            # in the parent never returns.
            task_queue.task_done()
            break
        try:
            id_in = next_task.id
            print("%s %s" % (name, id_in))
            write_info_to_target(FILE_TARGET, get_info_from_file(FILE_SOURCE, id_in))
            # Simulate the time the real per-task work would take.
            time.sleep(0.05)
        except Exception as exc:
            # Report and keep going: if an exception escaped here, this task
            # would never be marked done and the parent's join() would hang.
            print("%s failed on task %r: %r"
                  % (name, getattr(next_task, 'id', next_task), exc))
        finally:
            task_queue.task_done()
    print("worker exit")
def multiprocessing_file_operation(job_num=None):
    """Copy every id in ID_LIST from FILE_SOURCE to FILE_TARGET with worker processes.

    The source file must already exist; create_source() is not called here.
    Prints wall-clock start and end timestamps and the elapsed seconds.

    Args:
        job_num: number of worker processes. Defaults to twice the CPU count,
            the previously hard-coded value. Pass e.g. 1, 4, 8 or 16 to
            repeat the timing comparison.

    Raises:
        ValueError: if job_num is less than 1 (with no workers the queue
            would never drain and join() would hang).
    """
    if job_num is None:
        job_num = multiprocessing.cpu_count() * 2
    if job_num < 1:
        raise ValueError("job_num must be at least 1, got %r" % (job_num,))

    task_queue = multiprocessing.JoinableQueue()
    # time.clock() was removed in Python 3.8 and on Unix measured only this
    # process's CPU time (near zero, since the parent mostly waits);
    # time.time() is wall-clock time on every platform and Python version.
    start = time.time()
    print("start %s" % start)

    jobs = [multiprocessing.Process(target=worker, args=(task_queue, 'Process %d' % i))
            for i in range(job_num)]
    for j in jobs:
        j.start()
    for i in ID_LIST:
        task_queue.put(MyTask(i))
    # One None sentinel per worker is required: a worker that never receives
    # None stays in its loop forever and the program hangs.
    for _ in range(job_num):
        task_queue.put(None)
    task_queue.join()
    # Every worker has consumed its sentinel; reap the processes explicitly.
    for j in jobs:
        j.join()

    end = time.time()
    print("end %s" % end)
    print("elapsed %.2f s" % (end - start))
if __name__ == "__main__":
    # Run the demo only when executed as a script. multiprocessing's "spawn"
    # start method re-imports this module in each child process, and the
    # children must not start the demo again.
    multiprocessing_file_operation()
运行时间（CPU 为双核）：
单进程：53 s
4 个进程：14 s
8 个进程：7 s
16 个进程：4 s
每个任务的耗时主要来自 time.sleep(0.05)（模拟的等待），而不是 CPU 计算。因此进程数超过 CPU 核数后，总耗时仍能继续下降：单进程约为 998 × 0.05 s ≈ 50 s。