R的并行计算
一般简单的例子都可以在百度上找到了,稍微深入点的就要多尝试下了:
这里用a,b两个向量做测试,先定义两个向量,计算b向量是不是在a向量里,是的话就是1,否则就是0。主要是测试编写函数和传递参数。
# 单线程
a <- 1:5000
b <- round(runif(50000, 100, 10000),0)
func2 <- function(i){
num <- b[i]
if(num %in% a){
return(1)
}
}
results <- NULL
system.time({
for(i in 1:length(b)){
results <- c(results,func2(i))
}
})
用户 系统 流逝
9.11 0.00 9.11
并行计算的会稍微不一样一点,需要初始化核心集群,然后把变量或者包传过去。
# 并行版本
func3 <- function(i){
num <- b[i]
if(num %in% a){
return(c(num, 1))
}else{
return(c(num, 0))
}
}
results2 <- NULL
system.time({
x <- 1:1000
cl <- makeCluster(4) # 初始化四核心集群
# 对每个核初始化数据,否则会提示找不到对象
# 通常用法:clusterEvalQ包,例如clusterEvalQ(cl, sqldf)
# clusterExport变量,例如clusterExport(cl, "a")
# 严格来说clusterEvalQ可以语句块,如下所示,详见帮助文档
# 后面会附标准的用法
clusterEvalQ(cl, {
a <- 1:5000
set.seed(100)
b <- round(runif(50000, 100, 10000),0)
})
results2 <- parLapply(cl,x,func3) # lapply的并行版本
<- do.call('rbind',results2) # 整合结果
stopCluster(cl) # 关闭集群
})
# 还得重新定义下变量
a <- 1:5000
set.seed(100)
b <- round(runif(50000, 100, 10000),0)
finalResult <- (matrix(unlist(results2), ncol=2, byrow=T))
# 检查是否算错
finalResult <- transform(finalResult,
Correct = ifelse(finalResult$X1 %in% a, 1, 0))
errReusult <- finalResult[finalResult$Result != finalResult$Correct, ]
nrow(errReusult) #没有错误的行
用户 系统 流逝
0.00 0.02 1.06
用clusterExport传递变量
a <- 1:5000
set.seed(100)
b <- round(runif(50000, 100, 10000),0)
results2 <- NULL
system.time({
x <- 1:1000
cl <- makeCluster(4)
clusterExport(cl, "a")
clusterExport(cl, "b")
results2 <- parLapply(cl,x,func3)
<- do.call('c',results2)
stopCluster(cl)
})
用户 系统 流逝
0.00 0.04 1.07
并行版本明显还是快很多哈。