Fast weighted euclidean distance between points in arrays

Time: 2022-08-22 13:47:11

I need to efficiently calculate the euclidean weighted distances for every x,y point in a given array to every other x,y point in another array. This is the code I have which works as expected:

import numpy as np
import random

def rand_data(integ):
    '''
    Function that generates 'integ' random values between [0.,1.)
    '''
    rand_dat = [random.random() for _ in range(integ)]

    return rand_dat

def weighted_dist(indx, x_coo, y_coo):
    '''
    Function that calculates *weighted* euclidean distances.
    '''
    dist_point_list = []
    # Iterate through every point in array_2.
    for indx2, x_coo2 in enumerate(array_2[0]):
        y_coo2 = array_2[1][indx2]
        # Weighted distance in x.
        x_dist_weight = (x_coo-x_coo2)/w_data[0][indx] 
        # Weighted distance in y.
        y_dist_weight = (y_coo-y_coo2)/w_data[1][indx] 
        # Weighted distance between point from array_1 passed and this point
        # from array_2.
        dist = np.sqrt(x_dist_weight**2 + y_dist_weight**2)
        # Append weighted distance value to list.
        dist_point_list.append(round(dist, 8))

    return dist_point_list


# Generate random x,y data points.
array_1 = np.array([rand_data(10), rand_data(10)], dtype=float)

# Generate weights for each x,y coord for points in array_1.
w_data = np.array([rand_data(10), rand_data(10)], dtype=float)

# Generate second larger array.
array_2 = np.array([rand_data(100), rand_data(100)], dtype=float)


# Obtain *weighted* distances for every point in array_1 to every point in array_2.
dist = []
# Iterate through every point in array_1.
for indx, x_coo in enumerate(array_1[0]):
    y_coo = array_1[1][indx]
    # Call function to get weighted distances for this point to every point in
    # array_2.
    dist.append(weighted_dist(indx, x_coo, y_coo))

The final list dist holds one sub-list per point in the first array, and each sub-list holds one weighted distance per point in the second array.

I'd like to know if there's a way to make this code more efficient, perhaps using the cdist function, because this process becomes quite expensive when the arrays have lots of elements (which in my case they do) and when I have to check the distances for lots of arrays (which I also do).

4 answers

#1


4  

@Evan and @Martinis Group are on the right track - to expand on Evan's answer, here's a function that uses broadcasting to quickly calculate the n-dimensional weighted euclidean distance without Python loops:

import numpy as np

def fast_wdist(A, B, W):
    """
    Compute the weighted euclidean distance between two arrays of points:

    D{i,j} = 
    sqrt( ((A{0,i}-B{0,j})/W{0,i})^2 + ... + ((A{k-1,i}-B{k-1,j})/W{k-1,i})^2 )

    inputs:
        A is a (k, m) array of coordinates
        B is a (k, n) array of coordinates
        W is a (k, m) array of weights

    returns:
        D is an (m, n) array of weighted euclidean distances
    """

    # compute the differences and apply the weights in one go using
    # broadcasting jujitsu. the result is (n, k, m)
    wdiff = (A[np.newaxis,...] - B[np.newaxis,...].T) / W[np.newaxis,...]

    # square and sum over the second axis, take the sqrt and transpose. the
    # result is an (m, n) array of weighted euclidean distances
    D = np.sqrt((wdiff*wdiff).sum(1)).T

    return D

To check that this works OK, we'll compare it to a slower version that uses nested Python loops:

def slow_wdist(A, B, W):

    k,m = A.shape
    _,n = B.shape
    D = np.zeros((m, n))

    for ii in range(m):
        for jj in range(n):
            wdiff = (A[:,ii] - B[:,jj]) / W[:,ii]
            D[ii,jj] = np.sqrt((wdiff**2).sum())
    return D

First, let's make sure that the two functions give the same answer:

# make some random points and weights
def setup(k=2, m=100, n=300):
    return np.random.randn(k,m), np.random.randn(k,n),np.random.randn(k,m)

a, b, w = setup()
d0 = slow_wdist(a, b, w)
d1 = fast_wdist(a, b, w)

print(np.allclose(d0, d1))
# True

Needless to say, the version that uses broadcasting rather than Python loops is several orders of magnitude faster:

%%timeit a, b, w = setup()
slow_wdist(a, b, w)
# 1 loops, best of 3: 647 ms per loop

%%timeit a, b, w = setup()
fast_wdist(a, b, w)
# 1000 loops, best of 3: 620 us per loop
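
For comparison, the same broadcasting idea can be written so that the intermediate array comes out as (k, m, n) directly, which avoids the two transposes. This is only a sketch, not the answer's own code: the name wdist_kmn is made up for illustration, and the weights are shifted to be strictly positive purely to keep the random test data well behaved. The arrays use the question's layout (array_1, w_data, array_2).

```python
import numpy as np

def wdist_kmn(A, B, W):
    """Weighted euclidean distances; A and W are (k, m), B is (k, n)."""
    # A[:, :, None] is (k, m, 1) and B[:, None, :] is (k, 1, n), so the
    # difference broadcasts to (k, m, n); W[:, :, None] scales each point of A.
    wdiff = (A[:, :, None] - B[:, None, :]) / W[:, :, None]
    # Sum the squared terms over the coordinate axis to get an (m, n) result.
    return np.sqrt((wdiff ** 2).sum(axis=0))

rng = np.random.default_rng(0)
array_1 = rng.random((2, 10))        # 10 points
w_data = rng.random((2, 10)) + 0.1   # hypothetical positive weights
array_2 = rng.random((2, 100))       # 100 points

dist = wdist_kmn(array_1, array_2, w_data)
print(dist.shape)  # (10, 100)
```

Row i of dist then plays the role of the i-th sub-list in the question's dist list, minus the rounding to 8 decimals.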

#2


3  

You could use cdist if you don't need weighted distances. If you need weighted distances and performance, create an array of the appropriate output size, and use either an automated accelerator like Numba or Parakeet, or hand-tune the code with Cython.
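
A rough sketch of what that could look like with Numba, assuming the (k, m) / (k, n) array layout from answer #1. The function name wdist_loops is hypothetical, and the fallback decorator is there only so the sketch still runs (slowly, as plain Python) when Numba is not installed. No timing is claimed here.

```python
import math
import numpy as np

try:
    from numba import njit  # optional accelerator
except ImportError:
    def njit(func):
        # Fallback: leave the function as ordinary Python.
        return func

@njit
def wdist_loops(A, B, W):
    k, m = A.shape
    n = B.shape[1]
    D = np.empty((m, n))  # output array of the appropriate size
    for i in range(m):
        for j in range(n):
            acc = 0.0
            for d in range(k):
                t = (A[d, i] - B[d, j]) / W[d, i]
                acc += t * t
            D[i, j] = math.sqrt(acc)
    return D

rng = np.random.default_rng(1)
A = rng.random((2, 5))
B = rng.random((2, 7))
W = rng.random((2, 5)) + 0.1  # hypothetical positive weights
D = wdist_loops(A, B, W)
```

Explicit loops like these avoid the (k, m, n) temporary that a broadcasting solution allocates, which is the main reason to consider this route for very large arrays.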

#3


1  

You can avoid looping by using code that looks like the following:

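import numpy as np

def compute_distances(A, B, W):
    # A is (m, 2) and B is (n, 2); W must broadcast against the (n, m) result.
    Ax = A[:, 0].reshape(1, A.shape[0])
    Bx = B[:, 0].reshape(B.shape[0], 1)
    dx = Bx - Ax

    # Same for dy, using the second column.
    Ay = A[:, 1].reshape(1, A.shape[0])
    By = B[:, 1].reshape(B.shape[0], 1)
    dy = By - Ay

    # Note: W scales the whole distance here, not each axis separately.
    dist = np.sqrt(dx**2 + dy**2) * W
    return dist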

That will run a lot faster in Python than anything that loops, as long as you have enough memory for the arrays.
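
If memory for the intermediate arrays is the limiting factor, one option is to process the first array in blocks. The sketch below is an illustration under assumptions, not part of this answer: it uses the per-axis weights from the question and the (k, m) / (k, n) layout from answer #1, and the name wdist_chunked and the chunk parameter are invented here.

```python
import numpy as np

def wdist_chunked(A, B, W, chunk=1024):
    """A and W are (k, m), B is (k, n); returns an (m, n) array."""
    k, m = A.shape
    n = B.shape[1]
    D = np.empty((m, n))
    for start in range(0, m, chunk):
        stop = min(start + chunk, m)
        # The temporary is (k, stop - start, n) instead of (k, m, n).
        wdiff = (A[:, start:stop, None] - B[:, None, :]) / W[:, start:stop, None]
        D[start:stop] = np.sqrt((wdiff ** 2).sum(axis=0))
    return D

rng = np.random.default_rng(3)
A = rng.random((2, 10))
B = rng.random((2, 100))
W = rng.random((2, 10)) + 0.1  # hypothetical positive weights
D = wdist_chunked(A, B, W, chunk=3)
```

The (m, n) output still has to fit in memory; chunking only bounds the size of the temporary difference array.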

#4


0  

You could try removing the square root if you only need to compare distances: for non-negative values, a > b implies a squared > b squared, so the ordering is preserved... and computers are REALLY slow at square roots normally.
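
As a small illustration of when this applies, the sketch below finds the nearest point in the second array for each point in the first using squared weighted distances only, then checks that taking the square root does not change the result. The data, the positive weights and the variable names are all made up for the example, and no speed-up is measured.

```python
import numpy as np

rng = np.random.default_rng(2)
A = rng.random((2, 10))
W = rng.random((2, 10)) + 0.1  # hypothetical positive weights
B = rng.random((2, 100))

wdiff = (A[:, :, None] - B[:, None, :]) / W[:, :, None]
sq_dist = (wdiff ** 2).sum(axis=0)  # squared weighted distances, (10, 100)

# Nearest point in B for each point in A, found without any sqrt.
nearest_sq = sq_dist.argmin(axis=1)
# Same search on the actual distances, for comparison.
nearest = np.sqrt(sq_dist).argmin(axis=1)
same = np.array_equal(nearest_sq, nearest)
```

This only helps when the ordering is all that matters; if the actual distance values are needed, the square root still has to be taken.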
