
时间:2022-06-13 01:26:30

I have two ordered numpy arrays and I want to interleave them so that I take one item from the first array, then another from the second, then back to the first - taking the next item that is larger than the one I just took from the second and so on. Those are actually arrays of indices to other arrays, and I'll be ok with operating on the original arrays as long as the operation is vectorized (but of course working on the index array as a vector operation will be awesome).


Example (ok to assume that the intersection of arrays is empty)


a = array([1,2,3,4,7,8,9,10,17])
b = array([5,6,13,14,15,19,21,23])

I would like to get [1,5,7,13,17,19]


5 个解决方案



Vectorised solution (pedagogical style, easily understandable)

We can vectorise this by augmenting the arrays with a discriminator index, such that a is tagged 0 and b is tagged 1:


a_t = np.vstack((a, np.zeros_like(a)))
b_t = np.vstack((b, np.ones_like(b)))

Now, let's combine and sort:


c = np.hstack((a_t, b_t))[:, np.argsort(np.hstack((a, b)))]
array([[ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 13, 14, 15, 17, 19, 21, 23],
       [ 0,  0,  0,  0,  1,  1,  0,  0,  0,  0,  1,  1,  1,  0,  1,  1,  1]])

You can see that now the elements are in order but retaining their tags, so we can see which elements came from a and from b.


So, let's select the first element and each element where the tag changes from 0 (for a) to 1 (for b) and back again:

那么,让我们选择第一个元素和标记从0 (a)变为1 (b)的每个元素,然后再返回:

c[:, np.concatenate(([True], c[1, 1:] != c[1, :-1]))][0]
array([ 1,  5,  7, 13, 17, 19])

Efficient vectorised solution

You can do this slightly more efficiently by keeping the items and their tags in separate (but parallel) arrays:


ab = np.hstack((a, b))
s = np.argsort(ab)
t = np.hstack((np.zeros_like(a), np.ones_like(b)))[s]
ab[s][np.concatenate(([True], t[1:] != t[:-1]))]
array([ 1,  5,  7, 13, 17, 19])

This is slightly more efficient than the above solution; I get an average of 45 as opposed to 90 microseconds, although your conditions may vary.




You could break the problem into two parts:


  1. Make a and b iterators which yield values only if they are bigger than some threshold.
  2. 使a和b迭代器只在值大于某个阈值时才产生值。
  3. Use itertools (essentially George Sakkis' roundrobin recipe) to alternate between the two iterators.
  4. 使用itertools(基本上是George Sakkis的roundrobin菜谱)在两个迭代器之间交替使用。

import itertools
import numpy as np

a = np.array([1,2,3,4,7,8,9,10,17])
b = np.array([5,6,13,14,15,19,21,23])

def takelarger(iterable):
    for x in iterable:
        if x > takelarger.threshold:
            takelarger.threshold = x
            yield x
takelarger.threshold = 0

def alternate(*iterables):
    # Modified version of George Sakkis' roundrobin recipe
    # http://docs.python.org/library/itertools.html#recipes
    pending = len(iterables)
    nexts = itertools.cycle(iter(it).next for it in iterables)
    while pending:
            for n in nexts:
                yield n()
        except StopIteration:
            # Unlike roundrobin, stop when any iterable has been consumed

def interleave(a, b):
    takelarger.threshold = 0
    return list(alternate(takelarger(a),takelarger(b)))




[1, 5, 7, 13, 17, 19]



a = [1,2,3,4,7,8,9,10,17]
b = [5,6,13,14,15,19,21,23]
c.append(a[0])  #add the first element to c
i=True          #breaking condition
loop=2          #initially we to choose b
while i:
    if loop==2:
        temp_lis=d([x-val for x in b])  #find the difference between every
                                        # element and the last element of c and
                                        # pick the smallest positive value.

        for k,y in enumerate(temp_lis):
            if y>0:
            i=False        #use for-else loop for determining the breaking condition

        loop=1   #change the loop to 1
    elif loop==1:
        temp_lis=[x-val for x in a]
        for k,y in enumerate(temp_lis):
            if y>0:


print c  



[1, 5, 7, 13, 17, 19]



Inspired by other posts, I had an other idea of pure python code that require prior ordering but that is symmetric to a and b


def myIterator4(a, b):
    curr = iter(a)
    next = iter(b)
        max = curr.next()
        tmp = next.next() #  Empty iterator if b is empty
        while True:
            yield max
            while tmp<=max:
                tmp = next.next()           
            max = tmp
            curr, next  = next, curr
    except StopIteration:

Returns empty iterator if a or b is empty.




I think you'll have a hard time applying numpy vectorization to this problem and maintain linear performance. This requires storing a fair bit of state: the current index in a, the current index in b, and the current threshold. numpy doesn't provide many stateful vectorized functions. In fact the only one I can think of off the top of my head is ufunc.reduce, which isn't well-suited to this problem -- though of course it could be shoehorned to work.


In fact, just after I posted this, my browser updated with an excellent vectorized solution. But that solution requires sorting, which is O(n log n); and indeed, after some testing, I see that the pure python solution below is faster for all inputs!

事实上,就在我发布这篇文章之后,我的浏览器更新了一个优秀的矢量化解决方案。但是这个解需要排序,也就是O(n log n)的确,经过一些测试,我发现下面的纯python解决方案对于所有输入都更快!

def interleave_monotonic(a, b):
        a = iter(a)
        threshold = next(a)
        yield threshold
        for current in b:
            if current > threshold:
                threshold = current
                yield threshold
                while current <= threshold:
                    current = next(a)
                threshold = current
                yield threshold
    except StopIteration:

Note that if a is empty, this returns an empty iterable, but if b is empty, this returns a one-item iterable containing the first value of a. I think that's consistent with your example, but this is an edge case that I'm not certain about. Also, the numpy-based solution exhibits slightly different behavior, always beginning with the lesser of a[0] and b[0], while the above always begins with the first value of a, regardless. I modified the above to check for that for the following tests, which show pretty clearly that the pure python solution is the fastest.




def interleave_monotonic_np(a, b):
    ab = np.hstack((a, b))
    s = np.argsort(ab)
    t = np.hstack((np.zeros_like(a), np.ones_like(b)))[s]
    return ab[s][np.concatenate(([True], t[1:] != t[:-1]))]

def interleave_monotonic(a, b):
    a, b = (a, b) if a[0] <= b[0] else (b, a)
        a = iter(a)
        threshold = next(a)
        yield threshold
        for current in b:
            if current > threshold:
                threshold = current
                yield threshold
                while current <= threshold:
                    current = next(a)
                threshold = current
                yield threshold
    except StopIteration:

def interleave_monotonic_py(a, b):
    return numpy.fromiter(interleave_monotonic(a, b), dtype='int64')

def get_a_b(n):
    rnd = random.sample(xrange(10 ** 10), n * 2)
    return sorted(rnd[0::2]), sorted(rnd[1::2])



>>> for e in range(7):
...         a, b = get_a_b(10 ** e)
...         print (interleave_monotonic_np(a, b) == 
...                interleave_monotonic_py(a, b)).all()
...         %timeit interleave_monotonic_np(a, b)
...         %timeit interleave_monotonic_py(a, b)

10000 loops, best of 3: 85.6 us per loop
100000 loops, best of 3: 5.53 us per loop
10000 loops, best of 3: 91.7 us per loop
100000 loops, best of 3: 9.19 us per loop
10000 loops, best of 3: 144 us per loop
10000 loops, best of 3: 46.5 us per loop
1000 loops, best of 3: 711 us per loop
1000 loops, best of 3: 445 us per loop
100 loops, best of 3: 6.67 ms per loop
100 loops, best of 3: 4.42 ms per loop
10 loops, best of 3: 135 ms per loop
10 loops, best of 3: 55.7 ms per loop
1 loops, best of 3: 1.58 s per loop
1 loops, best of 3: 654 ms per loop



Vectorised solution (pedagogical style, easily understandable)

We can vectorise this by augmenting the arrays with a discriminator index, such that a is tagged 0 and b is tagged 1:


a_t = np.vstack((a, np.zeros_like(a)))
b_t = np.vstack((b, np.ones_like(b)))

Now, let's combine and sort:


c = np.hstack((a_t, b_t))[:, np.argsort(np.hstack((a, b)))]
array([[ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 13, 14, 15, 17, 19, 21, 23],
       [ 0,  0,  0,  0,  1,  1,  0,  0,  0,  0,  1,  1,  1,  0,  1,  1,  1]])

You can see that now the elements are in order but retaining their tags, so we can see which elements came from a and from b.


So, let's select the first element and each element where the tag changes from 0 (for a) to 1 (for b) and back again:

那么,让我们选择第一个元素和标记从0 (a)变为1 (b)的每个元素,然后再返回:

c[:, np.concatenate(([True], c[1, 1:] != c[1, :-1]))][0]
array([ 1,  5,  7, 13, 17, 19])

Efficient vectorised solution

You can do this slightly more efficiently by keeping the items and their tags in separate (but parallel) arrays:


ab = np.hstack((a, b))
s = np.argsort(ab)
t = np.hstack((np.zeros_like(a), np.ones_like(b)))[s]
ab[s][np.concatenate(([True], t[1:] != t[:-1]))]
array([ 1,  5,  7, 13, 17, 19])

This is slightly more efficient than the above solution; I get an average of 45 as opposed to 90 microseconds, although your conditions may vary.




You could break the problem into two parts:


  1. Make a and b iterators which yield values only if they are bigger than some threshold.
  2. 使a和b迭代器只在值大于某个阈值时才产生值。
  3. Use itertools (essentially George Sakkis' roundrobin recipe) to alternate between the two iterators.
  4. 使用itertools(基本上是George Sakkis的roundrobin菜谱)在两个迭代器之间交替使用。

import itertools
import numpy as np

a = np.array([1,2,3,4,7,8,9,10,17])
b = np.array([5,6,13,14,15,19,21,23])

def takelarger(iterable):
    for x in iterable:
        if x > takelarger.threshold:
            takelarger.threshold = x
            yield x
takelarger.threshold = 0

def alternate(*iterables):
    # Modified version of George Sakkis' roundrobin recipe
    # http://docs.python.org/library/itertools.html#recipes
    pending = len(iterables)
    nexts = itertools.cycle(iter(it).next for it in iterables)
    while pending:
            for n in nexts:
                yield n()
        except StopIteration:
            # Unlike roundrobin, stop when any iterable has been consumed

def interleave(a, b):
    takelarger.threshold = 0
    return list(alternate(takelarger(a),takelarger(b)))




[1, 5, 7, 13, 17, 19]



a = [1,2,3,4,7,8,9,10,17]
b = [5,6,13,14,15,19,21,23]
c.append(a[0])  #add the first element to c
i=True          #breaking condition
loop=2          #initially we to choose b
while i:
    if loop==2:
        temp_lis=d([x-val for x in b])  #find the difference between every
                                        # element and the last element of c and
                                        # pick the smallest positive value.

        for k,y in enumerate(temp_lis):
            if y>0:
            i=False        #use for-else loop for determining the breaking condition

        loop=1   #change the loop to 1
    elif loop==1:
        temp_lis=[x-val for x in a]
        for k,y in enumerate(temp_lis):
            if y>0:


print c  



[1, 5, 7, 13, 17, 19]



Inspired by other posts, I had an other idea of pure python code that require prior ordering but that is symmetric to a and b


def myIterator4(a, b):
    curr = iter(a)
    next = iter(b)
        max = curr.next()
        tmp = next.next() #  Empty iterator if b is empty
        while True:
            yield max
            while tmp<=max:
                tmp = next.next()           
            max = tmp
            curr, next  = next, curr
    except StopIteration:

Returns empty iterator if a or b is empty.




I think you'll have a hard time applying numpy vectorization to this problem and maintain linear performance. This requires storing a fair bit of state: the current index in a, the current index in b, and the current threshold. numpy doesn't provide many stateful vectorized functions. In fact the only one I can think of off the top of my head is ufunc.reduce, which isn't well-suited to this problem -- though of course it could be shoehorned to work.


In fact, just after I posted this, my browser updated with an excellent vectorized solution. But that solution requires sorting, which is O(n log n); and indeed, after some testing, I see that the pure python solution below is faster for all inputs!

事实上,就在我发布这篇文章之后,我的浏览器更新了一个优秀的矢量化解决方案。但是这个解需要排序,也就是O(n log n)的确,经过一些测试,我发现下面的纯python解决方案对于所有输入都更快!

def interleave_monotonic(a, b):
        a = iter(a)
        threshold = next(a)
        yield threshold
        for current in b:
            if current > threshold:
                threshold = current
                yield threshold
                while current <= threshold:
                    current = next(a)
                threshold = current
                yield threshold
    except StopIteration:

Note that if a is empty, this returns an empty iterable, but if b is empty, this returns a one-item iterable containing the first value of a. I think that's consistent with your example, but this is an edge case that I'm not certain about. Also, the numpy-based solution exhibits slightly different behavior, always beginning with the lesser of a[0] and b[0], while the above always begins with the first value of a, regardless. I modified the above to check for that for the following tests, which show pretty clearly that the pure python solution is the fastest.




def interleave_monotonic_np(a, b):
    ab = np.hstack((a, b))
    s = np.argsort(ab)
    t = np.hstack((np.zeros_like(a), np.ones_like(b)))[s]
    return ab[s][np.concatenate(([True], t[1:] != t[:-1]))]

def interleave_monotonic(a, b):
    a, b = (a, b) if a[0] <= b[0] else (b, a)
        a = iter(a)
        threshold = next(a)
        yield threshold
        for current in b:
            if current > threshold:
                threshold = current
                yield threshold
                while current <= threshold:
                    current = next(a)
                threshold = current
                yield threshold
    except StopIteration:

def interleave_monotonic_py(a, b):
    return numpy.fromiter(interleave_monotonic(a, b), dtype='int64')

def get_a_b(n):
    rnd = random.sample(xrange(10 ** 10), n * 2)
    return sorted(rnd[0::2]), sorted(rnd[1::2])



>>> for e in range(7):
...         a, b = get_a_b(10 ** e)
...         print (interleave_monotonic_np(a, b) == 
...                interleave_monotonic_py(a, b)).all()
...         %timeit interleave_monotonic_np(a, b)
...         %timeit interleave_monotonic_py(a, b)

10000 loops, best of 3: 85.6 us per loop
100000 loops, best of 3: 5.53 us per loop
10000 loops, best of 3: 91.7 us per loop
100000 loops, best of 3: 9.19 us per loop
10000 loops, best of 3: 144 us per loop
10000 loops, best of 3: 46.5 us per loop
1000 loops, best of 3: 711 us per loop
1000 loops, best of 3: 445 us per loop
100 loops, best of 3: 6.67 ms per loop
100 loops, best of 3: 4.42 ms per loop
10 loops, best of 3: 135 ms per loop
10 loops, best of 3: 55.7 ms per loop
1 loops, best of 3: 1.58 s per loop
1 loops, best of 3: 654 ms per loop