How can I parallelize simulations in R?

Asked: 2022-06-15 14:56:33

Could anybody help me parallelize this code somehow? I am about to run some simulations, but I am stuck... it takes too long. I even left my computer running for 3 days and it did not finish calculating.


sapply(1:1000, Take_expected_value, points =10^7, number_of_trajectories = 10^7)
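For a sense of scale (my own back-of-envelope, not from the post): each call simulates `number_of_trajectories = 10^7` paths of `points = 10^7` steps, and `sapply` repeats that for 1000 values of `interval_end`, so the total number of normal draws is on the order of 10^17:

```r
# Rough operation count for the sapply call above (an estimate, not timing data)
draws_per_trajectory <- 1e7 + 1               # rnorm(points + 1) per trajectory
draws_per_call <- draws_per_trajectory * 1e7  # times number_of_trajectories
total_draws <- 1000 * draws_per_call          # times 1000 values of interval_end
total_draws                                   # ~1e17 draws
```

No amount of parallelism on a desktop makes 10^17 draws feasible in days; shrinking `points` and `number_of_trajectories` is part of any fix.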


Take_expected_value <- function(interval_end = 1, points = 100, number_of_trajectories = 1000){
   return(
      mean(
         exp(
            replicate(
               n = number_of_trajectories,
               expr = Max_from_Wiener_on_interval(interval_end, points)
               )
         )
      )
   ) # This function just replicates max_from_... function, then put values
      # to exp function, and calculates mean of all replications.

}



Max_from_Wiener_on_interval <- function(interval_end = 1, points = 100){

   # time increment
   Delta <- interval_end/points

   # time moments
   time <- seq( 0, interval_end, length = points + 1)

   # Wiener process
   W <-  cumsum( sqrt(Delta) * rnorm( points + 1 ) )
   # return max of "Wiener * sqrt(2) - time moment"
   return(
      max(sqrt(2) * W - time)
      )
}
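Before reaching for parallelism, the inner loop can also be vectorized: `replicate()` calls `Max_from_Wiener_on_interval` once per trajectory, but all trajectories fit as columns of one matrix. A sketch of that idea (`Max_from_Wiener_matrix` and `Take_expected_value_vec` are my own names; the construction mirrors the functions above):

```r
# Simulate n trajectories at once as columns of a (points + 1) x n matrix.
Max_from_Wiener_matrix <- function(interval_end = 1, points = 100, n = 1000) {
  Delta <- interval_end / points
  time <- seq(0, interval_end, length.out = points + 1)
  increments <- matrix(sqrt(Delta) * rnorm((points + 1) * n), points + 1, n)
  W <- apply(increments, 2, cumsum)  # column-wise cumulative sums
  apply(sqrt(2) * W - time, 2, max)  # per-trajectory max of sqrt(2)*W(t) - t
}

Take_expected_value_vec <- function(interval_end = 1, points = 100, n = 1000) {
  mean(exp(Max_from_Wiener_matrix(interval_end, points, n)))
}
```

The `time` vector recycles down each column of the matrix, so `sqrt(2) * W - time` subtracts the right time moment from every trajectory at once.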

EDIT


After the EDIT I am using this code, but the problem might be my weak machine (computer). It is still very slow for me:


Take_expected_value <- function(interval_end = 1, points = 100, number_of_trajectories = 1000){
   return(
      mean(
         exp(
            replicate(
               n = number_of_trajectories,
               expr = Max_from_Wiener_on_interval(interval_end, points)
            )
         )
      )
   ) # This function just replicates max_from_... function, then put values
   # to exp function, and calculates mean of all replications.

}


# this function shall not be exported
Max_from_Wiener_on_interval <- function(interval_end = 1, points = 100){

   # time increment
   Delta <- interval_end/points

   # time moments
   time <- seq( 0, interval_end, length = points + 1)

   # Wiener process
   W <-  cumsum( sqrt(Delta) * rnorm( points + 1 ) )
   # return max of "Wiener * sqrt(2) - time moment"
   return(
      max(sqrt(2) * W - time)
   )
}


install.packages("snowfall")  # run once, not on every execution
require(snowfall)
cpucores <- as.integer(Sys.getenv("NUMBER_OF_PROCESSORS"))  # Windows-specific env var
sfInit(parallel = TRUE, cpus = cpucores)
sfExportAll() 
system.time(
   sfLapply(as.list(c(1:1000)),fun=Take_expected_value, points =10^6, number_of_trajectories = 10^6)
)
sfRemoveAll()
sfStop()
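An alternative to snowfall is the base `parallel` package, which ships with R (no extra install). The sketch below copies the two functions from the post so it stands alone, and uses `clusterSetRNGStream` to give each worker an independent, reproducible random stream, something the snowfall snippet above does not set up. The sizes are shrunk here so a dry run finishes quickly:

```r
library(parallel)

# Functions copied from the post so this sketch is self-contained
Max_from_Wiener_on_interval <- function(interval_end = 1, points = 100) {
  Delta <- interval_end / points
  time <- seq(0, interval_end, length = points + 1)
  W <- cumsum(sqrt(Delta) * rnorm(points + 1))
  max(sqrt(2) * W - time)
}
Take_expected_value <- function(interval_end = 1, points = 100,
                                number_of_trajectories = 1000) {
  mean(exp(replicate(number_of_trajectories,
                     Max_from_Wiener_on_interval(interval_end, points))))
}

cl <- makeCluster(2L)                 # or max(1L, detectCores() - 1L)
clusterSetRNGStream(cl, iseed = 123)  # reproducible parallel RNG streams
clusterExport(cl, c("Take_expected_value", "Max_from_Wiener_on_interval"))

# interval_end varies over 1:1000 in the original; reduced sizes for a dry run
res <- parSapply(cl, 1:8, Take_expected_value,
                 points = 100, number_of_trajectories = 100)
stopCluster(cl)
```

`parSapply` splits the first vector across the workers, so each worker evaluates `Take_expected_value` for a subset of the `interval_end` values.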

1 Answer

#1



I tend to use snowfall, but there are many other ways to parallelize a function. Here is a generic script with a junk function meant to take a while to compute:


Iter_vals=as.list(c(1:16)) # the values to iterate the function with

fx_parallel_run=function(Iter_val, multiplier){ #junk function with 2 arguments 
  jnk=round(runif(1)*multiplier)
  jnk1=runif(jnk)
  for (i in 1:length(jnk1)){
    jnk1[i]=(jnk1[i]*runif(1))+Iter_val[[1]]
  }
  return(jnk1) 
}
require(snowfall)
cpucores=as.integer(Sys.getenv('NUMBER_OF_PROCESSORS')) #by default snowfall will use the total number of processors, so this is not necessary. 
#However, if you are using the machine for other purposes, 
#you can adapt this line to leave at least a core or two free 
#so the computer is still functional for multi tasking.
sfInit(parallel = TRUE, cpus = cpucores)
sfExportAll() 
results=sfLapply(Iter_vals,fun=fx_parallel_run, multiplier=800) #extra function arguments go after the first two sfLapply arguments
sfRemoveAll()
sfStop()

In your case, after defining the functions, I would simply use:


require(snowfall)
sfInit(parallel = TRUE, cpus = cpucores)
sfExportAll() 
results=sfLapply(as.list(c(1:1000)),fun=Take_expected_value, points =10^7, number_of_trajectories = 10^7)
sfRemoveAll()
sfStop()

This may need a little tweaking, but I am not going to do all of the work for you. Note that parallelism alone will not rescue you here: with points = 10^7 and number_of_trajectories = 10^7, every single call is enormous, so you will likely need to reduce those sizes as well.

