I have a script that opens each file in a list and then does something to the text in that file. I'm using Python multiprocessing and Pool to try to parallelize this operation. An abstraction of the script is below:
import os
from multiprocessing import Pool

results = []

def testFunc(files):
    for file in files:
        print "Working in Process #%d" % (os.getpid())
        #This is just an illustration of some logic. This is not what I'm actually doing.
        for line in file:
            if 'dog' in line:
                results.append(line)

if __name__=="__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.apply_async(testFunc, args = (files,))
    results2 = results.get()
When I run this, the process ID that gets printed is the same for each iteration. Basically, what I'm trying to do is take each element of the input list and fork it out to a separate process, but it seems like one process is doing all of the work.
2 answers
#1
27
- apply_async farms out one task to the pool. You would need to call apply_async many times to exercise more processors.
- Don't have the workers append to the global list, results. Since the pool workers are separate processes, each one works on its own copy of the list, so their appends never reach the list in the main process. One way to work around this is to use an output Queue. You could set it up yourself, or use apply_async's callback to set up the Queue for you. apply_async will call the callback once the function completes.
- You could use map_async instead of apply_async, but then you'd get a list of lists, which you'd then have to flatten.
So, perhaps try something like this instead:
import os
import multiprocessing as mp

results = []

def testFunc(file):
    result = []
    print("Working in Process #%d" % (os.getpid()))
    # This is just an illustration of some logic. This is not what I'm
    # actually doing.
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                result.append(line)
    return result

def collect_results(result):
    # Runs in the main process each time a task finishes.
    results.extend(result)

if __name__ == "__main__":
    p = mp.Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    for f in files:
        p.apply_async(testFunc, args=(f, ), callback=collect_results)
    p.close()
    p.join()
    print(results)
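The map_async alternative from the third point above can be sketched as follows. This is only a minimal sketch, assuming Python 3. The helper names find_dog_lines, flatten and write_sample_files are hypothetical and do not come from the original post, and the sample files are written to a temporary directory only so that the example runs anywhere. map_async hands back one list per input file, and itertools.chain.from_iterable joins them into a single flat list.

```python
import itertools
import multiprocessing as mp
import os
import tempfile


def find_dog_lines(path):
    """Return the lines of one file that contain 'dog'."""
    with open(path, 'r') as f:
        return [line for line in f if 'dog' in line]


def flatten(list_of_lists):
    """Join the per-file lists into one flat list."""
    return list(itertools.chain.from_iterable(list_of_lists))


def write_sample_files(directory):
    """Create two small throwaway files (hypothetical sample data)."""
    contents = ["dog one\ncat\n", "bird\ndog two\n"]
    paths = []
    for i, text in enumerate(contents):
        path = os.path.join(directory, "file%d.txt" % i)
        with open(path, 'w') as f:
            f.write(text)
        paths.append(path)
    return paths


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        files = write_sample_files(tmp)
        pool = mp.Pool(processes=2)
        # map_async returns an AsyncResult; get() blocks until every file is
        # done and returns one list per file, in the same order as `files`.
        per_file = pool.map_async(find_dog_lines, files).get()
        pool.close()
        pool.join()
        print(flatten(per_file))
```

Compared with the callback version, nothing is appended to a global list here: each worker returns its own matches and the main process does the combining.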
#2
7
Maybe in this case you should use map_async:
import os
from multiprocessing import Pool

def testFunc(file):
    message = "Working in Process #%d" % (os.getpid())
    # This is just an illustration of some logic. This is not what I'm actually doing.
    matches = []
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                matches.append(line)
    # Return the matches to the parent; a list appended to inside a worker
    # is not visible in the main process.
    return message, matches

if __name__=="__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    # map_async sends each file to the pool as a separate task.
    async_result = p.map_async(testFunc, files)
    print(async_result.get())
    p.close()
    p.join()
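To check whether the files really are spread across workers, each task can return its own PID along with its matches, so the main process can see which worker handled which file. The following is a minimal sketch, assuming Python 3. The name scan_file and the four throwaway sample files are hypothetical and not part of the original post. With very small files a single worker may still pick up several tasks, so seeing only one PID does not necessarily mean the pool is misconfigured.

```python
import multiprocessing as mp
import os
import tempfile


def scan_file(path):
    """Return (worker PID, lines containing 'dog') for one file."""
    with open(path, 'r') as f:
        matches = [line for line in f if 'dog' in line]
    return os.getpid(), matches


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical sample data so the sketch runs without real files.
        files = []
        for i in range(4):
            path = os.path.join(tmp, "file%d.txt" % i)
            with open(path, 'w') as f:
                f.write("dog %d\ncat\n" % i)
            files.append(path)

        pool = mp.Pool(processes=2)
        # One task per file; results come back in the same order as `files`.
        results = pool.map(scan_file, files)
        pool.close()
        pool.join()

        for path, (pid, matches) in zip(files, results):
            print("%s handled by PID %d: %r" % (os.path.basename(path), pid, matches))
        print("Distinct worker PIDs:", sorted({pid for pid, _ in results}))
```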