Could anybody help me parallelize this code? I am about to run some simulations, but I am stuck... it takes too long. I even left my computer running for 3 days and it still had not finished.
sapply(1:1000, Take_expected_value, points =10^7, number_of_trajectories = 10^7)
Take_expected_value <- function(interval_end = 1, points = 100, number_of_trajectories = 1000) {
  # Replicates Max_from_Wiener_on_interval, exponentiates the values,
  # and returns the mean over all replications.
  return(
    mean(
      exp(
        replicate(
          n = number_of_trajectories,
          expr = Max_from_Wiener_on_interval(interval_end, points)
        )
      )
    )
  )
}
Max_from_Wiener_on_interval <- function(interval_end = 1, points = 100) {
  # time increment
  Delta <- interval_end / points
  # time moments
  time <- seq(0, interval_end, length.out = points + 1)
  # Wiener process
  W <- cumsum(sqrt(Delta) * rnorm(points + 1))
  # return max of "Wiener * sqrt(2) - time moment"
  return(
    max(sqrt(2) * W - time)
  )
}
EDIT
After the EDIT I am using the code below, though the problem may just be my weak machine (computer). It is still very slow for me:
Take_expected_value <- function(interval_end = 1, points = 100, number_of_trajectories = 1000) {
  # Replicates Max_from_Wiener_on_interval, exponentiates the values,
  # and returns the mean over all replications.
  return(
    mean(
      exp(
        replicate(
          n = number_of_trajectories,
          expr = Max_from_Wiener_on_interval(interval_end, points)
        )
      )
    )
  )
}
# This function should not be exported
Max_from_Wiener_on_interval <- function(interval_end = 1, points = 100) {
  # time increment
  Delta <- interval_end / points
  # time moments
  time <- seq(0, interval_end, length.out = points + 1)
  # Wiener process
  W <- cumsum(sqrt(Delta) * rnorm(points + 1))
  # return max of "Wiener * sqrt(2) - time moment"
  return(
    max(sqrt(2) * W - time)
  )
}
install.packages("snowfall")  # run once
require(snowfall)
cpucores <- as.integer(Sys.getenv("NUMBER_OF_PROCESSORS"))
sfInit(parallel = TRUE, cpus = cpucores)
sfExportAll()
system.time(
  sfLapply(as.list(1:1000), fun = Take_expected_value,
           points = 10^6, number_of_trajectories = 10^6)
)
sfRemoveAll()
sfStop()
1 Answer
#1
I tend to use snowfall, but there are many other ways to parallelize a function. Here is a generic script with a junk function meant to take a while to compute:
Iter_vals <- as.list(1:16)  # the values to iterate the function over
fx_parallel_run <- function(Iter_val, multiplier) {  # junk function with 2 arguments
  jnk  <- round(runif(1) * multiplier)
  jnk1 <- runif(jnk)
  for (i in seq_along(jnk1)) {
    jnk1[i] <- (jnk1[i] * runif(1)) + Iter_val[[1]]
  }
  return(jnk1)
}
require(snowfall)
# By default snowfall uses the total number of processors, so this line is
# not strictly necessary. However, if you are using the machine for other
# purposes, you can adapt it to leave a core or two free so the computer
# remains responsive for multitasking.
cpucores <- as.integer(Sys.getenv("NUMBER_OF_PROCESSORS"))
sfInit(parallel = TRUE, cpus = cpucores)
sfExportAll()
# Extra function arguments go after the first two sfLapply arguments
results <- sfLapply(Iter_vals, fun = fx_parallel_run, multiplier = 800)
sfRemoveAll()
sfStop()
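The same pattern also works with the base `parallel` package, which ships with R, so nothing extra needs to be installed. A minimal sketch, using a hypothetical `slow_square` stand-in for the expensive function (the names here are illustrative, not from the code above):

```r
library(parallel)

# Hypothetical stand-in for the expensive function being parallelized
slow_square <- function(x) x^2

cl <- makeCluster(max(1L, detectCores() - 1L))  # leave one core free
clusterExport(cl, varlist = "slow_square")      # analogous to sfExportAll()
results <- parLapply(cl, as.list(1:8), slow_square)
stopCluster(cl)
```

As with sfLapply, extra arguments to the worker function go after the function argument in parLapply.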
In your case, after defining the functions, I would simply use:
require(snowfall)
sfInit(parallel = TRUE, cpus = cpucores)
sfExportAll()
results <- sfLapply(as.list(1:1000), fun = Take_expected_value,
                    points = 10^7, number_of_trajectories = 10^7)
sfRemoveAll()
sfStop()
This may need a little tweaking, but I am not going to do all of the work for you.
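Independent of parallelization, a large speed-up may come from vectorizing the per-trajectory loop: each trajectory is just a cumulative sum of normal increments, so all trajectories can be simulated at once as a matrix. A sketch below, assuming memory allows a (points + 1) x n_traj matrix; the function name `Take_expected_value_vec` is made up here, and the behavior mirrors the original code:

```r
# Vectorized version: simulate all trajectories in one matrix instead of
# calling replicate(). Memory use is roughly 8 * (points + 1) * n_traj bytes,
# so keep the sizes modest or process trajectories in chunks.
Take_expected_value_vec <- function(interval_end = 1, points = 100, n_traj = 1000) {
  Delta <- interval_end / points
  time  <- seq(0, interval_end, length.out = points + 1)
  # One column per trajectory: cumulative sums of sqrt(Delta) * N(0,1) increments
  W <- apply(matrix(sqrt(Delta) * rnorm((points + 1) * n_traj),
                    nrow = points + 1), 2, cumsum)
  # Column-wise max of sqrt(2) * W - t, then mean of exp(...) over trajectories
  mean(exp(apply(sqrt(2) * W - time, 2, max)))
}
```

This removes the replicate() overhead entirely; chunks of trajectories can then still be distributed across cores with sfLapply if a single matrix does not fit in memory.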