在云计算和集群环境中使用CUDA

MPI的使用

Posted by Memory on April 13, 2017

使用MPI使CUDA扩展至使用上千个节点,NVIDIA的GPUDirect技术加速了MPI发送(MPI_send)和(MPI_Receive)等关键操作。通过MPI个GPUDirect的使用,利用这些API创建云计算在计算集群上运行的程序。
MPI消息传递接口,定义了并行计算的拓扑结构,用来将一组进场组织成一个MPI会话,MPI会话的尺寸在整个程序执行生命周期中是固定不变的。 MPI中所有的并行都是显示的,编程者需要负责制定正确的并行度,并实现MPI程序构建中的并行算法。

MPI通信器

通信器是一个分布式对象,用于完成集中式和点对点的通信,集中式通信是指与制定通信组中所有处理器有关的MPI函数,点对点通信用于单个MPI进程之间的传递。
点对点通信 —单个进程对单个进程的通信

– Blocking(阻塞) :一个函数须等待操作完成才返回,返回后用户可以重新使用所占用的资源

– Non-blocking(非阻塞):一个函数不必等待操作完成便可返回    

完成两个不同MPI之间的通信,其中一个进程进行发送操作,另一个进程进行对应的读操作。MPI保证所有的消息都可以完整无误的送达对端。 MPI_Send()和MPI——Recv()通常采用暂停方式处理两个MPI进程直接的消息传递。 暂停意味着发送消息的进程将一直等待信息被完整准确发送后再返回;而接受消息的进程也将一直等待消息被完整正确的接收后再返回。
默认情况下,在调用MPI_init() 后,MPI会立刻创建MPI_COMM_WORLD通信器。 MPI_COMM_WORLD包含了程序中所有的MPI进程。
一个MPI程序可以创建多个相互独立的通信器,用来独立处理一组任务相关的消息,或从一组进程到另一组相关进程之间的通信。

组通信—是一个通信域中的所有进程都参加的全局通信操作

群集通信,按照通信方向的不同,又可以分为三种:一对多通信,多对一通信和多对多通信

  • 一对多通信:一个进程向其它所有的进程发送消息,这个负责发送消息的进程叫做Root进程

  • 多对一通信:一个进程负责从其它所有的进程接收消息,这个接收的进程也叫做Root进程

  • 多对多通信:每一个进程都向其它所有的进程发送或者接收消息

远程存储访问

  • 远程存储访问即直接对非本地的存储空间进行访问
  • 远程存储的访问 窗口是具体的实现形式 通过窗口操作实现来实现单边通信,通过对窗口的管理操作来实现对窗口操作的同步控制

MPI进程号

在通信器内,每个线程在其初始化后,系统都会分配一个唯一的整数识别码。 进程号也称为“任务ID”,用于条件判断操作,用来控制MPI线程的执行。 MPI程序通常选用进程号为0的进程作为主进程,通过主进程来控制其它进程号大于0的进程。 MPI程序结构图通常如下图所示:

主从模式

MPI的通用设计模式是主从模式,进程号为0的被定义为主进程,该进程用来管理其它所有进程的活动。下面是构建主从模式的MPI代码

MPI_Comm_size(MPI_COMM_WORLD.&numtasks);
MPI_Comm_rank(MPI_COMM_WORLD.&rank);
if(rank == 0) {
   //此处为主进程代码
}else {
   //此次为从进程代码
}

MPI样例程序

#include <stdio.h>
#include <mpi.h>

main( int argc, char *argv[] )
{
	int myRank,numProcs;

	MPI_Init(&argc, &argv);
	MPI_Comm_rank(MPI_COMM_WORLD, &myRank);//获取进程ID
	MPI_Comm_size(MPI_COMM_WORLD, &numProcs);//获取进程数
	printf("Process %d of %d say:Hello World!\n", myRank, numProcs);
	MPI_Finalize();
}

编译和运行方式
mpic++ helloWord.cpp -o helloWord (生成可执行程序)
mpirun -f machines -np 4 ./helloWord

  • machines:配置文件,指定节点列表

  • np:指定启动的进程数

MPI常用API

点对点通信
**MPI_Send **

int MPI_Send(void* buf, int count, MPI_Datatype  datatype, int dest, int tag, MPI _Comm comm);

- IN buf    		发送缓冲区的起始地址
- IN count 		要发送信息的元素个数
- IN datatype	发送信息的数据类型
- IN dest 		目标进程的rank
- IN tag		消息标签
- IN comm 	通信域

将发送缓冲区中的count个datatype数据类型的数据发送到目的进程

**MPI_Recv **

int MPI_Recv(void* buf, int count, MPI_Datatype datatype,int source, int tag, 
	MPI _Comm comm,MPI_Status * status);

-IN buf    		接收缓冲区的起始地址

-IN count 		要接收信息的元素个数

-IN datatype	接收信息的数据类型

-IN source 	接收数据的来源

-IN tag		消息标签

-IN comm 	通信域

-IN status 	包含实际接收到的消息的有关信息  

组通信
**MPI_Bcast **

int MPI_Bcast(void* buffer,int count,MPI_Datatype datatype,int root, MPI_Comm comm)    
-IN/OUT buffer	 通信消息缓冲区的起始地址   
-IN count 		将广播出去/或接收的数据个数   
-IN datatype 	广播/接收数据的数据类型     
-IN root 		广播数据的根进程的标识号   
-IN comm 	通信域   

**MPI_Gather **

int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype,void* recvbuf, int recvcount,
	MPI_Datatype recvtype,int root, MPI_Comm comm)  
-IN sendbuf 	发送消息缓冲区的起始地址   
-IN sendcount	发送消息缓冲区中的数据个数   
-IN sendtype 	发送消息缓冲区中的数据类型   
-OUT recvbuf 	接收消息缓冲区的起始地址  
-IN recvcount 	待接收的元素个数(仅对于根进程有意义) 
-IN recvtype 	接收元素的数据类型(仅对于根进程有意义)   
-IN root 		接收进程的序列号   
-IN comm 	通信域   

远程存储访问 MPI_Win_create 创建窗口

int MPI_Win_create(void * base, MPI_Aint size, int disp_unit, MPI_Info info, 
		MPI_Comm comm,MPI_Win * win) 
- IN base		 窗口空间的初始地址   
- IN size 		以字节为单位的窗口空间大小   
- IN disp_unit 	一个偏移单位对应的字节数   
- IN comm 		通信域    
- IN info 		传递给运行时的信息  
- OUT win 		返回的窗口对象   

创建窗口是一个组调用
**MPI_Win_free ** 释放窗口

  
MPI_Win_free(MPI_Win*win)

- INOUT win 窗口对象 输入为要释放的窗口 返回为空 

释放窗口是一个组调用

**MPI_Put 向窗口写 **

int MPI_Put(void * origin_addr, int origin_count, MPI_Datatype origin_datatype,int target_rank, 
	MPI_Aint target_disp, int target_count, MPI_Datatype  target_datatype, MPI_Win win)
- IN origin_addr 		本地发送缓冲区起始地址   
- IN origin_count 		本地发送缓冲区中将要写到窗口内的数据个数   
- IN origin_datatype 		本地发送缓冲区中的数据类型   
- IN target_rank 		目标进程标识   
- IN target_disp 		相对于写窗口起始地址的偏移单位 从该位置开始写   
- IN target_count 		以指定的数据类型为单位 写入窗口的数据的个数   
- IN target_datatype 		写数据的数据类型   
- IN win 			窗口对象   

MPI_Get 从窗口读

int MPI_Get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, 
	MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win) 
- OUT origin_addr 		本地接收缓冲区的起始地址   
- IN origin_count 		以指定的数据类型为单位 接收数据的个数   
- IN origin_			datatype 接收数据的数据类型   
- IN target_rank 		将要读的窗口所在的进程标识   
- IN target_disp 		读取位置相对于窗口起始地址偏移单位的个数   
- IN target_count 		以指定的数据类型为单位 读取数据的个数   
- IN target_datatype 		读取数据的数据类型   
- IN win 窗口对象   

下面是一个mpi结合opencv进行文件读写,数据传输的程序

        #include<opencv2/opencv.hpp>   
        #include<mpi.h>       
	using namespace cv;       
	int MPI_OpenCV_Cuda_DEMO(int argc, char* argv[])     
	{
		int myid, numprocs;
		int namelen;
		char processor_name[MPI_MAX_PROCESSOR_NAME];
		MPI_Init(&argc, &argv);
		MPI_Comm_rank(MPI_COMM_WORLD, &myid);    //进程编号
		MPI_Comm_size(MPI_COMM_WORLD, &numprocs);//numprocs进程数
		MPI_Get_processor_name(processor_name, &namelen);
		MPI_Win window; //创建一个窗口
		int winSize = 0;
		if (myid == 0)//主节点进行数据读入    
		{
			Mat image = imread("./image.jpg",0);  
	        //通过opencv的函数imread读取图片确定所需开辟远地窗口的大小   
			winSize = image.rows*image.cols*sizeof(float);
	        //开辟远地窗口时需要设定窗口大小。这里设置0号节点winSize的大小,其他节点仍然是0大小   
		}
		//开辟远地数据窗口   
		float *matWin=new float[512*512*5] ; //一般给一个足够大的值  
		MPI_Win_create(matWin, winSize, sizeof(MPI_FLOAT), MPI_INFO_NULL, MPI_COMM_WORLD, &window);
	
		if (myid == 0) //确定主进程的工作
		{  
			Mat image = imread("./image.jpg", 0);//这是opencv的图片读取操作          
			image.convertTo(image, CV_32FC1);
			normalize(image, image,1,0,CV_MINMAX);
			//这是对远地窗口进行写数据操作,这里使用的是MPI的排斥锁 MPI_WIN_LOCK(MPI_LOCK_EXCLUSIVE, , ,)
			MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, window);
			MPI_Put(image.data, image.rows*image.step1(), MPI_FLOAT, 0, 0, image.rows*image.step1(), MPI_FLOAT, window);
			MPI_Win_unlock(0, window);
			
			int dimension[2]{image.rows,image.cols};
			MPI_Send(dimension, 2, MPI_INT32_T, 1, 101, MPI_COMM_WORLD);//这是一个mpi的发送操作
			//101为消息代号,接收时代号与之对应
		}
		if (myid != 0)
		{
			MPI_Status status_t;
			int dimension[2]{ 0,0 };
			//这是一个MPI的接受操作,在接受到数据之前子节点一直在等待
			MPI_Recv(dimension, 2, MPI_INT32_T, 0, 101, MPI_COMM_WORLD, &status_t);
			Mat data(dimension[0], dimension[1], CV_32FC1);
			//接受到了图片的长和宽之后开始进行从原地窗口的读操作,这里使用的是共享锁
			MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, window);
			MPI_Get(data.data, data.rows*data.cols, MPI_FLOAT, 0, 0, data.rows*data.cols,MPI_FLOAT, window);
			MPI_Win_unlock(0, window);
			//执行了一个图片数据矩阵的屏幕输出来验证子节点收到了数据
			for (int i = 0; i < data.rows; i++)
			{
				for (int j = 0; j < data.cols; j++)
				{
					if ((data.data[i*data.cols+ j])>0.1)
					{
						printf("%0.2f ", *((float *)(data.data)+i*data.cols+j));
					}
				}
				printf("\n");
			}		
		}
		//所有节点同步,然后一起结束
		MPI_Barrier(MPI_COMM_WORLD);
		MPI_Win_free(&window);  //释放远地窗口
		MPI_Finalize(); 
		delete[]matWin; //注意matWin中包含的数组,不要重复释放以免出现对动态分配的内存释放多次
		return 0;
	}