R语言使用 multicore 包进行并行计算

时间:2023-03-08 19:49:04

  R语言是单线程的,如果数据量比较大的情况下最好用并行计算来处理数据,这样会获得运行速度倍数的提升。这里介绍一个基于Unix系统的并行程序包:multicore.

  我们用三种不同的方式来进行一个简单的数据处理:

  我们从 1000 genome project 数据库下载了VCF文件,现在需要手动提取出每个allele的 allele frequency(AF)值(vcftools 可以很好的解决这个问题,但是假设我的vcf文件没有genotype, 或者我要实现一些个性化功能,那么可能要手动解决)。我们仅仅提取VCF文件的前1,000,000行作测试。看一下文件前几行:

##fileformat=VCFv4.
##INFO=<ID=LDAF,Number=,Type=Float,Description="MLE Allele Frequency Accounting for LD">
##INFO=<ID=AVGPOST,Number=,Type=Float,Description="Average posterior probability from MaCH/Thunder">
##INFO=<ID=RSQ,Number=,Type=Float,Description="Genotype imputation quality from MaCH/Thunder">
##INFO=<ID=ERATE,Number=,Type=Float,Description="Per-marker Mutation rate from MaCH/Thunder">
##INFO=<ID=THETA,Number=,Type=Float,Description="Per-marker Transition rate from MaCH/Thunder">
##INFO=<ID=CIEND,Number=,Type=Integer,Description="Confidence interval around END for imprecise variants">
##INFO=<ID=CIPOS,Number=,Type=Integer,Description="Confidence interval around POS for imprecise variants">
##INFO=<ID=END,Number=,Type=Integer,Description="End position of the variant described in this record">
##INFO=<ID=HOMLEN,Number=.,Type=Integer,Description="Length of base pair identical micro-homology at event breakpoints">
##INFO=<ID=HOMSEQ,Number=.,Type=String,Description="Sequence of base pair identical micro-homology at event breakpoints">
##INFO=<ID=SVLEN,Number=,Type=Integer,Description="Difference in length between REF and ALT alleles">
##INFO=<ID=SVTYPE,Number=,Type=String,Description="Type of structural variant">
##INFO=<ID=AC,Number=.,Type=Integer,Description="Alternate Allele Count">
##INFO=<ID=AN,Number=,Type=Integer,Description="Total Allele Count">
##ALT=<ID=DEL,Description="Deletion">
##FORMAT=<ID=GT,Number=,Type=String,Description="Genotype">
##FORMAT=<ID=DS,Number=,Type=Float,Description="Genotype dosage from MaCH/Thunder">
##FORMAT=<ID=GL,Number=.,Type=Float,Description="Genotype Likelihoods">
##INFO=<ID=AA,Number=,Type=String,Description="Ancestral Allele, ftp://ftp.1000genomes.ebi.ac.uk/vol1/ftp/pilot_data/technical/reference/ancestral_alignments/README">
##INFO=<ID=AF,Number=,Type=Float,Description="Global Allele Frequency based on AC/AN">
##INFO=<ID=AMR_AF,Number=,Type=Float,Description="Allele Frequency for samples from AMR based on AC/AN">
##INFO=<ID=ASN_AF,Number=,Type=Float,Description="Allele Frequency for samples from ASN based on AC/AN">
##INFO=<ID=AFR_AF,Number=,Type=Float,Description="Allele Frequency for samples from AFR based on AC/AN">
##INFO=<ID=EUR_AF,Number=,Type=Float,Description="Allele Frequency for samples from EUR based on AC/AN">
##INFO=<ID=VT,Number=,Type=String,Description="indicates what type of variant the line represents">
##INFO=<ID=SNPSOURCE,Number=.,Type=String,Description="indicates if a snp was called when analysing the low coverage or exome alignment data">
##reference=GRCh37
#CHROM POS ID REF ALT QUAL FILTER INFO
rs58108140 G A PASS AVGPOST=0.7707;RSQ=0.4319;LDAF=0.2327;ERATE=0.0161;AN=;VT=SNP;AA=.;THETA=0.0046;AC=;SNPSOURCE=LOWCOV;AF=0.14;ASN_AF=0.13;AMR_AF=0.17;AFR_AF=0.04;EUR_AF=0.21
rs189107123 C G PASS AN=;THETA=0.0077;VT=SNP;AA=.;AC=;ERATE=0.0048;SNPSOURCE=LOWCOV;AVGPOST=0.9330;LDAF=0.0479;RSQ=0.3475;AF=0.02;ASN_AF=0.01;AMR_AF=0.03;AFR_AF=0.01;EUR_AF=0.02
rs180734498 C T PASS THETA=0.0048;AN=;AC=;VT=SNP;AA=.;RSQ=0.6281;LDAF=0.1573;SNPSOURCE=LOWCOV;AVGPOST=0.8895;ERATE=0.0058;AF=0.11;ASN_AF=0.02;AMR_AF=0.08;AFR_AF=0.21;EUR_AF=0.14
rs144762171 G C PASS AVGPOST=0.9698;AN=;VT=SNP;AA=.;RSQ=0.6482;AC=;SNPSOURCE=LOWCOV;ERATE=0.0012;LDAF=0.0359;THETA=0.0204;AF=0.03;ASN_AF=0.02;AMR_AF=0.03;AFR_AF=0.02;EUR_AF=0.04
rs201747181 TC T PASS AA=TC;AC=;AF=0.02;AFR_AF=0.02;AMR_AF=0.02;AN=;ASN_AF=0.01;AVGPOST=0.8711;ERATE=0.0065;EUR_AF=0.02;LDAF=0.0788;RSQ=0.2501;THETA=0.0100;VT=INDEL
rs151276478 T C PASS AN=;AC=;ERATE=0.0034;THETA=0.0139;RSQ=0.3603;LDAF=0.0525;VT=SNP;AA=.;AVGPOST=0.9221;SNPSOURCE=LOWCOV;AF=0.02;ASN_AF=0.02;AMR_AF=0.02;AFR_AF=0.01;EUR_AF=0.02
rs140337953 G T PASS AC=;AA=T;AN=;RSQ=0.5481;VT=SNP;THETA=0.0162;SNPSOURCE=LOWCOV;ERATE=0.0183;LDAF=0.6576;AVGPOST=0.7335;AF=0.73;ASN_AF=0.89;AMR_AF=0.80;AFR_AF=0.48;EUR_AF=0.73
rs199681827 C CTGT PASS AA=.;AC=;AF=0.0037;AFR_AF=0.01;AN=;ASN_AF=0.0017;AVGPOST=0.8325;ERATE=0.0072;LDAF=0.0903;RSQ=0.0960;THETA=0.0121;VT=INDEL
rs200430748 G GA PASS AA=G;AC=;AF=0.01;AFR_AF=0.06;AMR_AF=0.0028;AN=;AVGPOST=0.9041;ERATE=0.0041;LDAF=0.0628;RSQ=0.2883;THETA=0.0153;VT=INDEL

  可以发现AF值在INFO这一列字符串里面,所以要取出这个值是比较容易的,只需要对字符串进行切割即可。

  先通过data.table包读入数据到计算机内存,我们看到data.table包读入数据非常快,1,000,000行数据读入仅仅5秒。同时函数非常智能的通过后面的格式将前28行识别为注释,没有读入。

library(data.table)
head_vcf <- fread("head_1000000.vcf", sep = "\t", colClasses=list(character= ,))
### Read 1000000 rows and 8 (of 8) columns from 0.166 GB file in 00:00:05
head(head_vcf)
  #CHROM   POS          ID REF ALT QUAL FILTER
1: 1 10583 rs58108140 G A 100 PASS
2: 1 10611 rs189107123 C G 100 PASS
3: 1 13302 rs180734498 C T 100 PASS
4: 1 13327 rs144762171 G C 100 PASS
5: 1 13957 rs201747181 TC T 28 PASS
6: 1 13980 rs151276478 T C 100 PASS
INFO
1: AVGPOST=0.7707;RSQ=0.4319;LDAF=0.2327;ERATE=0.0161;AN=2184;VT=SNP;AA=.;THETA=0.0046;AC=314;SNPSOURCE=LOWCOV;AF=0.14;ASN_AF=0.13;AMR_AF=0.17;AFR_AF=0.04;EUR_AF=0.21
2: AN=2184;THETA=0.0077;VT=SNP;AA=.;AC=41;ERATE=0.0048;SNPSOURCE=LOWCOV;AVGPOST=0.9330;LDAF=0.0479;RSQ=0.3475;AF=0.02;ASN_AF=0.01;AMR_AF=0.03;AFR_AF=0.01;EUR_AF=0.02
3: THETA=0.0048;AN=2184;AC=249;VT=SNP;AA=.;RSQ=0.6281;LDAF=0.1573;SNPSOURCE=LOWCOV;AVGPOST=0.8895;ERATE=0.0058;AF=0.11;ASN_AF=0.02;AMR_AF=0.08;AFR_AF=0.21;EUR_AF=0.14
4: AVGPOST=0.9698;AN=2184;VT=SNP;AA=.;RSQ=0.6482;AC=59;SNPSOURCE=LOWCOV;ERATE=0.0012;LDAF=0.0359;THETA=0.0204;AF=0.03;ASN_AF=0.02;AMR_AF=0.03;AFR_AF=0.02;EUR_AF=0.04
5: AA=TC;AC=35;AF=0.02;AFR_AF=0.02;AMR_AF=0.02;AN=2184;ASN_AF=0.01;AVGPOST=0.8711;ERATE=0.0065;EUR_AF=0.02;LDAF=0.0788;RSQ=0.2501;THETA=0.0100;VT=INDEL
6: AN=2184;AC=45;ERATE=0.0034;THETA=0.0139;RSQ=0.3603;LDAF=0.0525;VT=SNP;AA=.;AVGPOST=0.9221;SNPSOURCE=LOWCOV;AF=0.02;ASN_AF=0.02;AMR_AF=0.02;AFR_AF=0.01;EUR_AF=0.02

  第一种方法是使用内建函数,如果使用单核处理数据,那么这是最推荐的方式,内建函数都是用底层语言写好封装并优化的,运算速度非常快。

#编写一个函数,函数的参数info_str是读入文件的第八列(INFO),这是一个字符串向量
get_af_fun1 <- function(info_str)
{
split1 <- strsplit(info_str, ";AF=")
str1 <- vector(length = length(split1))
for(i in :length(str1)) str1[i] <- split1[[i]][]
split2 <- strsplit(str1, ";")
str2 <- vector(length = length(split2))
for(i in :length(str2)) str2[i] <- split2[[i]][]
str2
}
system.time(result1 <- get_af_fun1(as.character(head_vcf$INFO)))
  用户   系统   流逝
38.944 0.012 38.950 #因为是根据位置索引操作,所以随机抽取三行发现结果一致
> result1[43525]
[1] "0.01"
> head_vcf[43525,]
#CHROM POS ID REF ALT QUAL FILTER
1: 1 3607355 rs3765729 G A 100 PASS
INFO
1: AA=G;AN=2184;RSQ=0.7704;VT=SNP;AC=22;LDAF=0.0126;SNPSOURCE=LOWCOV;AVGPOST=0.9929;THETA=0.0020;ERATE=0.0006;AF=0.01;ASN_AF=0.03;AMR_AF=0.02 > head_vcf[654635,]
#CHROM POS ID REF ALT QUAL FILTER
1: 1 49595556 rs192800899 G T 100 PASS
INFO
1: AA=G;AN=2184;RSQ=0.8739;AC=6;VT=SNP;THETA=0.0006;LDAF=0.0030;SNPSOURCE=LOWCOV;ERATE=0.0003;AVGPOST=0.9992;AF=0.0027;AMR_AF=0.0028;EUR_AF=0.01
> result1[654635]
[1] "0.0027"
> head_vcf[9876,]
#CHROM POS ID REF ALT QUAL FILTER
1: 1 1322735 rs146506266 G A 100 PASS
INFO
1: RSQ=0.9882;AVGPOST=1.0000;THETA=0.0004;SNPSOURCE=LOWCOV,EXOME;AA=G;AN=2184;VT=SNP;LDAF=0.0019;AC=4;ERATE=0.0003;AF=0.0018;ASN_AF=0.01
> result1[9876]
[1] "0.0018"

  R语言使用 multicore 包进行并行计算

  通过第一个例子我们看到,通过向量化运算和内建函数处理一百万行数据仅仅花费40秒不到,而且对于for循环,预先分配内存会极大的提高速度。主要原因是R语言的数据赋值是拷贝而不是引用,因此如果预先对向量分配内存就避免了每次循环都对该向量重新赋值。同时我们看到在程序是单核在计算。

  第二个例子是利用R语言内建循环函数lapply(), 算法和第一个例子相似,只不过把for循环用lapply执行。

fun_str1 <- function(str_v) strsplit(str_v, ";AF=")[[]][]
fun_str2 <- function(str_v) strsplit(str_v, ";")[[]][] get_af_fun2 <- function(info_str)
{
str1 <- unlist(lapply(info_str, fun_str1))
str2 <- unlist(lapply(str1, fun_str2))
str2
}
system.time(result2 <- get_af_fun2(as.character(head_vcf$INFO)))
用户 系统 流逝
76.512 0.028 76.556 #随机抽取三行发现结果一致 > result2[543654]
[1] "0.0009"
> result2[75676]
[1] "0.0014"
> result2[8765]
[1] "0.0018"
> head_vcf[543654]
#CHROM POS ID REF ALT QUAL FILTER
1: 1 40941743 rs143883355 G A 100 PASS
INFO
1: AN=2184;THETA=0.0005;LDAF=0.0010;VT=SNP;AA=.;AVGPOST=0.9998;RSQ=0.9135;SNPSOURCE=LOWCOV;ERATE=0.0003;AC=2;AF=0.0009;AFR_AF=0.0041
> head_vcf[75676]
#CHROM POS ID REF ALT QUAL FILTER
1: 1 5569843 rs148207486 C T 100 PASS
INFO
1: ERATE=0.0004;RSQ=0.7694;AA=C;AN=2184;VT=SNP;LDAF=0.0019;AVGPOST=0.9989;THETA=0.0012;SNPSOURCE=LOWCOV;AC=3;AF=0.0014;AFR_AF=0.01
> head_vcf[8765]
#CHROM POS ID REF ALT QUAL FILTER
1: 1 1254001 rs190286788 G C 100 PASS
INFO
1: ERATE=0.0005;RSQ=0.5499;AA=G;AN=2184;AVGPOST=0.9979;VT=SNP;THETA=0.0006;SNPSOURCE=LOWCOV;AC=4;LDAF=0.0017;AF=0.0018;AMR_AF=0.0028;EUR_AF=0.0040

 R语言使用 multicore 包进行并行计算

  第二个例子我们可以看出,计算这一百万行运行时间大概70多秒,lapply是代替循环的一个非常好的函数, 这里之所以运行速度比第一种方法慢,我们从程序可以看出,是因为对这个长度为一百万的字符串向量分别调用了一百万次fun_str1和fun_str2。程序在函数调用上花费了一定时间,如果将我们自己写的fun_str1, fun_str2函数换成内建函数,速度也会提高不少。

  

  有了第二个例子的铺垫,我们再来介绍第三个例子,也就是利用multicore 包进行并行处理。这个包是基于Unix系统的,这里简单介绍里面一个函数mclapply(), 和lapply()函数用法基本一致。具体看例子:

fun_str1 <- function(str_v) strsplit(str_v, ";AF=")[[]][]
fun_str2 <- function(str_v) strsplit(str_v, ";")[[]][] get_af_fun3 <- function(info_str)
{
str1 <- unlist(mclapply(info_str, fun_str1, mc.cores = ))
str2 <- unlist(mclapply(str1, fun_str2, mc.cores = ))
str2
}
system.time(result3 <- get_af_fun3(as.character(head_vcf$INFO)))
  用户   系统   流逝
56.816 65.512 39.958 #检查三次运行的结果显示三个结果向量完全一致
> all(result1 == result2)
[1] TRUE
> all(result3 == result2)
[1] TRUE

R语言使用 multicore 包进行并行计算

  查看CPU使用状态我们可以看到通过mc.cores调用四个核进行并行运算,确实比单核的lapply运算速度提高不少,但是没有四倍的效果是并行运算系统中一些线程活动的额外开销。

  再次加大运算量,将文件提高到10,000,000行, 为了避免宕机,这里请用大内存机器进行实验。

head_vcf <- fread("head_10000000.vcf", sep = "\t", colClasses=list(character= 1,6))
Read 10000000 rows and 8 (of 8) columns from 1.660 GB file in 00:02:04

  方案一,使用内建函数和for循环来处理。

> system.time(result1 <- get_af_fun1(as.character(head_vcf$INFO)))
用户 系统 流逝
603.936 0.248 604.060

  方案二,使用lapply()函数单核处理。

> system.time(result2 <- get_af_fun2(as.character(head_vcf$INFO)))
用户 系统 流逝
933.176 0.344 933.115

  方案三,使用mclapply()函数并行处理。

> system.time(result3 <- get_af_fun3(as.character(head_vcf$INFO)))
用户 系统 流逝
984.936 251.176 351.119

  这个一千万行的例子我们看出,系统内建函数的运行效率还是比lapply快, 开四个核并行的速度最快,而且这个效率还会随着计算核心数的提升而进一步获得较大的提升,有条件的用户可以选择方案三。