Using a NumPy array in shared memory for multiprocessing

Time: 2021-07-01 13:24:16

I would like to use a numpy array in shared memory for use with the multiprocessing module. The difficulty is using it like a numpy array, and not just as a ctypes array.

from multiprocessing import Process, Array
import numpy as np

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = np.random.rand(N)
    arr = Array('d', unshared_arr)
    print("Originally, the first two elements of arr = %s" % (arr[:2]))

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % arr[:2])

This produces output such as:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

The array can be accessed in a ctypes manner, e.g. arr[i] makes sense. However, it is not a numpy array, and I cannot perform operations such as -1*arr, or arr.sum(). I suppose a solution would be to convert the ctypes array into a numpy array. However (besides not being able to make this work), I don't believe it would be shared anymore.

It seems there should be a standard solution to what must be a common problem.

5 Answers

#1 (63 votes)

To add to @unutbu's (no longer available) and @Henry Gomersall's answers: you could use shared_arr.get_lock() to synchronize access when needed:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # i can be anything numpy accepts as an index, such as another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Example

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

If you don't need synchronized access or you create your own locks then mp.Array() is unnecessary. You could use mp.sharedctypes.RawArray in this case.

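As an illustration (my own sketch, not part of the original answer), the same pattern with mp.sharedctypes.RawArray could look like the following; the names init_worker and negate are made up for this example:

import ctypes
import multiprocessing as mp
from multiprocessing import sharedctypes

import numpy as np

def init_worker(raw_arr_):
    # hand the RawArray to each worker once, at pool start-up
    global raw_arr
    raw_arr = raw_arr_

def negate(i):
    # RawArray supports the buffer protocol, so this is a zero-copy view;
    # there is no lock here, so tasks must not write to the same elements
    arr = np.frombuffer(raw_arr, dtype=np.float64)
    arr[i] = -arr[i]

if __name__ == '__main__':
    N = 100
    raw_arr = sharedctypes.RawArray(ctypes.c_double, N)
    arr = np.frombuffer(raw_arr, dtype=np.float64)
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    with mp.Pool(initializer=init_worker, initargs=(raw_arr,)) as pool:
        pool.map(negate, range(N))  # each index is written by exactly one task

    print(np.allclose(arr, -arr_orig))  # True: every element was negated once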

#2 (14 votes)

The Array object has a get_obj() method associated with it, which returns the underlying ctypes array, which in turn exposes a buffer interface. I think the following should work...

from multiprocessing import Process, Array
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = numpy.random.rand(N)
    a = Array('d', unshared_arr)
    print("Originally, the first two elements of arr = %s" % (a[:2]))

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % a[:2])

    # View the shared buffer as a numpy array; b shares memory with a
    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print(a[0])

When run, this prints out the first element of a now being 10.0, showing a and b are just two views into the same memory.

In order to make sure it is still multiprocess-safe, I believe you will have to use the acquire and release methods that exist on the Array object, a, and its built-in lock, so that it is all safely accessed (though I'm not an expert on the multiprocessing module).

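For illustration (my own sketch, not from the original answer), both forms of that locking around the Array a from the snippet above look like this:

# explicit acquire/release on the shared Array 'a'
a.acquire()
try:
    b = numpy.frombuffer(a.get_obj())  # zero-copy numpy view of the shared buffer
    b[0] = -b[0]
finally:
    a.release()

# the equivalent, and harder to get wrong, context-manager form
with a.get_lock():
    b = numpy.frombuffer(a.get_obj())
    b[0] = -b[0]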

#3 (10 votes)

While the answers already given are good, there is a much easier solution to this problem provided two conditions are met:

  1. You are on a POSIX-compliant operating system (e.g. Linux, Mac OS X); and
  2. Your child processes need read-only access to the shared array.

In this case you do not need to fiddle with explicitly making variables shared, as the child processes will be created using fork. A forked child automatically shares the parent's memory space. In the context of Python multiprocessing, this means it shares all module-level variables; note that this does not hold for arguments that you explicitly pass to your child processes or to the functions you call on a multiprocessing.Pool or the like.

A simple example:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))
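
One caveat worth adding (my note, not part of the original answer): this implicit sharing relies on the 'fork' start method, which is the default on Linux but not on Windows, and no longer the default on recent macOS Pythons, where workers are spawned and each re-imports the module with its own copy of data_array. A minimal sketch of pinning the start method, assuming the job launch is guarded by the usual __main__ check:

import multiprocessing

if __name__ == '__main__':
    # POSIX only; must be called before the Pool is created
    multiprocessing.set_start_method("fork")
    # ... then call launch_jobs(...) as above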

#4 (8 votes)

You can use the sharedmem module: https://bitbucket.org/cleemesser/numpy-sharedmem

Here's your original code again, this time using shared memory that behaves like a NumPy array (note the additional last statement calling a NumPy sum() function):

from multiprocessing import Process
import sharedmem
import numpy as np

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = np.random.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print("Originally, the first two elements of arr = %s" % (arr[:2]))

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % arr[:2])

    # Perform some NumPy operation
    print(arr.sum())

#5 (7 votes)

I've written a small Python module that uses POSIX shared memory to share numpy arrays between Python interpreters. Maybe you will find it handy.

https://pypi.python.org/pypi/SharedArray

Here's how it works:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
