从第三个数组中的两个数组有效地获得每对元素的最小值

时间:2021-06-07 12:21:47

I have two arrays of N floats (which act as (x,y) coordinates and might have duplicates) and and associated z array of N floats (which act as weights for the coordinates).

我有两个N浮点数(它们充当(x,y)坐标并且可能有重复)和N个浮点数的相关z数组(它们充当坐标的权重)。

For each (x,y) pair of floats I need to select the pair with the smallest associated z value. I've defined a selectMinz() function which does this (see code below) but it takes too long.

对于每个(x,y)浮动对,我需要选择具有最小关联z值的对。我已经定义了一个selectMinz()函数来执行此操作(请参阅下面的代码),但这需要太长时间。

How could I improve the performance of this function?

我怎样才能提高这个功能的性能?

import numpy as np
import time


def getData():
    N = 100000
    x = np.arange(0.0005, 0.03, 0.001)
    y = np.arange(6., 10., .05)
    # Select N values for x,y, where values can be repeated
    x = np.random.choice(x, N)
    y = np.random.choice(y, N)
    z = np.random.uniform(10., 15., N)
    return x, y, z


def selectMinz(x, y, z):
    """
    Select the minimum z for each (x,y) pair.
    """
    xy_unq, z_unq = [], []
    # For each (x,y) pair
    for i, xy in enumerate(zip(*[x, y])):
        # If this xy pair was already stored in the xy_unq list
        if xy in xy_unq:
            # If the stored z value associated with this xy pair is
            # larger than this new z[i] value
            if z_unq[xy_unq.index(xy)] > z[i]:
                # Store this smaller value instead
                z_unq[xy_unq.index(xy)] = z[i]
        else:
            # Store the xy pair, and its associated z value
            xy_unq.append(xy)
            z_unq.append(z[i])

    return xy_unq, z_unq


# Define data with the proper format.
x, y, z = getData()

s = time.clock()
xy_unq, z_unq = selectMinz(x, y, z)  # <-- TAKES TOO LONG (~15s in my system)
print(time.clock() - s)

3 个解决方案

#1


3  

Steps :

  1. Use lex-sort to make the x-y pairs come in sequence. Or, we can use a scaling method to scale one of the arrays by the range of values on the other and then sum it with the other array and finally use argsort to get the lex-sorted equivalent indices.
  2. 使用lex-sort使x-y对按顺序排列。或者,我们可以使用缩放方法通过另一个数组的值范围缩放其中一个数组,然后将其与另一个数组相加,最后使用argsort获取lex排序的等效索引。

  3. Use np.minimum.reduceat to get the minimum values in intervals, defined by the pair groupings.
  4. 使用np.minimum.reduceat获取间隔中的最小值,由对分组定义。

Thus, we would have one vectorized solution, like so -

因此,我们将有一个矢量化解决方案,如此 -

def selectMinz_vectorized(x, y, z):
    # Get grouped lex-sort indices
    sidx = (y + x*(y.max() - y.min() + 1)).argsort()
    # or sidx = np.lexsort([x, y])

    # Lex-sort x, y, z
    x_sorted = x[sidx]
    y_sorted = y[sidx]
    z_sorted = z[sidx]

    # Get equality mask between each sorted X and Y elem against previous ones.
    # The non-zero indices of its inverted mask gives us the indices where the 
    # new groupings start. We are calling those as cut_idx.
    seq_eq_mask = (x_sorted[1:] == x_sorted[:-1]) & (y_sorted[1:] == y_sorted[:-1])
    cut_idx = np.flatnonzero(np.concatenate(( [True], ~seq_eq_mask)))

    # Use those cut_idx to get intervalled minimum values
    minZ = np.minimum.reduceat(z_sorted, cut_idx)

    # Make tuples of the groupings of x,y and the corresponding min Z values
    return (zip(x_sorted[cut_idx], y_sorted[cut_idx]), minZ.tolist())

Sample run -

样品运行 -

In [120]: np.c_[x,y,z]
Out[120]: 
array([[  0.,   1.,  69.],
       [  2.,   0.,  47.],
       [  1.,   0.,  62.],
       [  0.,   2.,  33.],
       [  1.,   7.,  32.],
       [  1.,   0.,  50.],
       [  2.,   0.,  55.]])

In [121]: selectMinz(x,y,z) # original method
Out[121]: 
([(0.0, 1.0), (2.0, 0.0), (1.0, 0.0), (0.0, 2.0), (1.0, 7.0)],
 [69.0, 47.0, 50.0, 33.0, 32.0])

In [122]: selectMinz_vectorized(x,y,z)
Out[122]: 
([(1.0, 0.0), (2.0, 0.0), (0.0, 1.0), (0.0, 2.0), (1.0, 7.0)],
 [50.0, 47.0, 69.0, 33.0, 32.0])

Here's my initial approach that involved creating a stacked array and then perform those operations. The implementation looked something like this -

这是我的初始方法,涉及创建堆叠数组然后执行这些操作。实现看起来像这样 -

def selectMinz_vectorized_v2(x, y, z):
    d = np.column_stack((x,y,z))
    sidx = np.lexsort(d[:,:2].T)
    b = d[sidx]  
    cut_idx = np.r_[0,np.flatnonzero(~(b[1:,:2] == b[:-1,:2]).all(1))+1]
    minZ = np.minimum.reduceat(b[:,-1], cut_idx)
    return ([tuple(i) for i in b[cut_idx,:2]], minZ.tolist())

Benchmarking for the vectorized approaches

矢量化方法的基准测试

Approaches -

# Pruned version of the approach posted earlier
def selectMinz_vectorized_pruned(x, y, z):
    sidx = (y + x*(y.max() - y.min() + 1)).argsort()
    x_sorted = x[sidx]
    y_sorted = y[sidx]
    z_sorted = z[sidx]
    seq_eq_mask = (x_sorted[1:] == x_sorted[:-1]) & (y_sorted[1:] == y_sorted[:-1])
    cut_idx = np.flatnonzero(np.concatenate(( [True], ~seq_eq_mask)))
    minZ = np.minimum.reduceat(z_sorted, cut_idx)
    return x_sorted[cut_idx], y_sorted[cut_idx], minZ

def numpy_indexed_app(x,y,z): # @Eelco Hoogendoorn's soln
    return npi.group_by((x, y)).min(z)

Timings -

In [141]: x,y,z=getData(10000)

In [142]: %timeit selectMinz_vectorized_pruned(x, y, z)
     ...: %timeit numpy_indexed_app(x,y,z)
     ...: 
1000 loops, best of 3: 763 µs per loop
1000 loops, best of 3: 1.09 ms per loop

In [143]: x,y,z=getData(100000)

In [144]: %timeit selectMinz_vectorized_pruned(x, y, z)
     ...: %timeit numpy_indexed_app(x,y,z)
     ...: 
100 loops, best of 3: 8.53 ms per loop
100 loops, best of 3: 12.9 ms per loop

#2


3  

Changing the data structure of xy_unq and z_unq to a dictionary that holds both pieces of information brought the time down from ~7s to ~0.1s on my system.

将xy_unq和z_unq的数据结构更改为包含两条信息的字典会使我的系统上的时间从~7s减少到~0.1s。

def selectMinz(x, y, z):
    """
    Select the minimum z for each (x,y) pair.
    """
    xy_unq = {}
    # For each (x,y) pair
    for i, xy in enumerate(zip(*[x, y])):
        # If this xy pair was already stored in the xy_unq list
        if xy in xy_unq:
            # If the stored z value associated with this xy pair is
            # larger than this new z[i] value
            if xy_unq[xy] > z[i]:
                # Store this smaller value instead
                xy_unq[xy] = z[i]
        else:
            # Store the xy pair, and its associated z value
            xy_unq[xy] = z[i]

    return xy_unq.keys(), xy_unq.values()

The time for the above approach for me ranged from ~0.106s to ~0.11s. Here is an alternative approach with fewer lines of code, but takes slightly more time (~0.14):

对我来说上述方法的时间范围从~0.106s到~0.11s。这是一种替代方法,代码行数较少,但需要更多时间(~0.14):

def selectMinz(x, y, z):
    """
    Select the minimum z for each (x,y) pair.
    """
    xy_unq = {}
    # For each (x,y) pair
    for i, xy in enumerate(zip(*[x, y])):
        # If this xy pair was already stored in the xy_unq list
        if xy in xy_unq:
            # Store the value that is smaller between the current stored value and the new z[i]
            xy_unq[xy] = min(xy_unq[xy], z[i])
        else:
            # Store the xy pair, and its associated z value
            xy_unq[xy] = z[i]

    return xy_unq.keys(), xy_unq.values()

#3


2  

The numpy_indexed package (disclaimer: I am its author) contains functionality to solve these kind of grouping problems in an elegant and efficient manner:

numpy_indexed包(免责声明:我是它的作者)包含以优雅和有效的方式解决这些分组问题的功能:

import numpy_indexed as npi
xy_unq, z_unq = npi.group_by((x, y)).min(z)

#1


3  

Steps :

  1. Use lex-sort to make the x-y pairs come in sequence. Or, we can use a scaling method to scale one of the arrays by the range of values on the other and then sum it with the other array and finally use argsort to get the lex-sorted equivalent indices.
  2. 使用lex-sort使x-y对按顺序排列。或者,我们可以使用缩放方法通过另一个数组的值范围缩放其中一个数组,然后将其与另一个数组相加,最后使用argsort获取lex排序的等效索引。

  3. Use np.minimum.reduceat to get the minimum values in intervals, defined by the pair groupings.
  4. 使用np.minimum.reduceat获取间隔中的最小值,由对分组定义。

Thus, we would have one vectorized solution, like so -

因此,我们将有一个矢量化解决方案,如此 -

def selectMinz_vectorized(x, y, z):
    # Get grouped lex-sort indices
    sidx = (y + x*(y.max() - y.min() + 1)).argsort()
    # or sidx = np.lexsort([x, y])

    # Lex-sort x, y, z
    x_sorted = x[sidx]
    y_sorted = y[sidx]
    z_sorted = z[sidx]

    # Get equality mask between each sorted X and Y elem against previous ones.
    # The non-zero indices of its inverted mask gives us the indices where the 
    # new groupings start. We are calling those as cut_idx.
    seq_eq_mask = (x_sorted[1:] == x_sorted[:-1]) & (y_sorted[1:] == y_sorted[:-1])
    cut_idx = np.flatnonzero(np.concatenate(( [True], ~seq_eq_mask)))

    # Use those cut_idx to get intervalled minimum values
    minZ = np.minimum.reduceat(z_sorted, cut_idx)

    # Make tuples of the groupings of x,y and the corresponding min Z values
    return (zip(x_sorted[cut_idx], y_sorted[cut_idx]), minZ.tolist())

Sample run -

样品运行 -

In [120]: np.c_[x,y,z]
Out[120]: 
array([[  0.,   1.,  69.],
       [  2.,   0.,  47.],
       [  1.,   0.,  62.],
       [  0.,   2.,  33.],
       [  1.,   7.,  32.],
       [  1.,   0.,  50.],
       [  2.,   0.,  55.]])

In [121]: selectMinz(x,y,z) # original method
Out[121]: 
([(0.0, 1.0), (2.0, 0.0), (1.0, 0.0), (0.0, 2.0), (1.0, 7.0)],
 [69.0, 47.0, 50.0, 33.0, 32.0])

In [122]: selectMinz_vectorized(x,y,z)
Out[122]: 
([(1.0, 0.0), (2.0, 0.0), (0.0, 1.0), (0.0, 2.0), (1.0, 7.0)],
 [50.0, 47.0, 69.0, 33.0, 32.0])

Here's my initial approach that involved creating a stacked array and then perform those operations. The implementation looked something like this -

这是我的初始方法,涉及创建堆叠数组然后执行这些操作。实现看起来像这样 -

def selectMinz_vectorized_v2(x, y, z):
    d = np.column_stack((x,y,z))
    sidx = np.lexsort(d[:,:2].T)
    b = d[sidx]  
    cut_idx = np.r_[0,np.flatnonzero(~(b[1:,:2] == b[:-1,:2]).all(1))+1]
    minZ = np.minimum.reduceat(b[:,-1], cut_idx)
    return ([tuple(i) for i in b[cut_idx,:2]], minZ.tolist())

Benchmarking for the vectorized approaches

矢量化方法的基准测试

Approaches -

# Pruned version of the approach posted earlier
def selectMinz_vectorized_pruned(x, y, z):
    sidx = (y + x*(y.max() - y.min() + 1)).argsort()
    x_sorted = x[sidx]
    y_sorted = y[sidx]
    z_sorted = z[sidx]
    seq_eq_mask = (x_sorted[1:] == x_sorted[:-1]) & (y_sorted[1:] == y_sorted[:-1])
    cut_idx = np.flatnonzero(np.concatenate(( [True], ~seq_eq_mask)))
    minZ = np.minimum.reduceat(z_sorted, cut_idx)
    return x_sorted[cut_idx], y_sorted[cut_idx], minZ

def numpy_indexed_app(x,y,z): # @Eelco Hoogendoorn's soln
    return npi.group_by((x, y)).min(z)

Timings -

In [141]: x,y,z=getData(10000)

In [142]: %timeit selectMinz_vectorized_pruned(x, y, z)
     ...: %timeit numpy_indexed_app(x,y,z)
     ...: 
1000 loops, best of 3: 763 µs per loop
1000 loops, best of 3: 1.09 ms per loop

In [143]: x,y,z=getData(100000)

In [144]: %timeit selectMinz_vectorized_pruned(x, y, z)
     ...: %timeit numpy_indexed_app(x,y,z)
     ...: 
100 loops, best of 3: 8.53 ms per loop
100 loops, best of 3: 12.9 ms per loop

#2


3  

Changing the data structure of xy_unq and z_unq to a dictionary that holds both pieces of information brought the time down from ~7s to ~0.1s on my system.

将xy_unq和z_unq的数据结构更改为包含两条信息的字典会使我的系统上的时间从~7s减少到~0.1s。

def selectMinz(x, y, z):
    """
    Select the minimum z for each (x,y) pair.
    """
    xy_unq = {}
    # For each (x,y) pair
    for i, xy in enumerate(zip(*[x, y])):
        # If this xy pair was already stored in the xy_unq list
        if xy in xy_unq:
            # If the stored z value associated with this xy pair is
            # larger than this new z[i] value
            if xy_unq[xy] > z[i]:
                # Store this smaller value instead
                xy_unq[xy] = z[i]
        else:
            # Store the xy pair, and its associated z value
            xy_unq[xy] = z[i]

    return xy_unq.keys(), xy_unq.values()

The time for the above approach for me ranged from ~0.106s to ~0.11s. Here is an alternative approach with fewer lines of code, but takes slightly more time (~0.14):

对我来说上述方法的时间范围从~0.106s到~0.11s。这是一种替代方法,代码行数较少,但需要更多时间(~0.14):

def selectMinz(x, y, z):
    """
    Select the minimum z for each (x,y) pair.
    """
    xy_unq = {}
    # For each (x,y) pair
    for i, xy in enumerate(zip(*[x, y])):
        # If this xy pair was already stored in the xy_unq list
        if xy in xy_unq:
            # Store the value that is smaller between the current stored value and the new z[i]
            xy_unq[xy] = min(xy_unq[xy], z[i])
        else:
            # Store the xy pair, and its associated z value
            xy_unq[xy] = z[i]

    return xy_unq.keys(), xy_unq.values()

#3


2  

The numpy_indexed package (disclaimer: I am its author) contains functionality to solve these kind of grouping problems in an elegant and efficient manner:

numpy_indexed包(免责声明:我是它的作者)包含以优雅和有效的方式解决这些分组问题的功能:

import numpy_indexed as npi
xy_unq, z_unq = npi.group_by((x, y)).min(z)