Replacement for parallel plyr with doMC

Time: 2022-12-19 09:51:44

Consider a standard grouped operation on a data.frame:

library(plyr)
library(doMC)
library(MASS) # for example

nc <- 12         # number of worker processes
registerDoMC(nc) # register the multicore backend used by .parallel = TRUE

d <- data.frame(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"

res <- ddply(d, .(g), function(d_group) {
   # slow, complicated operations on d_group
}, .parallel = FALSE)

It's trivial to take advantage of a multi-core setup by simply writing .parallel = TRUE instead. This is one of my favorite features of plyr.
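
For concreteness, the parallel version differs only in that final argument (a sketch reusing the objects defined above, not code from the original question):

# identical grouped operation, now spread across the cores registered
# with registerDoMC() above
res_par <- ddply(d, .(g), function(d_group) {
   # slow, complicated operations on d_group
}, .parallel = TRUE)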

But with plyr being deprecated (I think) and essentially replaced by dplyr, purrr, etc., the solution to parallel processing has become significantly more verbose:

library(dplyr)
library(multidplyr)
library(parallel)
library(MASS) # for example

nc <- 12

d <- tibble(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"

cl <- create_cluster(nc)
set_default_cluster(cl)
cluster_library(cl, packages = c("MASS")) # load packages on every node
cluster_copy(cl, obj = y)                 # copy objects to every node

d_parts <- d %>% partition(g, cluster = cl)
res <- d_parts %>%
  # slow, complicated operations on each partition would go here
  collect() %>%
  ungroup()

rm(d_parts)
rm(cl)

You can imagine how long this example could get, considering that every package and object needed inside the loop requires its own cluster_* command to copy it onto the nodes. The non-parallelized plyr-to-dplyr translation is just a simple dplyr::group_by construction (a sketch follows the questions below), and it's unfortunate that there's no terse way to enable parallel processing on it. So, my questions are:

  • Is this actually the preferred way to translate my code from plyr to dplyr?
  • What sort of magic is happening behind the scenes in plyr that makes it so easy to turn on parallel processing? Is there a reason this capability would be particularly difficult to add to dplyr and that's why it doesn't exist yet?
  • Are my two examples fundamentally different in terms of how the code is executed?
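
For reference, the serial translation mentioned above is a minimal sketch along these lines (not from the original question; dplyr::do() stands in for the grouped computation):

library(dplyr)

# serial dplyr equivalent of the ddply() call above; the body of do()
# stands in for the slow, complicated per-group work
res <- d %>%
  group_by(g) %>%
  do({
    d_group <- .
    # slow, complicated operations on d_group
    d_group
  }) %>%
  ungroup()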

1 Answer

#1 (3 votes)

  1. I don't think there is one true 'preferred' way to translate {plyr} code to {dplyr}.

  2. In the comments @Aurèle did a better job than I ever could in describing the connection between {plyr} and {doMC}. One thing that happened is that the incentives changed a bit. {doMC} is from Revolution Analytics (since purchased by Microsoft). But Hadley, who developed dplyr, currently works at RStudio. These two companies compete in the IDE space. So, it is perhaps natural that their packages aren't designed to play well together. The only form of parallelism I've seen strong support for coming out of RStudio is {sparklyr}, which they've made relatively 'easy' to set up. But, I can't really recommend futzing with Spark to do parallel processing for a single machine.

  3. @Aurèle again did a good job of explaining the execution differences. Your new code uses a PSOCK cluster and the old code used forks. Forks use a copy-on-write mode for accessing RAM, so parallel processes can start off with access to the same data immediately post fork. PSOCK clusters are like spawning new copies of R: they have to load libraries and receive an explicit copy of the data. (A minimal sketch contrasting the two setups follows this list.)
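
For illustration only (not part of the original answer), a minimal sketch of the two cluster types using base R's parallel package:

library(parallel)

# Fork cluster (Unix/macOS only): workers are forked from the current
# session and see its packages and objects via copy-on-write memory.
cl_fork <- makeForkCluster(2)

# PSOCK cluster: fresh R sessions that start empty, so packages and
# objects must be shipped over explicitly.
cl_psock <- makePSOCKcluster(2)
clusterEvalQ(cl_psock, library(MASS))
clusterExport(cl_psock, "y") # y must exist in the calling environment

stopCluster(cl_fork)
stopCluster(cl_psock)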

You can use a pattern like...

library(dplyr)
library(purrr)
library(future)

plan(multicore)                      # forked workers; falls back to sequential on Windows
options(mc.cores = availableCores()) # allow as many parallel futures as there are cores

d <- data.frame(x = 1:8, g = c("group1", "group2", "group3", "group4"))
y <- "some global object"

# one future per group; value() collects each result once it is ready
split(d, d$g) %>%
  map(~ future({ Sys.sleep(5); mean(.x$x) })) %>%
  map_df(~ value(.x))

... with some finesse on the map_df step to do some parallel processing. Note that under {purrr} the ~ is anonymous-function syntax, where .x refers to the value that has been mapped in.
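
As a minimal illustration of that syntax (not part of the original answer):

library(purrr)

# these two calls are equivalent: ~ builds an anonymous function whose
# argument is available as .x
map(1:3, ~ .x + 1)
map(1:3, function(x) x + 1)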

If you like to live dangerously, you might be able to create a version of something similar without using {future} by using a private method in {purrr}:

library(parallel) # for mclapply()
library(purrr)    # for as_mapper() and map(); also re-exports %>%

mcmap <- function(.x, .f, ...) {
  .f <- as_mapper(.f, ...)
  mclapply(.x, function(.x) {
    force(.f)
    # call purrr's internal C routine directly; this depends on purrr
    # internals and may break in other versions of the package
    .Call(purrr:::map_impl, environment(), ".x", ".f", "list")
  }) %>%
    map(~ .x[[1]]) # each worker returns a length-1 list; unwrap it
}
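
A hypothetical usage example (assuming a purrr version in which the internal map_impl still exists):

# square each element in a separate forked process;
# expected result: list(1, 4, 9, 16)
mcmap(1:4, ~ .x^2)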
