安装OPENMPI
由于是实验,也不进行多机的配置了,只在虚拟机里安装吧。多个机器的配置可以参考此文
最简单的方法,apt安装
1
|
sudo apt-get install libcr-dev mpich2 mpich2-doc
|
测试
hello.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/* C Example */
#include <mpi.h>
#include <stdio.h>
int main ( int argc, char * argv[])
{
int rank, size;
MPI_Init (&argc, &argv); /* starts MPI */
MPI_Comm_rank (MPI_COMM_WORLD, &rank); /* get current process id */
MPI_Comm_size (MPI_COMM_WORLD, &size); /* get number of processes */
printf ( "Hello world from process %d of %d\n" , rank, size );
MPI_Finalize();
return 0;
}
|
编译运行及显示结果
1
2
3
4
|
mpicc mpi_hello.c -o hello
mpirun -np 2 ./hello
Hello world from process 0 of 2
Hello world from process 1 of 2
|
正常出现结果表明没有问题,
看下openmpi的版本
1
|
mpirun --version
|
1
2
|
mpirun (Open MPI) 1.6.5
Report bugs to http://www.open-mpi.org/community/help/
|
MPI计算矩阵乘法
通过opemMPI加速矩阵乘法运算。采用主从模式,0号是master,其他是child(或者叫worker,as you wish)。
基本思路
两个矩阵A,B进行乘法运算,则A的行 i 乘以B的列 j 得出的数是新矩阵(i,j)坐标的数值。A(MN) B(NK)最后矩阵是M*K的,实验中M=N=K=1000,我也就没有明确区分MNK,全部用MATRIX_SIZE定义的。
最简单的思路就是每个worker分配(MATRIX_SIZE/(numprocess-1))个,然后如果有余下的,就分给余数对应的worker。比如MATRIX_SIZE=10,numprocess=4 则实际的worker有3个,每个人分3行,最后的一行给id是1的。可以很简单的利用循环类分配。最后Master收集所有的结果,并按照顺序组装起来就行。
每个worker的工作就是接收来自master的一行,和B矩阵运算,得出新一行的结果,然后发送回master
代码
多加了很多注释来解释,函数的说明下一节解释下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
#include <mpi.h>
#include <stdio.h>
#define MATRIX_SIZE 10
#define FROM_MASTER 1 //这里的类型可以区分消息的种类,以便区分worker发送来的结果
#define FROM_CHILD 2
#define MASTER 0
MPI_Status status;
int myid,numprocess;
//最终保存的结果
int ans [MATRIX_SIZE*MATRIX_SIZE];
int A[MATRIX_SIZE*MATRIX_SIZE],B[MATRIX_SIZE*MATRIX_SIZE];
//读取文件,注意读取文件要放在master里,不然会读两遍,出现错误
void readFile(){
FILE * fina,*finb;
fina= fopen ( "a.txt" , "r" );
int i;
for (i = 0; i < MATRIX_SIZE*MATRIX_SIZE ; ++i)
{
fscanf (fina, "%d " ,&A[i]);
}
fclose (fina);
finb= fopen ( "b.txt" , "r" );
for (i=0;i<MATRIX_SIZE*MATRIX_SIZE;i++)
fscanf (finb, "%d " ,&B[i]);
fclose (finb);
printf ( "read file ok\n" );
}
int master(){
int workid,dest,i,j;
printf ( "numprocess %d\n" ,numprocess );
//给每个worker发送B矩阵过去
for (i=0;i<numprocess-1;i++){
//send B matrix
MPI_Send(&B,MATRIX_SIZE*MATRIX_SIZE,MPI_INT,i+1,FROM_MASTER,MPI_COMM_WORLD);
}
//开始给每个worker分配任务,取模即可
for (i = 0; i < MATRIX_SIZE; i++)
{
//attention: num of workers is numprocess-1
workid=i%(numprocess-1)+1;
//send single line in A
MPI_Send(&A[i*MATRIX_SIZE],MATRIX_SIZE,MPI_INT,workid,FROM_MASTER,MPI_COMM_WORLD);
}
//等待从worker发送来的数据
int tempLine[MATRIX_SIZE];
for (i = 0; i < MATRIX_SIZE*MATRIX_SIZE; i++)
{
ans[i]=0;
}
for (i = 0; i < MATRIX_SIZE; ++i)
{
int myprocess=i%(numprocess-1)+1;
printf ( "Master is waiting %d\n" ,myprocess);
//receive every line from every process
MPI_Recv(&tempLine,MATRIX_SIZE,MPI_INT,myprocess,FROM_CHILD,MPI_COMM_WORLD,&status);
//发送过来的都是计算好了的一行的数据,直接组装到ans里就行
for (j=0;j<MATRIX_SIZE;j++){
ans[MATRIX_SIZE*i+j]=tempLine[j];
}
printf ( "Master gets %d\n" ,i);
}
for (i=0;i<MATRIX_SIZE*MATRIX_SIZE;i++){
printf ( "%d " ,ans[i] );
if (i%MATRIX_SIZE==(MATRIX_SIZE-1)) printf ( "\n" );
}
printf ( "The Master is out\n" );
}
int worker(){
int mA[MATRIX_SIZE],mB[MATRIX_SIZE*MATRIX_SIZE],mC[MATRIX_SIZE];
int i,j,bi;
MPI_Recv(&mB,MATRIX_SIZE*MATRIX_SIZE,MPI_INT,MASTER,FROM_MASTER,MPI_COMM_WORLD,&status);
//接收来自master的A的行
for (i=0;i<MATRIX_SIZE/(numprocess-1);i++){
MPI_Recv(&mA,MATRIX_SIZE,MPI_INT,MASTER,FROM_MASTER,MPI_COMM_WORLD,&status);
//矩阵乘法,A 的一行和B矩阵相乘
for (bi=0;bi<MATRIX_SIZE;bi++){
mC[bi]=0;
for (j=0;j<MATRIX_SIZE;j++){
mC[bi]+=mA[j]*mB[bi*MATRIX_SIZE+j];
}
}
MPI_Send(&mC,MATRIX_SIZE,MPI_INT,MASTER,FROM_CHILD,MPI_COMM_WORLD);
}
//如果处于余数范围内,则需要多计算一行
if (MATRIX_SIZE%(numprocess-1)!=0){
if (myid<=(MATRIX_SIZE%(numprocess-1)))
{
MPI_Recv(&mA,MATRIX_SIZE,MPI_INT,MASTER,FROM_MASTER,MPI_COMM_WORLD,&status);
for (bi=0;bi<MATRIX_SIZE;bi++){
mC[bi]=0;
for (j=0;j<MATRIX_SIZE;j++){
mC[bi]+=mA[j]*mB[bi*MATRIX_SIZE+j];
}
}
MPI_Send(&mC,MATRIX_SIZE,MPI_INT,MASTER,FROM_CHILD,MPI_COMM_WORLD);
}
}
printf ( "The worker %d is out\n" ,myid);
}
int main( int argc, char **argv)
{
MPI_Init (&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD,&myid);
MPI_Comm_size(MPI_COMM_WORLD,&numprocess);
if (myid==MASTER){
readFile();
master();
}
if (myid>MASTER){
worker();
}
MPI_Finalize();
return 0;
}
|
OPENMPI简单函数介绍
针对实验用到的几个函数进行说明。
MPI为程序员提供一个并行环境库,程序员通过调用MPI的库程序来达到程序员所要达到的并行目的,可以只使用其中的6个最基本的函数就能编写一个完整的MPI程序去求解很多问题。这6个基本函数,包括启动和结束MPI环境,识别进程以及发送和接收消息:
理论上说,MPI所有的通信功能可以用它的六个基本的调用来实现:
- MPI_INIT 启动MPI环境
- MPI_COMM_SIZE 确定进程数
- MPI_COMM_RANK 确定自己的进程标识符
- MPI_SEND 发送一条消息
- MPI_RECV 接收一条消息
- MPI_FINALIZE 结束MPI环境
初始化和结束
MPI初始化:通过MPI_Init函数进入MPI环境并完成所有的初始化工作。
1
|
int MPI_Init( int *argc, char * * * argv )
|
MPI结束:通过MPI_Finalize函数从MPI环境中退出。
1
|
int MPI_Finalize( void )
|
获取进程的编号
调用MPI_Comm_rank函数获得当前进程在指定通信域中的编号,将自身与其他程序区分。
1
|
int MPI_Comm_rank(MPI_Comm comm, int *rank)
|
获取指定通信域的进程数
调用MPI_Comm_size函数获取指定通信域的进程个数,确定自身完成任务比例。
1
|
int MPI_Comm_size(MPI_Comm comm, int *size)
|
MPI消息
一个消息好比一封信
消息的内容的内容即信的内容,在MPI中成为消息缓冲(Message Buffer)
消息的接收发送者即信的地址,在MPI中成为消息封装(Message Envelop)
MPI中,消息缓冲由三元组<起始地址,数据个数,数据类型>标识
消息信封由三元组<源/目标进程,消息标签,通信域>标识
消息发送
MPI_Send函数用于发送一个消息到目标进程。
1
|
int MPI_Send( void *buf, int count, MPI_Datatype dataytpe, int dest, int tag, MPI_Comm comm)
|
buf是要发送数据的指针,比如一个A数组,可以直接&A,count则是数据长度,datatype都要改成MPI的type。dest就是worker的id了。tag则可以通过不同的type来区分消息类型,比如是master发送的还是worker发送的。
消息接收
MPI_Recv函数用于从指定进程接收一个消息
1
|
int MPI_Recv( void *buf, int count, MPI_Datatype datatyepe, int source, int tag, MPI_Comm comm, MPI_Status *status)
|
编译和执行
生成执行文件data
1
|
mpicc -o programname programname.c
|
一个MPI并行程序由若干个并发进程组成,这些进程可以相同也可以不同。MPI只支持静态进程创建,即:每个进程在执行前必须在MPI环境中登记,且它们必须一起启动。通常启动可执行的MPI程序是通过命令行来实现的。启动方法由具体实现确定。例如在MPICH实现中通过下列命令行可同时在独立的机器上启动相同的可执行程序:
1
|
mpirun –np N programname
|
其中N是同时运行的进程的个数,programname是可执行的MPI程序的程序名。