最近干活的时候又被Linux管道和消息队列搞得一脸懵逼。当初自己走马观花地学了一遍,以为内容很简单,结果留下了大坑。于是借来《UNIX网络编程》补一补,重新审视这两个部分,并且引以为戒!!!
首先看管道
#include<unistd.h> int pipe(int fd[2]);返回:成功为0,出错为-1(并设置errno)。得到的两个文件描述符中,fd[0]用来读,fd[1]用来写
灵魂作图
单进程管道
刚fork后
父进程关闭管道读出端(fd[0]),子进程关闭管道写入端(fd[1]),这样就在父子进程间提供了一个从父进程流向子进程的单向数据流
(匿名)管道只能用于具有亲缘关系的进程间通信,例如父子进程或者兄弟进程:因为管道的两个文件描述符只能通过fork从创建管道的共同祖先进程那里继承下来
管道的缓冲区大小是受限制的:缓冲区写满后,写进程会阻塞(若设置了O_NONBLOCK,则write返回EAGAIN)。管道所传输的是无格式的字节流,这就需要管道输入方和输出方事先约定好数据格式
有名管道(named pipe,又叫FIFO)可用于没有亲缘关系的进程间通信
#include<sys/types.h> #include<sys/stat.h> int mkfifo(const char *pathname, mode_t mode);//pathname为要创建的有名管道的全路径名,mode为有名管道的访问权限模式 返回:若成功则0,不成功则-1
实现分析
/* How many buffer slots each pipe owns. */
#define PIPE_BUFFERS (16)

/*
 * A single slot in a pipe's buffer array: one page frame plus the window
 * [offset, offset + len) of valid data inside it.
 */
struct pipe_buffer {
	struct page *page;               /* page-frame descriptor holding the data */
	unsigned int offset;             /* where the valid data starts in the page */
	unsigned int len;                /* how many valid bytes follow offset */
	struct pipe_buf_operations *ops; /* method table for this buffer */
};
//管道信息结构 struct pipe_inode_info { wait_queue_head_t wait; //管道等待队列 unsigned int nrbufs, curbuf; //包含待读数据的缓冲区数和包含待读数据的第一个缓冲区的索引 struct pipe_buffer bufs[PIPE_BUFFERS]; //管道缓冲区描述符数组 struct page *tmp_page; //高速缓存区页框指针 unsigned int start; //当前管道缓存区读的位置 unsigned int readers; //读进程的标志,或编号 unsigned int writers; //写进程的标志,或编号 unsigned int waiting_writers; //在等待队列中睡眠的写进程的个数 unsigned int r_counter; //与readers类似,但当等待写入FIFO的进程是使用 unsigned int w_counter; //与writers类似,但当等待写入FIFO的进程时使用 struct fasync_struct *fasync_readers; //用于通过信号进行的异步I/O通知 struct fasync_struct *fasync_writers; //用于通过信号的异步I/O通知 };
/*
 * pipe_readv - read from a pipe into the caller's iovec array.
 *
 * Holds the inode semaphore (PIPE_SEM) for the whole operation. Unread
 * data is consumed in ring order starting at info->curbuf; each buffer
 * that becomes empty is released and the ring index advances. When the
 * pipe is empty the function stops if there are no writers (end of file),
 * or, when no writer is sleeping inside pipe_writev, if some data has
 * already been read or O_NONBLOCK is set. Otherwise it sleeps in
 * pipe_wait() and retries.
 *
 * Returns the number of bytes copied; 0 for a zero-length request or at
 * end of file; or, only when nothing was copied yet: -EFAULT (copy to user
 * space failed), -EAGAIN (empty non-blocking pipe) or -ERESTARTSYS (signal
 * pending). ppos is not used.
 */
static ssize_t pipe_readv(struct file *filp, const struct iovec *_iov, unsigned long nr_segs, loff_t *ppos)
{
	struct inode *inode = filp->f_dentry->d_inode; /* inode that backs the pipe */
	struct pipe_inode_info *info;
	int do_wakeup; /* set once a buffer has been freed, so writers should be woken */
	ssize_t ret;   /* bytes copied so far, or a negative error code */
	/* Destination buffers, const cast away. NOTE(review): presumably
	 * pipe_iov_copy_to_user advances the iovec as it copies, since the
	 * same iov is passed on every iteration; confirm. */
	struct iovec *iov = (struct iovec *)_iov;
	size_t total_len; /* bytes the caller still wants */

	total_len = iov_length(iov, nr_segs);
	/* Null read succeeds. */
	if (unlikely(total_len == 0))
		return 0;

	do_wakeup = 0;
	ret = 0;
	down(PIPE_SEM(*inode)); /* serialize with other readers/writers of this inode */
	info = inode->i_pipe;   /* per-pipe state: buffer ring and counters */
	for (;;) {
		int bufs = info->nrbufs; /* buffers that still hold unread data */
		if (bufs) {
			int curbuf = info->curbuf; /* index of the oldest such buffer */
			struct pipe_buffer *buf = info->bufs + curbuf;
			struct pipe_buf_operations *ops = buf->ops;
			void *addr;
			size_t chars = buf->len;
			int error;

			/* Copy no more than the caller still wants. */
			if (chars > total_len)
				chars = total_len;

			/* Map the buffer, copy its valid bytes out, unmap it. */
			addr = ops->map(filp, info, buf);
			error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars);
			ops->unmap(info, buf);
			if (unlikely(error)) {
				/* Fail only if nothing was read; else return the partial count. */
				if (!ret)
					ret = -EFAULT;
				break;
			}
			/* Consume the copied bytes from this buffer. */
			ret += chars;
			buf->offset += chars;
			buf->len -= chars;
			if (!buf->len) {
				/* Buffer drained: release it and advance the ring index.
				 * The mask wraps the index modulo PIPE_BUFFERS, which
				 * assumes PIPE_BUFFERS is a power of two. */
				buf->ops = NULL;
				ops->release(info, buf);
				curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
				info->curbuf = curbuf;
				info->nrbufs = --bufs;
				do_wakeup = 1;
			}
			total_len -= chars;
			if (!total_len)
				break;	/* common path: read succeeded */
		}
		if (bufs)	/* More to do? */
			continue;

		/* From here on the pipe is empty. */

		/* No writers left: end of file (returns whatever was read, possibly 0).
		 * NOTE(review): assumes PIPE_WRITERS reads info->writers; the macro is
		 * defined elsewhere. */
		if (!PIPE_WRITERS(*inode))
			break;
		if (!PIPE_WAITING_WRITERS(*inode)) {
			/* syscall merging: Usually we must not sleep
			 * if O_NONBLOCK is set, or if we got some data.
			 * But if a writer sleeps in kernel space, then
			 * we can wait for that data without violating POSIX.
			 */
			if (ret)
				break;
			if (filp->f_flags & O_NONBLOCK) {
				/* Empty non-blocking pipe and nothing read yet. */
				ret = -EAGAIN;
				break;
			}
		}
		/* A signal is pending: return the partial count, or -ERESTARTSYS if none. */
		if (signal_pending(current)) {
			if (!ret)
				ret = -ERESTARTSYS;
			break;
		}
		/* Wake writers before sleeping so they can refill the pipe. */
		if (do_wakeup) {
			wake_up_interruptible_sync(PIPE_WAIT(*inode));
			kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
		}
		pipe_wait(inode); /* sleep; drops and retakes PIPE_SEM */
	}
	up(PIPE_SEM(*inode));

	/* Signal writers asynchronously that there is more room. */
	if (do_wakeup) {
		wake_up_interruptible(PIPE_WAIT(*inode));
		kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
	}
	if (ret > 0)
		file_accessed(filp); /* update the file's access time */
	return ret;
}

/* pipe_read - single-buffer wrapper: wraps buf/count in a one-element
 * iovec and delegates to pipe_readv. */
static ssize_t pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
{
	struct iovec iov = { .iov_base = buf, .iov_len = count };
	return pipe_readv(filp, &iov, 1, ppos);
}

/* Drop the inode semaphore and wait for a pipe event, atomically */
/*
 * Called with PIPE_SEM held (both pipe_readv and pipe_writev call it inside
 * their locked loops); returns with it held again. The task is put on the
 * pipe's wait queue (prepare_to_wait) before the semaphore is released, so a
 * wake-up sent between up() and schedule() is not missed; that ordering is
 * the "atomically" in the comment above. Sleeps in TASK_INTERRUPTIBLE state.
 */
void pipe_wait(struct inode * inode)
{
	DEFINE_WAIT(wait);
	/* Add current to the pipe's wait queue. */
	prepare_to_wait(PIPE_WAIT(*inode), &wait, TASK_INTERRUPTIBLE);
	/* Release the semaphore so others can use the pipe while we sleep. */
	up(PIPE_SEM(*inode));
	schedule();
	/* Woken up: leave the wait queue... */
	finish_wait(PIPE_WAIT(*inode), &wait);
	/* ...and reacquire the semaphore before returning to the caller. */
	down(PIPE_SEM(*inode));
}
/*
 * pipe_writev - copy the caller's iovec array into a pipe.
 *
 * Holds the inode semaphore (PIPE_SEM) for the whole operation. A write
 * shorter than PAGE_SIZE is first appended to the last buffer that holds
 * data, if that buffer's ops allow merging and the data fits in its page.
 * Otherwise data is copied into fresh pages, at most PAGE_SIZE bytes per
 * free ring slot, sleeping in pipe_wait() whenever all PIPE_BUFFERS slots
 * are occupied.
 *
 * Returns the number of bytes written, or 0 for a zero-length request.
 * When no readers exist, SIGPIPE is sent and -EPIPE returned. Other errors
 * are returned only if nothing was written yet (otherwise the partial count
 * is returned):
 *   -EFAULT       copy from user space failed (merge path: the copy
 *                 helper's error code is returned as-is),
 *   -ENOMEM       no page could be allocated,
 *   -EAGAIN       ring full and O_NONBLOCK set,
 *   -ERESTARTSYS  a signal is pending.
 * ppos is not used.
 */
static ssize_t pipe_writev(struct file *filp, const struct iovec *_iov, unsigned long nr_segs, loff_t *ppos)
{
	struct inode *inode = filp->f_dentry->d_inode;
	struct pipe_inode_info *info;
	ssize_t ret;   /* bytes written so far, or a negative error code */
	int do_wakeup; /* set once data was added, so readers should be woken */
	struct iovec *iov = (struct iovec *)_iov;
	size_t total_len; /* bytes still to be written */

	total_len = iov_length(iov, nr_segs);
	/* Null write succeeds. */
	if (unlikely(total_len == 0))
		return 0;

	do_wakeup = 0;
	ret = 0;
	down(PIPE_SEM(*inode));
	info = inode->i_pipe;

	/* No readers: writing is pointless; raise SIGPIPE and fail with -EPIPE. */
	if (!PIPE_READERS(*inode)) {
		send_sig(SIGPIPE, current, 0);
		ret = -EPIPE;
		goto out;
	}

	/* We try to merge small writes */
	/* Fast path: the pipe already holds data and this write is shorter than a page. */
	if (info->nrbufs && total_len < PAGE_SIZE) {
		/* Index of the most recently filled buffer: the last of the nrbufs
		 * occupied slots starting at curbuf, wrapped around the ring. */
		int lastbuf = (info->curbuf + info->nrbufs - 1) & (PIPE_BUFFERS-1);
		struct pipe_buffer *buf = info->bufs + lastbuf;
		struct pipe_buf_operations *ops = buf->ops;
		int offset = buf->offset + buf->len; /* first free byte after the existing data */

		/* Merge only if this buffer type allows it and the whole write fits in the page. */
		if (ops->can_merge && offset + total_len <= PAGE_SIZE) {
			void *addr = ops->map(filp, info, buf);
			/* Append directly after the existing data. */
			int error = pipe_iov_copy_from_user(offset + addr, iov, total_len);
			ops->unmap(info, buf);
			ret = error;
			do_wakeup = 1;
			if (error)
				goto out;
			/* Grow the buffer's valid-data length; the whole write was merged. */
			buf->len += total_len;
			ret = total_len;
			goto out;
		}
	}

	/*
	 * Slow path: the pipe is empty, the write is at least PAGE_SIZE, or the
	 * last buffer cannot take it (merging not allowed or not enough room).
	 * Each iteration fills one empty ring slot with a fresh page.
	 */
	for (;;) {
		int bufs;

		/* Readers may have gone away while we slept. */
		if (!PIPE_READERS(*inode)) {
			send_sig(SIGPIPE, current, 0);
			if (!ret)
				ret = -EPIPE;
			break;
		}
		bufs = info->nrbufs; /* occupied slots */
		if (bufs < PIPE_BUFFERS) {
			ssize_t chars;
			/* First empty slot after the occupied ones, wrapped around the ring. */
			int newbuf = (info->curbuf + bufs) & (PIPE_BUFFERS-1);
			struct pipe_buffer *buf = info->bufs + newbuf;
			struct page *page = info->tmp_page;
			int error;

			/* Reuse the cached spare page if there is one; otherwise allocate a
			 * page and cache it in tmp_page until it is inserted into the ring. */
			if (!page) {
				page = alloc_page(GFP_HIGHUSER);
				if (unlikely(!page)) {
					ret = ret ? : -ENOMEM;
					break;
				}
				info->tmp_page = page;
			}
			/* Always wakeup, even if the copy fails. Otherwise
			 * we lock up (O_NONBLOCK-)readers that sleep due to
			 * syscall merging.
			 * FIXME! Is this really true?
			 */
			do_wakeup = 1;
			/* At most one page per buffer. */
			chars = PAGE_SIZE;
			if (chars > total_len)
				chars = total_len;

			/* Map the page temporarily and copy chars bytes from user space. */
			error = pipe_iov_copy_from_user(kmap(page), iov, chars);
			kunmap(page);
			if (unlikely(error)) {
				if (!ret)
					ret = -EFAULT;
				break;
			}
			ret += chars;

			/* Insert it into the buffer array */
			/* Fill in the slot's page, ops, offset and len, then bump nrbufs. */
			buf->page = page;
			buf->ops = &anon_pipe_buf_ops;
			buf->offset = 0;
			buf->len = chars;
			info->nrbufs = ++bufs;
			info->tmp_page = NULL; /* the page now belongs to the buffer */

			/* Stop when everything has been written; otherwise keep going. */
			total_len -= chars;
			if (!total_len)
				break;
		}
		/* Free slots remain: keep writing. */
		if (bufs < PIPE_BUFFERS)
			continue;

		/* The ring is full. Non-blocking: return the partial count, or
		 * -EAGAIN if nothing was written. */
		if (filp->f_flags & O_NONBLOCK) {
			if (!ret)
				ret = -EAGAIN;
			break;
		}
		/* A signal is pending: return the partial count, or -ERESTARTSYS if none. */
		if (signal_pending(current)) {
			if (!ret)
				ret = -ERESTARTSYS;
			break;
		}
		/* Wake readers before sleeping so they can drain the pipe. The flag is
		 * reset so the wake-up at "out" happens only if more data is added. */
		if (do_wakeup) {
			wake_up_interruptible_sync(PIPE_WAIT(*inode));
			kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
			do_wakeup = 0;
		}
		/* Advertise a sleeping writer: pipe_readv keeps waiting for our data
		 * instead of returning early while this count is non-zero. */
		PIPE_WAITING_WRITERS(*inode)++;
		pipe_wait(inode);
		PIPE_WAITING_WRITERS(*inode)--;
	}
out:
	up(PIPE_SEM(*inode));
	/* Notify readers asynchronously that data arrived. */
	if (do_wakeup) {
		wake_up_interruptible(PIPE_WAIT(*inode));
		kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
	}
	if (ret > 0)
		inode_update_time(inode, 1);	/* mtime and ctime */
	return ret;
}
PS: 管道是作为一组VFS对象来实现的,因此没有对应的磁盘映像。管道所在的特殊文件系统(pipefs)的注册、安装等实现与VFS中其他文件系统类似,此处不进行探讨