并行处理大量任务

时间:2022-09-17 16:58:17

I have 10,000 csv files for which I have to open in Pandas and manipulate/transform using some of Pandas's function and save the new output to csv. Could I use a parallel process (for Windows) to make the work faster? I tried the following but no luck:

我有10,000个csv文件,我必须在Pandas中打开并使用一些Pandas的函数进行操作/转换,并将新输出保存到csv。我可以使用并行进程(对于Windows)来加快工作速度吗?我试过以下但没有运气:

import pandas pd
import multiprocessing

def proc_file(file):
    df = pd.read_csv(file)
    df = df.reample('1S', how='sum')
    df.to_csv('C:\\newfile.csv')
if __name__ == '__main__':    
    files = ['C:\\file1.csv', ... 'C:\\file2.csv']

    for i in files:
        p = multiprocessing.Process(target=proc_file(i))
    p.start() 

I don't think I have a good understanding of multiprocessing in Python.

我不认为我对Python中的多处理有很好的理解。

2 个解决方案

#1


Make sure to close the pool later too:

确保以后关闭池:

import multiprocessing

# Maximum number of cpus to use at a time
max_threads = multiprocessing.cpu_count() - 1

pool = multiprocessing.Pool(max_threads)
list_files = pool.map(func,list_of_csvs)
pool.close()
pool.join()

list_files can contain a list e.g. you could return the name of the altered csv from func()

list_files可以包含一个列表,例如你可以从func()返回改变的csv的名字

#2


Maybe something like this:

也许是这样的:

p = multiprocessing.Pool()
p.map(prof_file, files)

For this size, you really need a process pool, so that the cost of launching a process is offset by the work it does. multiprocessing.Pool does exactly that: it transforms task parallelism (which is what you were doing) into task parallelism.

对于这个大小,您确实需要一个进程池,因此启动进程的成本会被它所做的工作所抵消。 multiprocessing.Pool就是这样做的:它将任务并行性(你正在做的事情)转换为任务并行性。

#1


Make sure to close the pool later too:

确保以后关闭池:

import multiprocessing

# Maximum number of cpus to use at a time
max_threads = multiprocessing.cpu_count() - 1

pool = multiprocessing.Pool(max_threads)
list_files = pool.map(func,list_of_csvs)
pool.close()
pool.join()

list_files can contain a list e.g. you could return the name of the altered csv from func()

list_files可以包含一个列表,例如你可以从func()返回改变的csv的名字

#2


Maybe something like this:

也许是这样的:

p = multiprocessing.Pool()
p.map(prof_file, files)

For this size, you really need a process pool, so that the cost of launching a process is offset by the work it does. multiprocessing.Pool does exactly that: it transforms task parallelism (which is what you were doing) into task parallelism.

对于这个大小,您确实需要一个进程池,因此启动进程的成本会被它所做的工作所抵消。 multiprocessing.Pool就是这样做的:它将任务并行性(你正在做的事情)转换为任务并行性。