管道
1、半双工,即数据只能在一个方向上流动。
2、只能在具有公共祖先的进程间使用。
下图显示父进程关闭读端(fd[0]),子进程关闭写端(fd[1])后的管道流向。
创建一个从父进程到子进程的管道,由父进程向子进程发送数据。
#include "apue.h"
int main(void)
{
int n;
int fd[2];
pid_t pid;
char line[MAXLINE];
/* int pipe(int filedes[2]); filedes[0]: 为读而打开 filedes[1]: 为写而打开 filedes[1]的输出是filedes[0]的输入 如果父进程关闭filedes[0],子进程关闭了filedes[1], 则创建了从父进程到子进程的管道 */
if(pipe(fd) < 0)
err_sys("pipe error");
if((pid = fork()) < 0)
err_sys("fork error");
else if(pid > 0)
{
close(fd[0]);
write(fd[1], "hello world\n", 12);
}
else
{
close(fd[1]);
n = read(fd[0], line, MAXLINE);
write(STDOUT_FILENO, line, n);
}
exit(0);
}
将文件复制到分页程序
功能是每次一页显示已产生的输出。思路:将输出通过管道直接送到分页程序。
先创建一个管道,然后父进程关闭读端,将文件中的内容写入写端。子进程关闭写端,调用dup2,使其标准输入成为管道的读端,然后调用exec,执行用户喜爱的分页程序。当执行分页程序时,标准输入是管道的读端。
#include "apue.h"
#include <sys/wait.h>
#define DEF_PAGER "/bin/more" /* default pager program */
int main(int argc, char *argv[])
{
int n;
int fd[2];
pid_t pid;
char *pager, *argv0;
char line[MAXLINE];
FILE *fp;
if(argc != 2)
err_quit("usage: a.out <pathname>");
if((fp = fopen(argv[1], "r")) == NULL)
err_sys("can't open %s", argv[1]);
if(pipe(fd) < 0)
err_sys("pipe error");
if((pid = fork()) < 0)
err_sys("fork error");
else if(pid > 0)
{
close(fd[0]); /* close read end */
/* parent copies argv[1] to pipe */
while(fgets(line, MAXLINE, fp) != NULL)
{
n = strlen(line);
if(write(fd[1], line, n) != n)
err_sys("write error to pipe");
}
if(ferror(fp))
err_sys("fgets error");
close(fd[1]); /* close write end of pipe for reader */
if(waitpid(pid, NULL, 0) < 0)
err_sys("waitpid error");
exit(0);
}
else
{
close(fd[1]); /* close write end */
if(fd[0] != STDIN_FILENO)
{
if(dup2(fd[0], STDIN_FILENO) != STDIN_FILENO) /* 对输入文件描述符进行重定向 */
err_sys("dup2 error to stdin");
close(fd[0]); /* don't need this after dup2 */
}
/* get arguments for execl() */
if((pager= getenv("PAGER")) == NULL)
pager = DEF_PAGER;
if((argv0 = strchr(pager, '/')) != NULL)
argv0++; /* step past rightmost slash */
else
argv0 = pager; /* no slash in pager */
if(execl(pager, argv0, (char *)0) < 0)
err_sys("execl error for %s", pager);
}
exit(0);
}
利用管道使父子进程同步
在fork之前创建两个管道。父进程在调用TELL_CHILD时,写一个字符”p”至上一个管道,子进程在调用TELL_PARENT时,经由下一个管道写一个字符”c”。相应的WAIT_xxx调用read读这个字符,并发生阻塞。
#include "apue.h"
static int pfd1[2], pfd2[2];
void TELL_WAIT(void)
{
if(pipe(pfd1) < 0 || pipe(pfd2) < 0)
err_sys("pipe error");
}
void TELL_PARENT(pid_t pid)
{
if(write(pfd2[1], "c", 1) != 1)
err_sys("write error");
}
void WAIT_PARENT(void)
{
char c;
if(read(pfd1[0], &c, 1) != 1)
err_sys("read error");
if(c != 'p')
err_quit("WAIT PARENT: incorrect data");
}
void TELL_CHILD(pid_t pid)
{
if(write(pfd1[1], "p", 1) != 1)
err_sys("write error");
}
void WAIT_CHILD(void)
{
char c;
if(read(pfd2[0], &c, 1) != 1)
err_sys("read error");
if(c != 'c')
err_quit("WAIT_CHILD: incorrect data");
}
使用popen向分页程序传送文件
#include "apue.h"
#include <sys/wait.h>
#define PAGER "${PAGER:-more}" /* environment variable, or default */
int main(int argc, char *argv[])
{
char line[MAXLINE];
FILE *fpin, *fpout;
if(argc != 2)
err_quit("usage: a.out <pathname>");
if((fpin = fopen(argv[1], "r")) == NULL)
err_sys("can't open %s", argv[1]);
/*
FILE *popen(const char *cmdstring, const char *type);
创建一个管道,调用fork产生一个子进程,关闭管道的不使用端,调用exec以
运行命令cmdstring,然后等待命令终止。
type:"w",文件指针连接到cmdstring的标准输入,返回的文件指针是可写的。
"r",文件指针连接到cmdstring的标准输出,返回的文件指针是可读的。
*/
if((fpout = popen(PAGER, "w")) == NULL)
err_sys("popen error");
/* copy argv[1] to pager */
while(fgets(line, MAXLINE, fpin) != NULL)
{
if(fputs(line, fpout) == EOF)
err_sys("fputs error to pipe");
}
if(ferror(fpin))
err_sys("fgets error");
/*
int pclose(FILE *fp);
*/
if(pclose(fpout) == -1)
err_sys("pclose error");
exit(0);
}
popen和pclose函数
每次调用popen时,应记住创建进程的进程ID,以及文件描述符或FILE指针。在childpid中保存子进程ID,并用文件描述符作为下标。这样,当以FILE指针作为参数调用pclose时,调用fileno可得到其文件描述符,进而得到子进程ID,并调用waitpid,当发现子进程不存在时,返回-1,errno被设置为ECHILD。
#include "apue.h"
#include <errno.h>
#include <fcntl.h>
#include <sys/wait.h>
/* Pointer to array allocated at run-time */
static pid_t *childpid = NULL;
static int maxfd;
FILE *popen(const char *cmdstring, const char *type)
{
int i;
int pfd[2];
pid_t pid;
FILE *fp;
/* only allow "r" or "w" */
if((type[0] != 'r' && type[0] != 'w') || type[1] != 0)
{
errno = EINVAL; /* required by POSIX */
return (NULL);
}
if(childpid == NULL) /* first time through */
{
/* allocate zeroed out array for child pids */
maxfd = open_max();
if((childpid = calloc(maxfd, sizeof(pid_t))) == NULL)
return (NULL);
}
if(pipe(pfd) < 0)
return (NULL); /* errno set by pipe() */
if((pid = fork()) < 0)
return (NULL);
else if(pid == 0)
{
if(*type == 'r')
{
close(pfd[0]);
if(pfd[1] != STDOUT_FILENO)
{
dup2(pfd[1], STDOUT_FILENO);
close(pfd[1]);
}
}
else
{
close(pfd[1]);
if(pfd[0] != STDIN_FILENO)
{
dup2(pfd[0], STDIN_FILENO);
close(pfd[0]);
}
}
/* 子进程需关闭以前调用popen时打开的且当前仍打开的I/O流 */
for(i=0; i<maxfd; i++)
if(childpid[i] > 0)
close(childpid[i]);
execl("/bin/sh", "sh", "-c", cmdstring, (char *)0);
_exit(127);
}
/* parent continue... */
if(*type == 'r')
{
close(pfd[1]);
if((fp = fdopen(pfd[0], type)) == NULL)
return (NULL);
}
else
{
close(pfd[0]);
if((fp = fdopen(pfd[1], type)) == NULL)
return (NULL);
}
childpid[fileno(fp)] = pid; /* remember child pid for this fd */
return (fp);
}
int pclose(FILE *fp)
{
int fd, stat;
pid_t pid;
if(childpid == NULL)
{
errno = EINVAL;
return (-1); /* popen() has never been called */
}
fd = fileno(fp);
if((pid = childpid[fd]) == 0)
{
errno = EINVAL;
return (-1); /* fp wasn't opened by popen() */
}
childpid[fd] = 0;
if(fclose(fp) == EOF)
return (-1);
/* 若pclose的调用者以及为信号SIGCHLD设置了一个信号处理程序, 则pclose的waitpid返回EINR。因运行调用者捕捉此信号, 所以waitppid 被中断时,只是再次调用waitpid */
while(waitpid(pid, &stat, 0) < 0)
if(errno != EINTR)
return (stat); /* return child's termination status */
}
调用大写/小写过滤程序以读取命令
过滤程序:从标准输入读取数据,对其进行适当处理后写到标准输出。
向标准输出写一个提示,然后从标准输入读1行。使用popen,可在应用程序和输入之间插入一个程序以便对输入进行变换处理。
myuclc.c
#include "apue.h"
#include <ctype.h>
int main(void)
{
int c;
while((c = getchar()) != EOF)
{
if(isupper(c))
c = tolower(c);
if(putchar(c) == EOF)
err_sys("output error");
if(c == '\n')
fflush(stdout);
}
exit(0);
}
popen2.c
#include "apue.h"
#include <sys/wait.h>
int main(void)
{
char line[MAXLINE];
FILE *fpin;
if((fpin = popen("./myuclc", "r")) == NULL)
err_sys("popen error");
for(;;)
{
fputs("prompt> ", stdout);
fflush(stdout);
if(fgets(line, MAXLINE, fpin) == NULL) /* read from pipe */
break;
if(fputs(line, stdout) == EOF)
err_sys("fputs error to pipe");
}
if(pclose(fpin) == -1)
err_sys("pclose error");
putchar('\n');
exit(0);
}
协同进程
协同进程:当一个程序产生某个过滤程序的输入,同时又读取该过滤程序的输出时,则该过滤程序为协同进程。
进程先创建两个管道:一个时协同进程的标准输入,另一个是协同进程的标准输出。
本例中,协同进程从标准输入读两个数,计算和,然后写到标准输出。
如果将协同进程的read和write替换成标准I/O,则不再工作。因为对标准输入的第一个fgets引起标准I/O库分配一个缓冲区,并选择缓冲区类型。因为标准输入是管道,所以缓冲区类型为全缓冲,标准输出同理。当add2在其标准输入读取阻塞时,add2tostdio也在管道读发生阻塞,产生死锁。
add2.c
#include "apue.h"
int main(void)
{
int n, int1, int2;
char line[MAXLINE];
while((n = read(STDIN_FILENO, line, MAXLINE)) > 0)
{
line[n] = 0; /* null terminate */
if(sscanf(line, "%d%d\n", &int1, &int2) == 2)
{
sprintf(line, "%d\n", int1 + int2);
n = strlen(line);
if(write(STDOUT_FILENO, line, n) != n)
err_sys("write error");
}
else
{
if(write(STDOUT_FILENO, "invalid args\n", 13) != 13)
err_sys("write error");
}
}
exit(0);
}
add2tostdio.c
#include "apue.h"
static void sig_pipe(int); /* out signal handler */
int main(void)
{
int n, fd1[2], fd2[2];
pid_t pid;
char line[MAXLINE];
if(signal(SIGPIPE, sig_pipe) == SIG_ERR)
err_sys("signal error");
if(pipe(fd1) < 0 || pipe(fd2) < 0)
err_sys("pipe error");
if((pid = fork()) < 0)
err_sys("fork error");
else if(pid > 0)
{
close(fd1[0]);
close(fd2[1]);
while(fgets(line, MAXLINE, stdin) != NULL)
{
n = strlen(line);
if(write(fd1[1], line, n) != n)
err_sys("write error to pipe");
if((n = read(fd2[0], line, MAXLINE)) < 0)
err_sys("read error from pipe");
if(n == 0)
{
err_msg("child closed pipe");
break;
}
line[n] = 0; /* null terminate */
if(fputs(line, stdout) == EOF)
err_sys("fputs error");
}
if(ferror(stdin))
err_sys("fgets error on stdin");
exit(0);
}
else
{
close(fd1[1]);
close(fd2[0]);
if(fd1[0] != STDIN_FILENO)
{
if(dup2(fd1[0], STDIN_FILENO) != STDIN_FILENO)
err_sys("dup2 error to stdin");
close(fd1[0]);
}
if(fd2[1] != STDOUT_FILENO)
{
if(dup2(fd2[1], STDOUT_FILENO) != STDOUT_FILENO)
err_sys("dup2 error to stdout");
close(fd2[1]);
}
if(execl("./add2", "add2", (char *)0) < 0)
err_sys("execl error");
}
exit(0);
}
static void sig_pipe(int signo)
{
printf("SIGPIPE caught\n");
exit(1);
}
FIFO
FIFO:命名管道。不具有公共祖先的进程也能用其交换数据。
若用write写一个尚无进程为读而打开的FIFO,则产生信号SIGPIPE.
消息队列
消息队列是消息的连接标,存放在内核中并由消息队列标识符标识。
#include <sys/msg.h>
int msgget(key_t key, int flag);
创建一个新队列或打开一个现存的队列。
成功返回非负队列ID,出错返回-1。
flag:权限位
int msgctl(int msquid, int cmd, struct msqid_ds *buf);
对队列执行多种操作。
成功返回0,出错返回-1.
cmd为对队列执行的命令:
IPC_STAT 取队列的msqid_ds结构,存放在buf
IPC_SET 按buf设置msqid_ds结构
IPC_RMID 删除消息队列及队列中的数据
int msgsnd(int msqid, const void *ptr, size_t nbytes, int flag);
将新消息添加到队列尾端。
成功返回0,出错返回-1。
ptr:指向长整型数,包含了正的整型消息类型,其后紧跟消息数据。可用下列结构:
struct mymesg
{
long mtype; positive message type
char mtext[512]; message data, of length nbytes
};
nbytes:实际数据字节数。
flag:可以指定为IPC_NOWAIT。类似于文件I/O的非阻塞I/O标志。
ssize_t msgrcv(int msqid, void *ptr, size_t nbytes, long type, int flag);
从队列取消息。
成功返回消息的数据部分长度,出错返回-1。
type:0,返回队列的第一个消息;>0,返回消息类型为type的第一个消息;
<0,返回类型小于或等于type绝对值的消息,多个则取类型值最小的。
flag:设置MSG_NOERROR时,若返回消息大于nbytes,消息被截短。可设置为IPC_NOWAIT。
信号量
信号量是一个计数器,用于多进程对共享对象的访问。
#include <sys/sem.h>
int semget(key_t key, int nsems, int flag);
获得一个信号量ID。
成功返回信号量ID,出错返回-1。
nsems:集合中的信号量数。如果创建新集合,则指定nsems。如果引用一个现存的集合,则nsems为0。
int semctl(int semid, int semnum, int cmd, ...);
多种信号量操作。
第4个参数可选,是多个特定命令的联合:
union semun
{
int val; for SETVAL
strut semid_ds *buf; for IPC_STAT and IPC_SET
unsigned short *array; for GETALL and SETALL
};
semnum:信号量集合中的一个成员。
cmd:IPC_STAT、IPC_SET、IPC_RMID、GETVAL、SETVAL、GETPID、GETNCNT、GETZCNT、GETALL、SETALL
int semop(int semid, struct sembuf semoparray[], size_t nops);
自动指向信号量集合上的操作数组,原子操作。
成功返回0,出错返回-1。
setbuf:信号量操作
struct sembuf
{
unsigned short sem_num; member # in set (0, 1, ..., nsems-1)
short sem_op; operation (negative, 0, or positive)
short sem_flag; IPC_NOWAIT, SEM_UNDO
};
sem_op>0:释放占用的资源,sem_op值加到信号量的值上。如果指定了undo,则减去sem_op。
sem_op<0:要获取由该信号量获取的资源。若该信号量值大于sem_op的绝对值,则减去sem_op的
绝对值。如果指定了undo,则加上sem_op的绝对值。
sem_o=0:调用进程希望等待该信号量值变为0.
nops:semoparray中的操作数。
共享存储
运行两个或更多进程共享一给定的存储区。因数据不需要在客户进程和服务器进程之间复制,所以是最快的一种IPC。信号量被用来实现对共享存储访问的同步。
不同
消息队列、信号量、共享存储器:1、创建时需要指定一个键。2、在系统范围内起作用,没有访问计数。如消息队列和内内容会余留在系统中,直至某进程读消息或删除队列消息,或删除队列,或系统再启动时删除队列。3、在文件系统中没有名字。
管道:当最后一个访问管道的进程终止时,管道被完全删除。
FIFO:当最后一个引用FIFO的进程终止时其名字仍保留在系统中,直至显式删除,但FIFO中的数据却已全部被删除。
大量应用程序仍可有效地使用管道和FIFO。在新的应用程序中,要尽可能避免使用消息队列及信号量,而应考虑全双工管道和记录锁,它们使用起来更为简单。共享存储有其应用场合,而mmap也能提供同样的功能。
信号量锁与记录锁比较:
1、记录锁比信号量锁慢。
2、记录锁简单,进程终止时系统会处理任何遗留下的锁。
打印各种不同数据类型所存放的位置
内核将以地址0连接的共享存储段放在什么位置与系统密切相关。
#include "apue.h"
#include <sys/shm.h>
#define ARRAY_SIZE 40000
#define MALLOC_SIZE 100000
#define SHM_SIZE 100000
#define SHM_MODE 0600 /* user read/write */
char array[ARRAY_SIZE]; /* uninitialized data = bss */
int main(void)
{
int shmid;
char *ptr, *shmptr;
printf("array[] from %lx to %lx\n", (unsigned long)&array[0],
(unsigned long)&array[ARRAY_SIZE]);
printf("stack around %lx\n", (unsigned long)&shmid);
if((ptr = malloc(MALLOC_SIZE)) == NULL)
err_sys("malloc error");
printf("malloced from %lx to %lx\n", (unsigned long)ptr,
(unsigned long)ptr+MALLOC_SIZE);
/* int shmget(key_t key, size_t size, int flag); 获得一个共享存储标识符。 IPC_PRIVATE是一个特殊的,用于创建一个新的共享存储段。 成功返回共享存储ID,出错返回-1。 size:共享存储段的长度(字节)。 */
if((shmid = shmget(IPC_PRIVATE, SHM_SIZE, SHM_MODE)) < 0)
err_sys("shmget error");
/* void *shmat(int shmid, const void *addr, int flag); 返回:执行共享存储的指针。 addr为0,则此段连接到内核选择的第一个可用地址,非0,且未指定SHM_RND,则连接到addr指定地址。 addr非0,且指定了SHM_RND,则连接到(addr-(addr mod ulus SHMLBA))指定的地址。一般为0。 flag指定了SHM_RDONLY,则以只读方式连接此段,否则以只写方式连接此段。 */
if((shmptr = shmat(shmid, 0, 0)) == (void *)-1)
err_sys("shmat error");
printf("shared memory attached from %lx to %lx\n",
(unsigned long)shmptr, (unsigned long)shmptr+SHM_SIZE);
/* int shmctl(int shmid, int cmd, struct shmid_ds *buf); 对共享存储段执行多种操作。 成功返回0,出错返回-1。 cmd: IPC_STA 取shmid_ds结构,放在buf IPC_RMID 删除共享存储段 IPC_SET SHM_LOCK SHM_UNLOCK */
if(shmctl(shmid, IPC_RMID, 0) < 0)
err_sys("shmctl error");
exit(0);
}
共享存储段紧靠在栈之下。
在父、子进程间使用/dev/zero存储映射I/O的IPC
将/dev/zero作为IPC,进行存储映射时,具有特殊性质:
1、存储区都初始化为0
2、多个进程的共同祖先进程对mmap指定了MAP_SHARED标准时,则这些进程可共享此存储区。
打开/dev/zero,指定长整型的长度调用mmap。存储映射成功后,关闭此设备。然后创建一个子进程。因为在调用mmap时指定了MAP_SHARED,所以一个进程写到存储区的数据可由另一个进程见到。
然后,父、子进程交替运行,使用同步函数对各自的共享存储映射区中的长整型数加1。存储映射区由
mmap初始为0。父进程先增1,使其成为1,然后子进程增1,使其成为2,然后父进程使其成为3…。
使用/dev/zero的优点,在调用mmap创建映射区前,无需存在一个实际的文件。缺点,只能在相关进程
间起作用。
#include "apue.h"
#include <fcntl.h>
#include <sys/mman.h>
#define NLOOPS 1000
#define SIZE sizeof(long) /* size of shared memory area */
static int update(long *ptr)
{
return ((*ptr)++); /* return value before increment */
}
int main(void)
{
int fd, i, counter;
pid_t pid;
void *area;
if((fd = open("/dev/zero", O_RDWR)) < 0)
err_sys("open error");
if((area = mmap(0, SIZE, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED)
err_sys("mmap error");
close(fd); /* can close /dev/zero now that it's mapped */
TELL_WAIT();
if((pid = fork()) < 0)
err_sys("fork error");
else if(pid > 0)
{
for(i=0; i<NLOOPS; i+=2)
{
if((counter = update((long*)area)) != i)
err_quit("parent: expected %d, got %d", i, counter);
TELL_CHILD(pid);
WAIT_CHILD();
}
}
else
{
for(i=1; i<NLOOPS+1; i+=2)
{
WAIT_PARENT();
if((counter = update((long*)area)) != i)
err_quit("child: expected %d, got %d", i, counter);
TELL_PARENT(getppid());
}
}
exit(0);
}