Sharing a large read-only NumPy array between multiprocessing processes

Date: 2021-12-29 20:58:24

I have a 60GB SciPy Array (Matrix) I must share between 5+ multiprocessing Process objects. I've seen numpy-sharedmem and read this discussion on the SciPy list. There seem to be two approaches--numpy-sharedmem and using a multiprocessing.RawArray() and mapping NumPy dtypes to ctypes. Now, numpy-sharedmem seems to be the way to go, but I've yet to see a good reference example. I don't need any kind of locks, since the array (actually a matrix) will be read-only. Now, due to its size, I'd like to avoid a copy. It sounds like the correct method is to create the only copy of the array as a sharedmem array, and then pass it to the Process objects? A couple of specific questions:

  1. What's the best way to actually pass the sharedmem handles to sub-Process()es? Do I need a queue just to pass one array around? Would a pipe be better? Can I just pass it as an argument to the Process() subclass's __init__ (where I'm assuming it's pickled)?

  2. In the discussion I linked above, there's mention of numpy-sharedmem not being 64bit-safe? I'm definitely using some structures that aren't 32-bit addressable.

  3. Are there tradeoffs to the RawArray() approach? Slower, buggier?

  4. Do I need any ctype-to-dtype mapping for the numpy-sharedmem method?

  5. Does anyone have an example of some open-source code doing this? I'm a very hands-on learner, and it's hard to get this working without a good example to look at.

If there's any additional info I can provide to help clarify this for others, please comment and I'll add. Thanks!

This needs to run on Ubuntu Linux and maybe Mac OS, but portability isn't a huge concern.

5 Answers

#1


23  

@Velimir Mlaker gave a great answer. I thought I could add a few comments and a tiny example.

(I couldn't find much documentation on sharedmem - these are the results of my own experiments.)

  1. Do you need to pass the handles when the subprocess is starting, or after it has started? If it's just the former, you can just use the target and args arguments for Process. This is potentially better than using a global variable.
  2. From the discussion page you linked, it appears that support for 64-bit Linux was added to sharedmem a while back, so it could be a non-issue.
  3. I don't know about this one.
  4. No. Refer to example below.

Example

#!/usr/bin/env python3
from multiprocessing import Process
import sharedmem
import numpy

def do_work(data, start):
    data[start] = 0

def split_work(num):
    n = 20
    width = n // num                         # chunk size per worker
    shared = sharedmem.empty(n)              # array backed by shared memory
    shared[:] = numpy.random.rand(1, n)[0]
    print("values are %s" % shared)

    processes = [Process(target=do_work, args=(shared, i * width)) for i in range(num)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print("values are %s" % shared)
    print("type is %s" % type(shared[0]))

if __name__ == '__main__':
    split_work(4)

Output

values are [ 0.81397784  0.59667692  0.10761908  0.6736734   0.46349645  0.98340718
  0.44056863  0.10701816  0.67167752  0.29158274  0.22242552  0.14273156
  0.34912309  0.43812636  0.58484507  0.81697513  0.57758441  0.4284959
  0.7292129   0.06063283]
values are [ 0.          0.59667692  0.10761908  0.6736734   0.46349645  0.
  0.44056863  0.10701816  0.67167752  0.29158274  0.          0.14273156
  0.34912309  0.43812636  0.58484507  0.          0.57758441  0.4284959
  0.7292129   0.06063283]
type is <class 'numpy.float64'>

This related question might be useful.

#2


29  

If you are on Linux (or any POSIX-compliant system), you can define this array as a global variable. On Linux, multiprocessing uses fork() when it starts a new child process. A newly spawned child process automatically shares the memory with its parent as long as it does not change it (copy-on-write mechanism).

Since you are saying "I don't need any kind of locks, since the array (actually a matrix) will be read-only", taking advantage of this behavior would be a very simple yet extremely efficient approach: all child processes will access the same data in physical memory when reading this large NumPy array.

Don't hand your array to the Process() constructor: this will instruct multiprocessing to pickle the data to the child, which would be extremely inefficient or impossible in your case. On Linux, right after fork() the child is an exact copy of the parent using the same physical memory, so all you need to do is make sure that the Python variable 'containing' the matrix is accessible from within the target function that you hand over to Process(). This you can typically achieve with a 'global' variable.

Example code:

from multiprocessing import Process
from numpy import random


# Created in the parent before the children are forked, so every child
# sees the same physical pages via copy-on-write.
global_array = random.random(10**4)


def child():
    print(sum(global_array))


def main():
    processes = [Process(target=child) for _ in range(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()


if __name__ == "__main__":
    main()

On Windows -- which does not support fork() -- multiprocessing uses the Win32 API call CreateProcess. It creates an entirely new process from any given executable. That's why on Windows one is required to pickle data to the child if one needs data that was created during the runtime of the parent.

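For completeness: on Python 3.8+ the standard library also provides multiprocessing.shared_memory, which lets children attach to the same memory block even under the spawn start method (and therefore on Windows), so only a short name string is pickled rather than the array itself. A minimal sketch, not part of the original answer; the sizes and names are purely illustrative:

import numpy as np
from multiprocessing import Process, shared_memory

def worker(shm_name, shape, dtype):
    # Attach to the existing block and view it as an ndarray (no copy).
    shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    print(arr.sum())
    shm.close()

if __name__ == "__main__":
    data = np.random.rand(10**4)
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared[:] = data  # one-time copy into the shared block

    procs = [Process(target=worker, args=(shm.name, data.shape, data.dtype))
             for _ in range(5)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    shm.close()
    shm.unlink()  # release the block once everyone is done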

#3


19  

You may be interested in a tiny piece of code I wrote: github.com/vmlaker/benchmark-sharedmem

The only file of interest is main.py. It's a benchmark of numpy-sharedmem -- the code simply passes arrays (either numpy or sharedmem) to spawned processes, via Pipe. The workers just call sum() on the data. I was only interested in comparing the data communication times between the two implementations.

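For illustration, a minimal sketch of that benchmark's structure (this is not the actual main.py; the array size and names are made up), here with a plain NumPy array that gets pickled through the Pipe:

from multiprocessing import Process, Pipe
import numpy

def worker(conn):
    data = conn.recv()        # the array arrives pickled over the pipe
    conn.send(data.sum())     # send back a tiny result
    conn.close()

if __name__ == '__main__':
    parent_end, child_end = Pipe()
    p = Process(target=worker, args=(child_end,))
    p.start()
    parent_end.send(numpy.random.rand(10**6))
    print(parent_end.recv())
    p.join()

Swapping the plain array for a sharedmem array is what the benchmark times against this baseline.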

I also wrote another, more complex piece of code: github.com/vmlaker/sherlock.

Here I use the numpy-sharedmem module for real-time image processing with OpenCV -- the images are NumPy arrays, as per OpenCV's newer cv2 API. The images, actually references thereof, are shared between processes via the dictionary object created from multiprocessing.Manager (as opposed to using Queue or Pipe.) I'm getting great performance improvements when compared with using plain NumPy arrays.

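A rough, hypothetical sketch of that pattern (the frame size and dictionary key are purely illustrative; it assumes the sharedmem array pickles down to a handle onto the same memory, which is what the module is designed for):

from multiprocessing import Manager, Process
import numpy
import sharedmem

def consumer(shared_dict):
    frame = shared_dict['frame']   # only a reference to the shared block travels
    print(frame.mean())

if __name__ == '__main__':
    manager = Manager()
    shared_dict = manager.dict()

    frame = sharedmem.empty((480, 640))       # image buffer in shared memory
    frame[:] = numpy.random.rand(480, 640)
    shared_dict['frame'] = frame

    p = Process(target=consumer, args=(shared_dict,))
    p.start()
    p.join()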

Pipe vs. Queue:

In my experience, IPC with Pipe is faster than Queue. And that makes sense, since Queue adds locking to make it safe for multiple producers/consumers. Pipe doesn't. But if you only have two processes talking back-and-forth, it's safe to use Pipe, or, as the docs read:

... there is no risk of corruption from processes using different ends of the pipe at the same time.

sharedmem safety:

The main issue with the sharedmem module is the possibility of a memory leak upon ungraceful program exit. This is described in a lengthy discussion here. Although on Apr 10, 2011 Sturla mentions a fix to the memory leak, I have still experienced leaks since then, using both repos, Sturla Molden's own on GitHub (github.com/sturlamolden/sharedmem-numpy) and Chris Lee-Messer's on Bitbucket (bitbucket.org/cleemesser/numpy-sharedmem).

#4


11  

If your array is that big you can use numpy.memmap. For example, if you have an array stored on disk, say 'test.array', you can use simultaneous processes to access the data in it even in "writing" mode, but your case is simpler since you only need "reading" mode.

Creating the array:

import numpy as np

a = np.memmap('test.array', dtype='float32', mode='w+', shape=(100000,1000))

You can then fill this array in the same way you do with an ordinary array. For example:

a[:10,:100]=1.
a[10:,100:]=2.

The data is written to disk when you delete the variable a (or when you call a.flush()).

Later on you can use multiple processes that will access the data in test.array:

# read-only mode
b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000,1000))

# read and writing mode
c = np.memmap('test.array', dtype='float32', mode='r+', shape=(100000,1000))

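For example, a sketch of several reader processes, each opening the file read-only and summing its own block of rows (the row slicing here is only illustrative):

import numpy as np
from multiprocessing import Process

def partial_sum(start, stop):
    # Each worker opens its own read-only view of the same file.
    b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000, 1000))
    print(b[start:stop].sum())

if __name__ == '__main__':
    procs = [Process(target=partial_sum, args=(i * 25000, (i + 1) * 25000))
             for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()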

#5


3  

You may also find it useful to take a look at the documentation for Pyro: if you can partition your task appropriately, you could use it to execute different sections on different machines, as well as on different cores in the same machine.
