What is the easiest way to parallelize a vectorized function in R?

Date: 2021-10-05 21:20:45

I have a very large list X and a vectorized function f. I want to calculate f(X), but this will take a long time if I do it with a single core. I have (access to) a 48-core server. What is the easiest way to parallelize the calculation of f(X)? The following is not the right answer:

library(foreach)
library(doMC)
registerDoMC()

foreach(x=X, .combine=c) %dopar% f(x)

The above code will indeed parallelize the calculation of f(X), but it will do so by applying f separately to every element of X. This ignores the vectorized nature of f and will probably make things slower as a result, not faster. Rather than applying f elementwise to X, I want to split X into reasonably-sized chunks and apply f to those.

So, should I just manually split X into 48 equal-sized sublists and then apply f to each in parallel, then manually put together the result? Or is there a package designed for this?

In case anyone is wondering, my specific use case is here.

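For concreteness, a minimal sketch of the manual approach I have in mind (this uses the base parallel package, assumes a Unix-alike machine, and assumes the chunked results can simply be concatenated back together):

library(parallel)

n.chunks <- 48
## Split X into n.chunks contiguous, roughly equal-sized sublists
chunks <- split(X, cut(seq_along(X), n.chunks, labels=FALSE))
## Apply f to each chunk on its own core, then stitch the pieces together
result <- do.call(c, mclapply(chunks, f, mc.cores=n.chunks))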

5 Answers

#1


2  

The itertools package was designed to address this kind of problem. In this case, I would use isplitVector:

library(itertools)
n <- getDoParWorkers()
foreach(x=isplitVector(X, chunks=n), .combine='c') %dopar% f(x)

For this example, pvec is undoubtedly faster and simpler, but unlike pvec, this approach can also be used on Windows, for example with the doParallel package.

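A minimal sketch of that Windows-friendly setup, assuming a socket cluster (the worker count is illustrative; X and f are the objects from the question):

library(foreach)
library(itertools)
library(doParallel)

cl <- makeCluster(4)  # illustrative worker count
registerDoParallel(cl)

## isplitVector hands each worker one contiguous chunk of X
result <- foreach(x=isplitVector(X, chunks=getDoParWorkers()),
                  .combine='c') %dopar% f(x)

stopCluster(cl)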

#2


5  

Although this is an older question, it might be interesting for anyone who stumbles upon it via Google (like me): have a look at the pvec function in the multicore package. I think it does exactly what you want.

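A minimal sketch of that call (note that multicore has since been folded into base R's parallel package, which ships the same pvec; the core count is illustrative, and X and f are the objects from the question):

library(parallel)

## pvec splits X into contiguous chunks, applies f to each chunk in a
## forked worker, and concatenates the results (Unix-alikes only)
result <- pvec(X, f, mc.cores=4)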

#3


4  

Here's my implementation. It's a function, chunkapply, that takes a vectorized function, a list of arguments that should be vectorized, and a list of arguments that should not be vectorized (i.e. constants), and returns the same result as calling the function on the arguments directly, except that the result is calculated in parallel. For a function f, vector arguments v1, v2, v3, and scalar arguments s1, s2, the following should all return identical results:

f(a=v1, b=v2, c=v3, d=s1, e=s2)
f(c=v3, b=v2, e=s2, a=v1, d=s1)
chunkapply(FUN=f, VECTOR.ARGS=list(a=v1, b=v2, c=v3), SCALAR.ARGS=list(d=s1, e=s2))
chunkapply(FUN=f, SCALAR.ARGS=list(e=s2, d=s1), VECTOR.ARGS=list(a=v1, c=v3, b=v2))

Since it is impossible for the chunkapply function to know which arguments of f are vectorized and which are not, it is up to you to specify this when you call it, or else you will get the wrong results. You should generally name your arguments to ensure that they get bound correctly.

library(foreach)
library(iterators)
# Use your favorite doPar backend here
library(doMC)
registerDoMC()

get.chunk.size <- function(vec.length,
                           min.chunk.size=NULL, max.chunk.size=NULL,
                           max.chunks=NULL) {
  if (is.null(max.chunks)) {
    max.chunks <- getDoParWorkers()
  }
  size <- vec.length / max.chunks
  if (!is.null(max.chunk.size)) {
    size <- min(size, max.chunk.size)
  }
  if (!is.null(min.chunk.size)) {
    size <- max(size, min.chunk.size)
  }
  num.chunks <- ceiling(vec.length / size)
  actual.size <- ceiling(vec.length / num.chunks)
  return(actual.size)
}
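## Quick sanity check (an assumed illustration, not part of the original
## code): 100 elements across at most 4 chunks gives a chunk size of 25.
## get.chunk.size(100, max.chunks=4)  # 25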

ichunk.vectors <- function(vectors=NULL,
                           min.chunk.size=NULL,
                           max.chunk.size=NULL,
                           max.chunks=NULL) {
  ## Calculate number of chunks
  recycle.length <- max(sapply(vectors, length))
  actual.chunk.size <- get.chunk.size(recycle.length, min.chunk.size, max.chunk.size, max.chunks)
  num.chunks <- ceiling(recycle.length / actual.chunk.size)

  ## Make the chunk iterator
  i <- 1
  it <- idiv(recycle.length, chunks=num.chunks)
  nextEl <- function() {
    n <- nextElem(it)
    ix <- seq(i, length = n)
    i <<- i + n
    vchunks <- foreach(v=vectors) %do% v[1+ (ix-1) %% length(v)]
    names(vchunks) <- names(vectors)
    vchunks
  }
  obj <- list(nextElem = nextEl)
  class(obj) <- c("ichunk", "abstractiter", "iter")
  obj
}
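## Illustrative behavior (an assumed example, not part of the original
## code): chunks are contiguous and shorter vectors are recycled.
## it <- ichunk.vectors(list(a=1:10), max.chunks=2)
## nextElem(it)$a  # 1 2 3 4 5
## nextElem(it)$a  # 6 7 8 9 10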

chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), MERGE=TRUE, ...) {
  ## Check that the arguments make sense
  stopifnot(is.list(VECTOR.ARGS))
  stopifnot(length(VECTOR.ARGS) >= 1)
  stopifnot(is.list(SCALAR.ARGS))
  ## Choose appropriate combine function
  if (MERGE) {
    combine.fun <- append
  } else {
    combine.fun <- foreach:::defcombine
  }
  ## Chunk and apply, and maybe merge
  foreach(vchunk=ichunk.vectors(vectors=VECTOR.ARGS, ...),
          .combine=combine.fun) %dopar%
  {
    do.call(FUN, args=append(vchunk, SCALAR.ARGS))
  }
}

## Only do chunkapply if it will run in parallel
maybe.chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), ...) {
  if (getDoParWorkers() > 1) {
    chunkapply(FUN, VECTOR.ARGS, SCALAR.ARGS, ...)
  } else {
    do.call(FUN, append(VECTOR.ARGS, SCALAR.ARGS))
  }
}

Here are some examples showing that chunkapply(f,list(x)) produces identical results to f(x). I have set the max.chunk.size extremely small to ensure that the chunking algorithm is actually used.

> # Generate all even integers from 2 to 100 inclusive
> identical(chunkapply(function(x,y) x*y, list(1:50), list(2), max.chunk.size=10), 1:50 * 2)
[1] TRUE

> ## Sample from a standard normal distribution, then discard values greater than 1
> a <- rnorm(n=100)
> cutoff <- 1
> identical(chunkapply(function(x,limit) x[x<=limit], list(x=a), list(limit=cutoff), max.chunk.size=10), a[a<=cutoff])
[1] TRUE

If anyone has a better name than "chunkapply", please suggest it.

Edit:

As another answer points out, there is a function called pvec in the multicore package that has very similar functionality to what I have written. For simple cases you should use that, and you should vote up Jonas Rauch's answer for it. However, my function is a bit more general, so if any of the following apply to you, you might want to consider using my function instead:

  • You need to use a parallel backend other than multicore (e.g. MPI). My function uses foreach, so you can use any parallelization framework that provides a backend for foreach.
  • You need to pass multiple vectorized arguments. pvec only vectorizes over a single argument, so you could not easily implement, for example, parallel vectorized addition with pvec. My function allows you to specify arbitrary arguments (see the sketch after this list).
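
For example, here is a minimal sketch of parallel vectorized addition with chunkapply, which pvec cannot express directly (this assumes the definitions above have been run and a foreach backend is registered):

x <- runif(1e6)
y <- runif(1e6)
## Both vector arguments are chunked in lockstep; there are no scalar args
result <- chunkapply(FUN=function(a, b) a + b, VECTOR.ARGS=list(a=x, b=y))
identical(result, x + y)  # TRUE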

#4


0  

Map-Reduce might be what you're looking for; it has been ported to R.

#5


0  

How about something like this? R will take advantage of all the available memory, and multicore will parallelize the work over all available cores.

library(multicore)
## mclapply forks one task per element of X; note that this applies f
## elementwise rather than in chunks
result <- mclapply(X, f, mc.preschedule=FALSE, mc.set.seed=FALSE)
