共享内存与R并行foreach

时间:2022-07-22 14:57:21

Problem Description:

问题描述:

I have a big matrix c, loaded in RAM memory. My goal is through parallel processing to have read only access to it. However when I create the connections either I use doSNOW, doMPI, big.matrix, etc the amount to ram used increases dramatically.

我有一个大矩阵c,加载到RAM内存中。我的目标是通过并行处理来只读访问它。然而,当我创建连接时,我使用doSNOW,doMPI,big.matrix等,使用ram的数量急剧增加。

Is there a way to properly create a shared memory, where all the processes may read from, without creating a local copy of all the data?

有没有办法正确创建共享内存,所有进程可以读取,而不创建所有数据的本地副本?

Example:

例:

libs<-function(libraries){# Installs missing libraries and then load them
  for (lib in libraries){
    if( !is.element(lib, .packages(all.available = TRUE)) ) {
      install.packages(lib)
    }
    library(lib,character.only = TRUE)
  }
}

libra<-list("foreach","parallel","doSNOW","bigmemory")
libs(libra)

#create a matrix of size 1GB aproximatelly
c<-matrix(runif(10000^2),10000,10000)
#convert it to bigmatrix
x<-as.big.matrix(c)
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections    
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
out<-foreach(linID = 1:10, .combine=c) %dopar% {
  #load bigmemory
  require(bigmemory)
  # attach the matrix via shared memory??
  m <- attach.big.matrix(mdesc)
  #dummy expression to test data aquisition
  c<-m[1,1]
}
closeAllConnections()

RAM: 共享内存与R并行foreach in the image above, you may find that the memory increases a lot until foreach ends and it is freed.

RAM:在上面的图像中,您可能会发现内存增加很多,直到foreach结束并且它被释放。

2 个解决方案

#1


12  

I think the solution to the problem can be seen from the post of Steve Weston, the author of the foreach package, here. There he states:

我认为这个问题的解决方案可以从foreach包的作者Steve Weston的帖子中看到。在那里他说:

The doParallel package will auto-export variables to the workers that are referenced in the foreach loop.

doParallel包将自动导出变量到foreach循环中引用的worker。

So I think the problem is that in your code your big matrix c is referenced in the assignment c<-m[1,1]. Just try xyz <- m[1,1] instead and see what happens.

所以我认为问题是在你的代码中你的大矩阵c在赋值c <-m [1,1]中被引用。只需尝试xyz < - m [1,1],看看会发生什么。

Here is an example with a file-backed big.matrix:

以下是文件支持的big.matrix的示例:

#create a matrix of size 1GB aproximatelly
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to bigmatrix
x <- as.big.matrix(x = c, type = "double", 
                 separated = FALSE, 
                 backingfile = "example.bin", 
                 descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections    
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
## 1) No referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) {
    for (j in seq_len(m)) {
      y <- t[i,j]
    }
  }
  return(0L)
}

共享内存与R并行foreach

## 2) Referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
  invisible(c) ## c is referenced and thus exported to workers
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) {
    for (j in seq_len(m)) {
      y <- t[i,j]
    }
  }
  return(0L)
}
closeAllConnections()

共享内存与R并行foreach

#2


3  

Alternatively, if you are on Linux/Mac and you want a CoW shared memory, use forks. First load all your data into the main thread, and then launch working threads (forks) with general function mcparallel from the parallel package.

或者,如果您使用的是Linux / Mac并且需要CoW共享内存,请使用分叉。首先将所有数据加载到主线程中,然后从并行包启动具有通用函数mcparallel的工作线程(forks)。

You can collect their results with mccollect or with the use of truly shared memory using the Rdsm library, like this:

您可以使用mccollect或使用Rdsm库使用真正的共享内存来收集结果,如下所示:

library(parallel)
library(bigmemory) #for shared variables
shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double')
shared[1]<-1 #Init shared memory with some number

job<-mcparallel({shared[1]<-23}) #...change it in another forked thread
shared[1,1] #...and confirm that it gets changed
# [1] 23

You can confirm, that the value really gets updated in backgruound, if you delay the write:

您可以确认,如果您延迟写入,该值实际上会在backgruound中更新:

fn<-function()
{
  Sys.sleep(1) #One second delay
  shared[1]<-11
}

job<-mcparallel(fn())
shared[1] #Execute immediately after last command
# [1] 23
aaa[1,1] #Execute after one second
# [1] 11
mccollect() #To destroy all forked processes (and possibly collect their output)

To control for concurency and avoid race conditions use locks:

要控制确定性并避免竞争条件,请使用锁定:

library(synchronicity) #for locks
m<-boost.mutex() #Lets create a mutex "m"

bad.incr<-function() #This function doesn't protect the shared resource with locks:
{
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
}

good.incr<-function()
{
  lock(m)
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
  unlock(m)
}

shared[1]<-1
for (i in 1:5) job<-mcparallel(bad.incr())
shared[1] #You can verify, that the value didn't get increased 5 times due to race conditions

mccollect() #To clear all threads, not to get the values
shared[1]<-1
for (i in 1:5) job<-mcparallel(good.incr())
shared[1] #As expected, eventualy after 5 seconds of waiting you get the 6
#[1] 6 

mccollect()

Edit:

编辑:

I simplified dependencies a bit by exchanging Rdsm::mgrmakevar into bigmemory::big.matrix. mgrmakevar internally calls big.matrix anyway, and we don't need anything more.

我通过将Rdsm :: mgrmakevar交换成bigmemory :: big.matrix来简化了依赖关系。无论如何,mgrmakevar内部调用big.matrix,我们不需要任何其他内容。

#1


12  

I think the solution to the problem can be seen from the post of Steve Weston, the author of the foreach package, here. There he states:

我认为这个问题的解决方案可以从foreach包的作者Steve Weston的帖子中看到。在那里他说:

The doParallel package will auto-export variables to the workers that are referenced in the foreach loop.

doParallel包将自动导出变量到foreach循环中引用的worker。

So I think the problem is that in your code your big matrix c is referenced in the assignment c<-m[1,1]. Just try xyz <- m[1,1] instead and see what happens.

所以我认为问题是在你的代码中你的大矩阵c在赋值c <-m [1,1]中被引用。只需尝试xyz < - m [1,1],看看会发生什么。

Here is an example with a file-backed big.matrix:

以下是文件支持的big.matrix的示例:

#create a matrix of size 1GB aproximatelly
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to bigmatrix
x <- as.big.matrix(x = c, type = "double", 
                 separated = FALSE, 
                 backingfile = "example.bin", 
                 descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections    
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
## 1) No referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) {
    for (j in seq_len(m)) {
      y <- t[i,j]
    }
  }
  return(0L)
}

共享内存与R并行foreach

## 2) Referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
  invisible(c) ## c is referenced and thus exported to workers
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) {
    for (j in seq_len(m)) {
      y <- t[i,j]
    }
  }
  return(0L)
}
closeAllConnections()

共享内存与R并行foreach

#2


3  

Alternatively, if you are on Linux/Mac and you want a CoW shared memory, use forks. First load all your data into the main thread, and then launch working threads (forks) with general function mcparallel from the parallel package.

或者,如果您使用的是Linux / Mac并且需要CoW共享内存,请使用分叉。首先将所有数据加载到主线程中,然后从并行包启动具有通用函数mcparallel的工作线程(forks)。

You can collect their results with mccollect or with the use of truly shared memory using the Rdsm library, like this:

您可以使用mccollect或使用Rdsm库使用真正的共享内存来收集结果,如下所示:

library(parallel)
library(bigmemory) #for shared variables
shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double')
shared[1]<-1 #Init shared memory with some number

job<-mcparallel({shared[1]<-23}) #...change it in another forked thread
shared[1,1] #...and confirm that it gets changed
# [1] 23

You can confirm, that the value really gets updated in backgruound, if you delay the write:

您可以确认,如果您延迟写入,该值实际上会在backgruound中更新:

fn<-function()
{
  Sys.sleep(1) #One second delay
  shared[1]<-11
}

job<-mcparallel(fn())
shared[1] #Execute immediately after last command
# [1] 23
aaa[1,1] #Execute after one second
# [1] 11
mccollect() #To destroy all forked processes (and possibly collect their output)

To control for concurency and avoid race conditions use locks:

要控制确定性并避免竞争条件,请使用锁定:

library(synchronicity) #for locks
m<-boost.mutex() #Lets create a mutex "m"

bad.incr<-function() #This function doesn't protect the shared resource with locks:
{
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
}

good.incr<-function()
{
  lock(m)
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
  unlock(m)
}

shared[1]<-1
for (i in 1:5) job<-mcparallel(bad.incr())
shared[1] #You can verify, that the value didn't get increased 5 times due to race conditions

mccollect() #To clear all threads, not to get the values
shared[1]<-1
for (i in 1:5) job<-mcparallel(good.incr())
shared[1] #As expected, eventualy after 5 seconds of waiting you get the 6
#[1] 6 

mccollect()

Edit:

编辑:

I simplified dependencies a bit by exchanging Rdsm::mgrmakevar into bigmemory::big.matrix. mgrmakevar internally calls big.matrix anyway, and we don't need anything more.

我通过将Rdsm :: mgrmakevar交换成bigmemory :: big.matrix来简化了依赖关系。无论如何,mgrmakevar内部调用big.matrix,我们不需要任何其他内容。