在numpy数组中有效地交换元素

时间:2022-06-28 21:23:59

Assuming that we have a large matrix A, and the indices of two matrix elements (c1, r1), (c2, r2) that we want to swap:

假设我们有一个大矩阵A,以及我们要交换的两个矩阵元素(c1,r1),(c2,r2)的索引:

import numpy as np
A = np.random.rand(1000,1000)
c1, r1 = 10, 10
c2, r2 = 20, 40

The pythonic way to do so would be:

这样做的pythonic方法是:

A[c1, r1], A[c2, r2] = A[c2, r2], A[c1, r1]

However, this solution can be slow if you want to do a large number of swappings.

但是,如果要进行大量的交换,此解决方案可能会很慢。

Is there a more efficient way to swap two elements in a numpy array?

有没有更有效的方法来交换numpy数组中的两个元素?

Thanks in advance.

提前致谢。

2 个解决方案

#1


2  

Preliminary answer, which does not work

You can easily vectorize the swap operation, by using arrays of indexes (c1, r1, c2, r2) instead of iterating over lists of scalar indices.

通过使用索引数组(c1,r1,c2,r2)而不是迭代标量索引列表,可以轻松地对交换操作进行矢量化。

c1 = np.array(<all the "c1" values>, dtype=int)
r1 = np.array(<all the "r1" values>, dtype=int)
c2 = np.array(<all the "c2" values>, dtype=int)
r2 = np.array(<all the "r2" values>, dtype=int)
A[c1, r1], A[c2, r2] = A[c2, r2], A[c1, r1]

Note this performs all the swaps in one go, which can be different than iteratively, if the order of the swapping makes a difference. For this reason, this is not a valid answer to your question.

注意,如果交换的顺序有所不同,这将一次执行所有交换,这可能与迭代不同。因此,这不是您问题的有效答案。

E.g. swapping p1 with p2, then p2 with p3, is different from swapping p1 and p2, and p2 and p3 in one go. In the latter case, both p1 and p3 get the original value of p2, and p2 gets the last of the values between p1 and p3, i.e. p3 (according to the order they appear in the index-array).

例如。用p2交换p1,然后用p3交换p2,不同于交换p1和p2,以及p2和p3。在后一种情况下,p1和p3都得到p2的原始值,p2得到p1和p3之间的最后一个值,即p3(根据它们出现在索引数组中的顺序)。

However, since it is speed you're after, vectorizing the operation (in some way) must be the way to go.

但是,由于它是您所追求的速度,因此必须采用矢量化操作(以某种方式)。


Adding correctness to the above solution

So how can we perform vectorized swapping, while getting the output we need? We can take a hybrid approach, by breaking the lists of indexes into chunks (as few as possible), where each chunk only contains unique points, thus guaranteeing the order makes no difference. Swapping each chunk is done vercrorized-ly, and we only iterate over the chunks.

那么我们如何才能进行矢量化交换,同时获得我们需要的输出呢?我们可以采用混合方法,将索引列表分成块(尽可能少),其中每个块只包含唯一的点,从而保证顺序没有区别。交换每个块都是vercrorized-ly,我们只迭代块。

Here's a sketch of how this can work:

这是一个如何工作的草图:

c1, r1 = np.array([ np.arange(10), np.arange(10) ])
c2, r2 = np.array([ [2,4,6,8,0,1,3,5,7,9], [9,1,6,8,2,2,2,2,7,0] ])
A = np.empty((15,15))

def get_chunk_boundry(c1, r1, c2, r2):
    a1 = c1 + 1j * r1
    a2 = c2 + 1j * r2
    set1 = set()
    set2 = set()
    for i, (x1, x2) in enumerate(zip(a1, a2)):
        if x1 in set2 or x2 in set1:
            return i
        set1.add(x1); set2.add(x2)
    return len(c1)

while len(c1) > 0:
    i = get_chunk_boundry(c1, r1, c2, r2)
    c1b = c1[:i]; r1b = r1[:i]; c2b = c2[:i]; r2b = r2[:i]
    print 'swapping %d elements' % i
    A[c1b,r1b], A[c2b,r2b] = A[c2b,r2b], A[c1b,r1b]
    c1 = c1[i:]; r1 = r1[i:]; c2 = c2[i:]; r2 = r2[i:]

Slicing here will be faster if you store the indices as a 2dim array (N x 4) to begin with.

如果将索引存储为2dim数组(N x 4),那么在此处切片会更快。

#2


1  

Here's an iterative solution, which I wrote for reference purposes (as one way to deal with possible repeated items):

这是一个迭代解决方案,我为了参考目的而编写(作为处理可能重复项目的一种方法):

def iter2(A, rc1, rc2):
    for r,c in zip(rc1.T, rc2.T):
        j,k = tuple(r),tuple(c)
        A[j],A[k] = A[k],A[j]
    return A

For example, if:

例如,如果:

N = 4
Aref=np.arange(N)+np.arange(N)[:,None]*10

rc1=np.array([[0,0,0],[0,3,0]])
rc2=np.array([[3,3,2],[3,0,2]])

then

 print(iter2(A.copy(), rc1,rc2))

produces a swap of the corners, followed by a swap with (2,2):

产生角落的交换,然后用(2,2)交换:

[[22  1  2 30]
 [10 11 12 13]
 [20 21 33 23]
 [ 3 31 32  0]]

shx2's solution seems to do the same thing, though for my test case there seems to be bug in the chunking function.

shx2的解决方案似乎做了同样的事情,但对于我的测试用例,在分块功能中似乎存在错误。

For the shx2's test (15,15) array, my iterative solution is 4x faster! It is doing more swaps, but less work per swap. For larger arrays I expect the chunking to be faster, but I don't know how much larger we'd have to go. It will also depend on the repeat pattern.

对于shx2的测试(15,15)数组,我的迭代解决方案快了4倍!它正在进行更多交换,但每次交换的工作量更少。对于较大的阵列,我希望分块更快,但我不知道我们必须走多远。它还取决于重复模式。

The dumb vectorized swap with my arrays is:

我的数组的哑向量化交换是:

A[tuple(rc1)],A[tuple(rc2)] = A[tuple(rc2)],A[tuple(rc1)]

In this (15,15) example, it's only 20% faster than my iterative solution. Clearly we need a much large test case to produce some serious timings.

在这个(15,15)示例中,它仅比我的迭代解决方案快20%。显然,我们需要一个非常大的测试用例来产生一些严肃的时间。


The iterative solution is faster, and simpler, when operating on a 1d array. Even with the raveling and reshaping this function is the fastest that I've found. I'm not getting much of a speed improvement in Cython over this. (but cautions about arrays sizes still apply.)

在1d阵列上运行时,迭代解决方案更快,更简单。即使进行了散乱和重塑,这个功能也是我发现的最快的功能。我在Cython上的速度没有太大提升。 (但有关阵列大小的注意事项仍然适用。)

def adapt_1d(A, rc1, rc2, fn=None):
    # adapt a multidim case to use a 1d iterator
    rc2f = np.ravel_multi_index(rc2, A.shape)
    rc1f = np.ravel_multi_index(rc1, A.shape)
    Af = A.flatten()
    if fn is None:
        for r,c in zip(rc1f,rc2f):
            Af[r],Af[c] = Af[c],Af[r]
    else:
        fn(Af, rc1f, rc2f)
    return Af.reshape(A.shape)

#1


2  

Preliminary answer, which does not work

You can easily vectorize the swap operation, by using arrays of indexes (c1, r1, c2, r2) instead of iterating over lists of scalar indices.

通过使用索引数组(c1,r1,c2,r2)而不是迭代标量索引列表,可以轻松地对交换操作进行矢量化。

c1 = np.array(<all the "c1" values>, dtype=int)
r1 = np.array(<all the "r1" values>, dtype=int)
c2 = np.array(<all the "c2" values>, dtype=int)
r2 = np.array(<all the "r2" values>, dtype=int)
A[c1, r1], A[c2, r2] = A[c2, r2], A[c1, r1]

Note this performs all the swaps in one go, which can be different than iteratively, if the order of the swapping makes a difference. For this reason, this is not a valid answer to your question.

注意,如果交换的顺序有所不同,这将一次执行所有交换,这可能与迭代不同。因此,这不是您问题的有效答案。

E.g. swapping p1 with p2, then p2 with p3, is different from swapping p1 and p2, and p2 and p3 in one go. In the latter case, both p1 and p3 get the original value of p2, and p2 gets the last of the values between p1 and p3, i.e. p3 (according to the order they appear in the index-array).

例如。用p2交换p1,然后用p3交换p2,不同于交换p1和p2,以及p2和p3。在后一种情况下,p1和p3都得到p2的原始值,p2得到p1和p3之间的最后一个值,即p3(根据它们出现在索引数组中的顺序)。

However, since it is speed you're after, vectorizing the operation (in some way) must be the way to go.

但是,由于它是您所追求的速度,因此必须采用矢量化操作(以某种方式)。


Adding correctness to the above solution

So how can we perform vectorized swapping, while getting the output we need? We can take a hybrid approach, by breaking the lists of indexes into chunks (as few as possible), where each chunk only contains unique points, thus guaranteeing the order makes no difference. Swapping each chunk is done vercrorized-ly, and we only iterate over the chunks.

那么我们如何才能进行矢量化交换,同时获得我们需要的输出呢?我们可以采用混合方法,将索引列表分成块(尽可能少),其中每个块只包含唯一的点,从而保证顺序没有区别。交换每个块都是vercrorized-ly,我们只迭代块。

Here's a sketch of how this can work:

这是一个如何工作的草图:

c1, r1 = np.array([ np.arange(10), np.arange(10) ])
c2, r2 = np.array([ [2,4,6,8,0,1,3,5,7,9], [9,1,6,8,2,2,2,2,7,0] ])
A = np.empty((15,15))

def get_chunk_boundry(c1, r1, c2, r2):
    a1 = c1 + 1j * r1
    a2 = c2 + 1j * r2
    set1 = set()
    set2 = set()
    for i, (x1, x2) in enumerate(zip(a1, a2)):
        if x1 in set2 or x2 in set1:
            return i
        set1.add(x1); set2.add(x2)
    return len(c1)

while len(c1) > 0:
    i = get_chunk_boundry(c1, r1, c2, r2)
    c1b = c1[:i]; r1b = r1[:i]; c2b = c2[:i]; r2b = r2[:i]
    print 'swapping %d elements' % i
    A[c1b,r1b], A[c2b,r2b] = A[c2b,r2b], A[c1b,r1b]
    c1 = c1[i:]; r1 = r1[i:]; c2 = c2[i:]; r2 = r2[i:]

Slicing here will be faster if you store the indices as a 2dim array (N x 4) to begin with.

如果将索引存储为2dim数组(N x 4),那么在此处切片会更快。

#2


1  

Here's an iterative solution, which I wrote for reference purposes (as one way to deal with possible repeated items):

这是一个迭代解决方案,我为了参考目的而编写(作为处理可能重复项目的一种方法):

def iter2(A, rc1, rc2):
    for r,c in zip(rc1.T, rc2.T):
        j,k = tuple(r),tuple(c)
        A[j],A[k] = A[k],A[j]
    return A

For example, if:

例如,如果:

N = 4
Aref=np.arange(N)+np.arange(N)[:,None]*10

rc1=np.array([[0,0,0],[0,3,0]])
rc2=np.array([[3,3,2],[3,0,2]])

then

 print(iter2(A.copy(), rc1,rc2))

produces a swap of the corners, followed by a swap with (2,2):

产生角落的交换,然后用(2,2)交换:

[[22  1  2 30]
 [10 11 12 13]
 [20 21 33 23]
 [ 3 31 32  0]]

shx2's solution seems to do the same thing, though for my test case there seems to be bug in the chunking function.

shx2的解决方案似乎做了同样的事情,但对于我的测试用例,在分块功能中似乎存在错误。

For the shx2's test (15,15) array, my iterative solution is 4x faster! It is doing more swaps, but less work per swap. For larger arrays I expect the chunking to be faster, but I don't know how much larger we'd have to go. It will also depend on the repeat pattern.

对于shx2的测试(15,15)数组,我的迭代解决方案快了4倍!它正在进行更多交换,但每次交换的工作量更少。对于较大的阵列,我希望分块更快,但我不知道我们必须走多远。它还取决于重复模式。

The dumb vectorized swap with my arrays is:

我的数组的哑向量化交换是:

A[tuple(rc1)],A[tuple(rc2)] = A[tuple(rc2)],A[tuple(rc1)]

In this (15,15) example, it's only 20% faster than my iterative solution. Clearly we need a much large test case to produce some serious timings.

在这个(15,15)示例中,它仅比我的迭代解决方案快20%。显然,我们需要一个非常大的测试用例来产生一些严肃的时间。


The iterative solution is faster, and simpler, when operating on a 1d array. Even with the raveling and reshaping this function is the fastest that I've found. I'm not getting much of a speed improvement in Cython over this. (but cautions about arrays sizes still apply.)

在1d阵列上运行时,迭代解决方案更快,更简单。即使进行了散乱和重塑,这个功能也是我发现的最快的功能。我在Cython上的速度没有太大提升。 (但有关阵列大小的注意事项仍然适用。)

def adapt_1d(A, rc1, rc2, fn=None):
    # adapt a multidim case to use a 1d iterator
    rc2f = np.ravel_multi_index(rc2, A.shape)
    rc1f = np.ravel_multi_index(rc1, A.shape)
    Af = A.flatten()
    if fn is None:
        for r,c in zip(rc1f,rc2f):
            Af[r],Af[c] = Af[c],Af[r]
    else:
        fn(Af, rc1f, rc2f)
    return Af.reshape(A.shape)