Process an array in parallel using GCD

Time: 2022-03-29 13:50:09

I have a large array that I would like to process by handing slices of it to a few asynchronous tasks. As a proof of concept, I have written the following code:

class TestParallelArrayProcessing {
    let array: [Int]
    var summary: [Int]

    init() {
        array = Array<Int>(count: 500000, repeatedValue: 0)
        for i in 0 ..< 500000 {
            array[i] = Int(arc4random_uniform(10))
        }
        summary = Array<Int>(count: 10, repeatedValue: 0)
    }

    func calcSummary() {
        let group = dispatch_group_create()
        let queue = dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0)

        for i in 0 ..< 10 {
            dispatch_group_async(group, queue, {
                let base = i * 50000
                for x in base ..< base + 50000 {
                    self.summary[i] += self.array[x]
                }
            })
        }
        dispatch_group_notify(group, queue, {
            println(self.summary)
        })
    }
}

After init(), array will be initialized with random integers between 0 and 9.

The calcSummary function dispatches 10 tasks, each of which takes a disjoint chunk of 50000 items from array and adds them up, using its own slot in summary as an accumulator.

This program crashes at the self.summary[i] += self.array[x] line. The error is:

 EXC_BAD_INSTRUCTION (code = EXC_I386_INVOP).

I can see, in the debugger, that it has managed to iterate a few times before crashing, and that the variables, at the time of the crash, have values within correct bounds.

I have read that EXC_I386_INVOP can happen when trying to access an object that has already been released. I wonder if this has anything to do with Swift making a copy of the array if it is modified, and, if so, how to avoid it.

4 answers

#1


5  

This is a slightly different take on the approach in @Eduardo's answer, using the Array type's withUnsafeMutableBufferPointer<R>(body: (inout UnsafeMutableBufferPointer<T>) -> R) -> R method. That method's documentation states:

Call body(p), where p is a pointer to the Array's mutable contiguous storage. If no such storage exists, it is first created.

Often, the optimizer can eliminate bounds- and uniqueness-checks within an array algorithm, but when that fails, invoking the same algorithm on body's argument lets you trade safety for speed.

That second paragraph seems to be exactly what's happening here, so using this method might be more "idiomatic" in Swift, whatever that means:

func calcSummary() {
    let group = dispatch_group_create()
    let queue = dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0)

    self.summary.withUnsafeMutableBufferPointer {
        summaryMem -> Void in
        for i in 0 ..< 10 {
            dispatch_group_async(group, queue, {
                let base = i * 50000
                for x in base ..< base + 50000 {
                    summaryMem[i] += self.array[x]
                }
            })
        }
    }

    dispatch_group_notify(group, queue, {
        println(self.summary)
    })
}
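
One caveat with the snippet above: as far as I know, the buffer pointer is only guaranteed to be valid while the closure passed to withUnsafeMutableBufferPointer is running, so the asynchronous tasks should finish before that closure returns. Here is a minimal sketch of that idea in Swift 3+ Dispatch syntax, assuming it is acceptable to block the caller until the tasks are done. The free function summarize and its parameters are made up for illustration and are not part of the original code:

```swift
import Dispatch

// Hypothetical helper, not from the original post: sums `chunkCount`
// equally sized chunks of `values` in parallel.
func summarize(_ values: [Int], chunkCount: Int) -> [Int] {
    precondition(chunkCount > 0 && values.count % chunkCount == 0,
                 "values must split evenly into chunks")
    let chunkSize = values.count / chunkCount
    var summary = [Int](repeating: 0, count: chunkCount)

    summary.withUnsafeMutableBufferPointer { (buffer: inout UnsafeMutableBufferPointer<Int>) -> Void in
        // Copy the pointer into a constant so the escaping closures below
        // capture a plain value rather than the inout parameter.
        let slots = buffer
        let group = DispatchGroup()
        let queue = DispatchQueue.global(qos: .userInitiated)

        for i in 0 ..< chunkCount {
            queue.async(group: group) {
                let base = i * chunkSize
                var sum = 0
                for x in base ..< base + chunkSize {
                    sum += values[x]
                }
                // Each task writes only to its own slot.
                slots[i] = sum
            }
        }
        // Block until every task has finished, so the pointer is never
        // used after this closure returns.
        group.wait()
    }
    return summary
}

print(summarize(Array(repeating: 1, count: 20), chunkCount: 4))  // [5, 5, 5, 5]
```

The trade-off is that the call is now synchronous; if the caller must not block, the whole function could itself be dispatched to a background queue.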

#2


3  

When you use the += operator, the LHS is an inout parameter. I think you're getting race conditions when, as you mention in your update, Swift moves the array around for optimization. I was able to get it to work by summing the chunk in a local variable, then simply assigning to the right index in summary:

func calcSummary() {
    let group = dispatch_group_create()
    let queue = dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0)

    for i in 0 ..< 10 {
        dispatch_group_async(group, queue, {
            let base = i * 50000
            var sum = 0
            for x in base ..< base + 50000 {
                sum += self.array[x]
            }
            self.summary[i] = sum
        })
    }

    dispatch_group_notify(group, queue, {
        println(self.summary)
    })
}
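
As far as I can tell, assigning to self.summary[i] from several tasks still mutates one shared Array concurrently, which Swift does not promise to be safe even when the indices differ. A hedged sketch of one way around that, in Swift 3+ Dispatch syntax: keep the local sum, but funnel the final write through a serial queue. The class name ChunkSummer, the queue label, and the blocking group.wait() are my own assumptions, not part of the original answer:

```swift
import Dispatch

// Hypothetical class for illustration; names are not from the original post.
// Marked @unchecked Sendable because all access to `summary` goes through
// the serial queue below.
final class ChunkSummer: @unchecked Sendable {
    private let values: [Int]
    private let chunkCount: Int
    private var summary: [Int]
    // Serial queue used as a lock around `summary`.
    private let summaryLock = DispatchQueue(label: "ChunkSummer.summary")

    init(values: [Int], chunkCount: Int) {
        precondition(chunkCount > 0 && values.count % chunkCount == 0,
                     "values must split evenly into chunks")
        self.values = values
        self.chunkCount = chunkCount
        self.summary = [Int](repeating: 0, count: chunkCount)
    }

    // Blocks until all chunks are summed, then returns a copy of the result.
    func calcSummary() -> [Int] {
        let group = DispatchGroup()
        let queue = DispatchQueue.global(qos: .userInitiated)
        let chunkSize = values.count / chunkCount

        for i in 0 ..< chunkCount {
            queue.async(group: group) {
                let base = i * chunkSize
                var sum = 0
                for x in base ..< base + chunkSize {
                    sum += self.values[x]
                }
                // Only one task at a time may touch the shared array.
                self.summaryLock.sync { self.summary[i] = sum }
            }
        }
        group.wait()
        return summaryLock.sync { summary }
    }
}

let summer = ChunkSummer(values: Array(repeating: 2, count: 12), chunkCount: 3)
print(summer.calcSummary())  // [8, 8, 8]
```

The lock is taken once per chunk rather than once per element, so it should cost very little compared with the summing itself.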

#3


2  

I think Nate is right: there are race conditions with the summary variable. To fix it, I used summary's memory directly:

func calcSummary() {
    let group = dispatch_group_create()
    let queue = dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0)

    let summaryMem = UnsafeMutableBufferPointer<Int>(start: &summary, count: 10)

    for i in 0 ..< 10 {
        dispatch_group_async(group, queue, {
            let base = i * 50000
            for x in base ..< base + 50000 {
                summaryMem[i] += self.array[x]
            }
        })
    }

    dispatch_group_notify(group, queue, {
        println(self.summary)
    })
}

This works (so far).

EDIT: Mike S makes a very good point in his comment below. I have also found this blog post, which sheds some light on the problem.

#4


2  

You can also use concurrentPerform(iterations: Int, execute work: (Int) -> Swift.Void) (since Swift 3).

It has a much simpler syntax:

DispatchQueue.concurrentPerform(iterations: iterations) { i in
    performOperation(i)
}

and will wait for all threads to finalise before returning.
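
Applied to the summing problem from the question, this might look roughly like the sketch below. The helper chunkSums(of:chunkCount:) is a hypothetical name of mine, and writing the results through withUnsafeMutableBufferPointer is one possible choice rather than something concurrentPerform requires:

```swift
import Dispatch

// Hypothetical helper for illustration: sums `chunkCount` equal chunks of
// `values`, one chunk per concurrentPerform iteration.
func chunkSums(of values: [Int], chunkCount: Int) -> [Int] {
    precondition(chunkCount > 0 && values.count % chunkCount == 0,
                 "values must split evenly into chunks")
    let chunkSize = values.count / chunkCount
    var sums = [Int](repeating: 0, count: chunkCount)

    sums.withUnsafeMutableBufferPointer { (buffer: inout UnsafeMutableBufferPointer<Int>) -> Void in
        let slots = buffer
        // concurrentPerform returns only after every iteration has run,
        // so `slots` is never used outside this closure.
        DispatchQueue.concurrentPerform(iterations: chunkCount) { i in
            let base = i * chunkSize
            // Each iteration writes only to its own slot.
            slots[i] = values[base ..< base + chunkSize].reduce(0, +)
        }
    }
    return sums
}

print(chunkSums(of: Array(1 ... 6), chunkCount: 3))  // [3, 7, 11]
```

Because the call blocks, no DispatchGroup or notify block is needed; the result can simply be returned.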
