进程间通信的方式中,我们将多个进程共享同一块存储区来进行数据交换的方式称为共享内存通信。源于它直接将“内存”共享的特殊机制,它成为最快的一种IPC通信方式;然而它也不完美,它虽快,但是没有同步机制;通常在一个服务进程对共享存储区还未完成写操作之前,客户进程是不应当去取这些数据的,可没了同步,那可就乱了套了。
这种情况不是我们所愿意看到的,所以基于此 我们常常需要为用到的共享内存段添加上同步的机制,使之“完美”起来。通常呢,实现同步我们很自然地会想到信号量,是的,这里我就是这么干的。用基于信号量的PV操作实现完成一个带同步机制的共享内存段,这样它就可以像一个“fifo”队列一样了,并且它传输效率还会表现得非常不错,虽然它还比较简陋~。
信号量及相关函数
先来说说用到的信号量及处理函数吧。信号量和其它的IPC结构(前面总结过管道、消息队列)不同。它本质就是一个计数器,用于为多个进程提供对共享数据对象的访问。通常进程为了获得共享的资源,需要执行以下操作:
①测试控制该资源的信号量
②若此信号量>0,则进程可以使用该资源。此种情况下,进程会将信号量值减1,表明它使用了一个资源单位。
③否则,此信号量的值为0,则使该进程进入休眠状态,直至信号量值>1。如果有进程正在休眠状态等待此信号量,则唤醒它们
还有就是为了正确的实现信号量,信号量值的测试及减1操作还应当是原子的。为此,信号量通常是在内核中实现的。一般而言,信号量初值可以是任意一个正值,该值表明有多少个共享资源单位可供共享应用。然而遗憾的是,这里我用到的XSI信号量也是有缺陷的。
这源于①信号量并非是单个非负值,它被定义为一个可能含有多个信号量值的集合。通常在创建信号量的时候,对该集合中信号量数量进行指定 ②信号量创建独立于它的初始化。这是最致命的,因为这将导致的是不能原子的创建一个信号量集合,并对该集合中的各个信号量值赋初值。③有的程序1在终止时可能并没有释放掉已经分配给它的信号量。
而对于信号处理函数通常有3个,首先是
1.semget函数
作用:创建一个新的信号量或取得一个已有的信号量
原型:int semget(key_t key, int nsems, int semflg)
参数:
int nsems //它代表信号量集合中有多少个信号量。如果是创建新集合,则必须指定它;如果是引用现有的信号集(通常客户进程中),则将其指定为0.
int semflg //和前面IPC机制类似,用来初始化信号集维护的semid_ds结构中的ipc_perm结构成员sem_perm。通常用IPC_CREAT|0644创建,要直接打开已存在的话 也直接填0就好
2.semctl函数
用途:该函数用来直接控制信号量信息.也就是直接删除信号量或者初始化信号量.
原型:int semctl(int semid, int semnum, int cmd, ...)
参数:
int semid //semget函数返回的信号量标识符.
int semnum, //它代表要给该信号量集中的第几个信号量设置初值
int cmd //通常用SETVAL/GETVAL设置和获取信号量集中的一个单独的信号量。具体还有
IPC_STAT //读取一个信号量集的数据结构semid_ds,并将其存储在semun中的buf参数中。 IPC_SET //设置信号量集的数据结构semid_ds中的元素ipc_perm,其值取自semun中的buf参数。 IPC_RMID //将信号量集从内存中删除。 GETALL //用于读取信号量集中的所有信号量的值。 GETNCNT //返回正在等待资源的进程数目。 GETPID //返回最后一个执行semop操作的进程的PID。 GETVAL //返回信号量集中的一个单个的信号量的值。 GETZCNT //返回这在等待完全空闲的资源的进程数目。 SETALL //设置信号量集中的所有的信号量的值。 SETVAL //设置信号量集中的一个单独的信号量的值
如果有第四个参数,取决于所请求的命令,如果使用该参数,它通常是一个union semum结构,定义如下:
union semun { int val; /* Value for SETVAL */通常就要它就够了 struct semid_ds *buf; unsigned short *arry; };
赋值形式:semun.val = 2 //初值放在val中
执行PV操作
3.semop函数
用途:用来改变信号量的值,该函数是具有原子性的。
原型:int semop(int semid, struct sembuf *sops, size_t nsops)
struct sembuf
{
unsigned short sem_num; /* semaphore number */除非使用一组信号量,否则它为0
short sem_op; /* semaphore operation */ p -1, v 1
short sem_flg; /* operation flags */ 填 0就好 SEM_NOWAIT SEM_UNDO
}
注意这当中的sem_op参数,信号量集合中的各个成员的操作都应由相应的sem_op值规定。此值可正可负可为0,相应的值就代表对于进程中占用的资源数量,
同时这值会加到信号量值上,若指定undo标志,还从信号量调整值上减去sem_op.
共享内存增添同步机制
下面就可以开始操作一段共享内存,使其带有同步的机制,然后模拟重现我们操作系统书上的那个经典的生产消费者问题,并且解决它。这里我画个图帮助整理思路:
首先,定义出一个管理内存段的“shmfifo”结构,该结构中具体可用一个shm_head结构来管理数据的读、写位置和大小的信息。同时,shimfifo结构中还维护了3个用于解决互斥和同步的信号量sem_mutex,sem_empty, sem_full。
维护的shm_head结构存放在共享内存的头部,写入数据从payload处开始写入一个数据块大小,每次写入之后,便更新头部的wr_idx位,payload可由_head+1得到;同样,最开始读出数据时也是从payload处开始读,每次读完便更新wr_idx。好,到这里就有了大致的思路。于是可以实现出来它的头文件
#ifndef __SHMFIFO_H__ #define __SHMFIFO_H__ #include <sys/ipc.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <sys/ipc.h> #include <sys/sem.h> #include <sys/shm.h> typedef struct shmhead { int rd_idx; // 读入数据索引 int wr_idx; // 写数据索引 int blocks; // 存数据块数量 int blksz; // 每个数据块大小 }shmhead_t; typedef struct shmfifo { shmhead_t *p_head; // 共享内存的起始地址 char * p_payload; // 有效数据的起始地址 int shmid; // 打开的共享内存id int sem_mutex; // 互斥量 int sem_empty; // 还剩多少个可以消费
int sem_full; // 剩余多少个地方可以生产 }shmfifo_t; // 初始化函数 shmfifo_t *shmfifo_init(key_t key, int blocks, int blksz); // 放入数据 void shmfifo_put(shmfifo_t *fifo, const void *buf); // 取得数据 void shmfifo_get(shmfifo_t *fifo, void *buf); // 结构销毁 void shmfifo_destroy(shmfifo_t *fifo); #endif //__SHMFIFO_H__
紧接着要考虑就该结构的初始化。首先,肯定是先要为结构开出空间来,其大小不难分析应该为shm_head大小加上blocks*blksz,其次就是一步步对这些变量和信号进行初始化了。
紧接着,对于放数据和取数据就依葫芦画瓢就好了,值得注意就是对信号量的处理,进行PV操作时,针对写数据时,我们需要先P(sem_empty)保证先有地方可以放数据,其次才进行P(sem_mutex)保证互斥性。(否则,会因为在放入数据时进行了P(sem_mutex)操作,在还没来得及读数据时,就将内存段放满,进而使取数据操作阻塞在信号量sem_mutex<0条件上,最终导致死锁。但这里若能保证在内存段还未放满时,读数据进程能得到调度,那么就不会有这样的问题了;比如,这里可以让写数据sleep一会,然后执行读数据操作或者 先进行读数据,然后再写数据;你可以去试试看~ )
然后放数据完成,便可进行V(sem_full)接着V(sem_mutex)。在进行取数据操作时同理。基于此,便可有以下代码:
#include "shmfifo.h" typedef union semun{ int val; }semun; // 初始化 shmfifo_t* shmfifo_init(key_t key, int blocks, int blksz) { shmfifo_t *p = malloc(sizeof(shmfifo_t)); int shmid = shmget(key, 0, 0); int len = sizeof(shmhead_t) + blocks*blksz; //共享内存段大小 if(shmid == -1 ) // 内存段不存在,创建 { shmid = shmget(key, len, IPC_CREAT|0644); if ( shmid == -1) perror("shmget"),exit(1); //初始化内存段头 p->p_head = shmat(shmid, NULL, 0); //将开出的内存段挂载到进程地址空间 p->p_head->rd_idx = 0; p->p_head->wr_idx = 0; p->p_head->blocks = blocks; p->p_head->blksz = blksz; //初始化后段 p->p_payload = (char*)(p->p_head+1); p->shmid = shmid; p->sem_mutex = semget(key, 1, IPC_CREAT|0644); p->sem_empty = semget(key+1, 1, IPC_CREAT|0644); p->sem_full = semget(key+2, 1, IPC_CREAT|0644); semun su = {1}; //设置互斥信号量初值为1 semctl(p->sem_mutex, 0, SETVAL, su); su.val = blocks; semctl(p->sem_empty, 0, SETVAL, su); su.val = 0; //初始不能消费 semctl(p->sem_full, 0, SETVAL, su); } else //内存段存在 ,打开 { p->p_head = shmat(shmid, NULL, 0); p->p_payload = (char*)(p->p_head+1); p->shmid = shmid; p->sem_mutex = semget(key, 0, 0); // p->sem_empty = semget(key+1, 0, 0); p->sem_full = semget(key+2, 0, 0); } return p; } static void P(int id) { struct sembuf sb[1] = {0,-1, 0}; semop(id, sb, 1); } static void V(int id) { struct sembuf sb[1] = {0, 1, 0}; semop(id, sb, 1); } // 放入数据 void shmfifo_put(shmfifo_t *fifo, const void *buf) { P(fifo->sem_empty); //有多少地方可供生产,确保有空位生产 P(fifo->sem_mutex); //保证进程互斥 memcpy(fifo->p_payload + fifo->p_head->wr_idx * fifo->p_head->blksz, //写入位置 buf, fifo->p_head->blksz); //每次写入一个数据块大小 fifo->p_head->wr_idx = (fifo->p_head->wr_idx+1) %fifo->p_head->blocks; //取模,保证数据存满时,转从payload处写数据 V(fifo->sem_full); V(fifo->sem_mutex); } // 取得数据 void shmfifo_get(shmfifo_t* pFifo, void *buf) { P(pFifo->sem_full); //确保有数据可取 P(pFifo->sem_mutex); //从内存段读取,拷入buf中 memcpy(buf, pFifo->p_payload + pFifo->p_head->rd_idx* pFifo->p_head->blksz, pFifo->p_head->blksz); pFifo->p_head->rd_idx = (pFifo->p_head->rd_idx+1) %pFifo->p_head->blocks; //取模,保证数据存满时,转从payload处取数据 V(pFifo->sem_empty); V(pFifo->sem_mutex); } // 销毁 void shmfifo_destroy(shmfifo_t* pFifo) { shmdt(pFifo->p_head); //取消内存段挂载 shmctl(pFifo->shmid, IPC_RMID, 0); //释放掉该内存段 //删除信号量 semctl(pFifo->sem_mutex, 0, IPC_RMID, 0); semctl(pFifo->sem_empty, 0, IPC_RMID, 0); semctl(pFifo->sem_full, 0, IPC_RMID, 0); free(pFifo); }
最后就是分别实现get.c和put.c进行验证,
get.c
#include "shmfifo.h" #include <unistd.h> typedef struct Products{ int id; char pro_name[10]; }Pro; int main() { shmfifo_t* fifo = shmfifo_init(12345, 3, sizeof(Pro)); Pro p; while( 1){ memset(&p, 0x00, sizeof(p)); shmfifo_get(fifo, &p); printf("id:%d, 产品名:%s\n", p.id, p.pro_name); sleep(1); } shmfifo_destroy(fifo); }
put.c
#include "shmfifo.h" typedef struct Product { int id; char pro_name[10]; }Pro; int main() { shmfifo_t *fifo = shmfifo_init(12345, 4, sizeof(Pro)); Pro p; for (int i=0; i<20; ++i) { memset(&p, 0x00, sizeof(p)); sprintf(p.pro_name, "iphone%d", i); p.id = i+1; shmfifo_put(fifo, &p); printf("put %d ok\n", i); } }
验证同步,当写进程结束,读数据进程便阻塞
验证互斥,进程1写第9个数据时,我另开启了一进程写数据,从右侧可见这两进程是交替进行写操作的