标准I/O VS 网络IO
- 标准I/O又称为标准I/O流,从某种意义上讲是全双工的,因为程序能够在同一个流上执行输入和输出。
- Unix/Linux对网络的抽象是一种称为套接字的文件类型。和任何Unix/Linux文件一样,套接字也是用文件描述符来引用的,在这种情况下称为套接字描述符。引用进程通过读写套接字描述符来与运行在其他计算机上的进程通信。
然而对流的限制和对套接字的限制,有时候会相互冲突。(However, there are restrictions on full-duplex streams that interact badly with restrictions on sockets:):
-
限制一:跟在输出函数之后的输入函数。 如果中间没有插入对
fflush
,fseek
,fsetpos
或者rewind
的调用,一个输入函数不能跟随在一个输出函数之后。fflush
函数清空与流相关的缓冲区。后三个函数使用 Unix I/O lseek 函数来重置当前的文件位置。 -
限制二:跟在输入函数之后的输出函数。 如果中间没有插入对
fseek
,fsetpos
或者rewind
的调用,一个输出函数不能跟随在一个输入函数之后, 除非该输入函数遇到了一个EOF。
这些限制给网络应用带来了一个问题, 因为对套接字使用
lseek
函数是非法的。 对流 I/O 的第一个限制能够通过采用在每个输入操作前刷新缓冲区这样的规则来满足。然而,要满足第二个限制的唯一办法是,对同一个打开的套接字描述符打开两个流,一个用来读,一个用来写:
FILE *fpin, *fput;
fpin = fdopen(sockfd, "r");
fput = fdopen(sockfd, "w");
但是这种方法也有问题,因为它要求应用程序在两个流上都要调用fclose
, 这样才能释放与每个流相关联的存储器资源,避免存储器泄漏:
fclose(fpin);
fclose(fpout);
这些操作中的每一个都试图关闭同一个底层的套接字描述符,所以第二个close操作会失败。对顺序的程序来说,这并不是问题,但是在一个线程化的程序中关闭一个已经关闭了的描述符是会导致灾难的。
Linux网络IO的实现
头文件: rio.h
#ifndef __RIO_H__
#define __RIO_H__
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#include <arpa/inet.h>
// Simplifies calls to bind(), connect(), and accept()
typedef struct sockaddr SA;
// Persistent state for the robust I/O (Rio) package
#define RIO_BUFSIZE 8192
typedef struct {
int rio_fd; // Descriptor for this internal buf
int rio_cnt; // Unread bytes in internal buf
char *rio_bufptr; // Next unread byte in internal buf
char rio_buf[RIO_BUFSIZE]; // Internal buffer
} rio_t;
// External variables
extern int h_errno; // Defined by BIND for DNS errors
extern char **environ; // Defined by libc
// Misc constants
#define MAXLINE 8192 // Max text line length
#define MAXBUF 8192 // Max I/O buffer size
#define LISTENQ 1024 // Second argument to listen()
// Our own error-handling functions
void unix_error(char *msg);
void posix_error(int code, char *msg);
void dns_error(char *msg);
void app_error(char *msg);
// Rio (Robust I/O) package
ssize_t rio_readn(int fd, void *usrbuf, size_t n);
ssize_t rio_writen(int fd, void *usrbuf, size_t n);
void rio_readinitb(rio_t *rp, int fd);
ssize_t rio_readnb(rio_t *rp, void *usrbuf, size_t n);
ssize_t rio_readlineb(rio_t *rp, void *usrbuf, size_t maxlen);
// Wrappers for Rio package
ssize_t Rio_readn(int fd, void *usrbuf, size_t n);
void Rio_writen(int fd, void *usrbuf, size_t n);
void Rio_readinitb(rio_t *rp, int fd);
ssize_t Rio_readnb(rio_t *rp, void *usrbuf, size_t n);
ssize_t Rio_readlineb(rio_t *rp, void *usrbuf, size_t maxlen);
// Client/server helper functions
int open_clientfd(char *hostname, int portno);
int open_listenfd(int portno);
// Wrappers for client/server helper functions
int Open_clientfd(char *hostname, int port);
int Open_listenfd(int port);
#endif // __RIO_H__
网络IO函数及其包裹函数:
#include "rio.h"
/*********************************************************************
* The Rio package - robust I/O functions
**********************************************************************/
/*
* rio_readn - robustly read n bytes (unbuffered)
*/
// rio_readn
ssize_t rio_readn(int fd, void *usrbuf, size_t n)
{
size_t nleft = n;
ssize_t nread;
char *bufp = usrbuf;
while (nleft > 0) {
if ((nread = read(fd, bufp, nleft)) < 0) {
if (errno == EINTR) // Interrupted by sig handler return
nread = 0; // and call read() again
else
return -1; // errno set by read()
}
else if (nread == 0)
break; // EOF
nleft -= nread;
bufp += nread;
}
return (n - nleft); // return >= 0
}
/*
* rio_writen - robustly write n bytes (unbuffered)
*/
// rio_writen
ssize_t rio_writen(int fd, void *usrbuf, size_t n)
{
size_t nleft = n;
ssize_t nwritten;
char *bufp = usrbuf;
while (nleft > 0) {
if ((nwritten = write(fd, bufp, nleft)) <= 0) {
if (errno == EINTR) // Interrupted by sig handler return
nwritten = 0; // and call write() again
else
return -1; // errno set by write()
}
nleft -= nwritten;
bufp += nwritten;
}
return n;
}
/*
* rio_read - This is a wrapper for the Unix read() function that
* transfers min(n, rio_cnt) bytes from an internal buffer to a user
* buffer, where n is the number of bytes requested by the user and
* rio_cnt is the number of unread bytes in the internal buffer. On
* entry, rio_read() refills the internal buffer via a call to
* read() if the internal buffer is empty.
*/
/* $begin rio_read */
static ssize_t rio_read(rio_t *rp, char *usrbuf, size_t n)
{
int cnt;
while (rp->rio_cnt <= 0) { /* Refill if buf is empty */
rp->rio_cnt = read(rp->rio_fd, rp->rio_buf,
sizeof(rp->rio_buf));
if (rp->rio_cnt < 0) {
if (errno != EINTR) /* Interrupted by sig handler return */
return -1;
}
else if (rp->rio_cnt == 0) /* EOF */
return 0;
else
rp->rio_bufptr = rp->rio_buf; /* Reset buffer ptr */
}
/* Copy min(n, rp->rio_cnt) bytes from internal buf to user buf */
cnt = n;
if (rp->rio_cnt < n)
cnt = rp->rio_cnt;
memcpy(usrbuf, rp->rio_bufptr, cnt);
rp->rio_bufptr += cnt;
rp->rio_cnt -= cnt;
return cnt;
}
/*
* rio_readinitb - Associate a descriptor with a read buffer and reset buffer
*/
/* $begin rio_readinitb */
void rio_readinitb(rio_t *rp, int fd)
{
rp->rio_fd = fd;
rp->rio_cnt = 0;
rp->rio_bufptr = rp->rio_buf;
}
/*
* rio_readnb - Robustly read n bytes (buffered)
*/
/* $begin rio_readnb */
ssize_t rio_readnb(rio_t *rp, void *usrbuf, size_t n)
{
size_t nleft = n;
ssize_t nread;
char *bufp = usrbuf;
while (nleft > 0) {
if ((nread = rio_read(rp, bufp, nleft)) < 0)
return -1; /* errno set by read() */
else if (nread == 0)
break; /* EOF */
nleft -= nread;
bufp += nread;
}
return (n - nleft); /* return >= 0 */
}
/*
* rio_readlineb - robustly read a text line (buffered)
*/
/* $begin rio_readlineb */
ssize_t rio_readlineb(rio_t *rp, void *usrbuf, size_t maxlen)
{
int n, rc;
char c, *bufp = usrbuf;
for (n = 1; n < maxlen; n++) {
if ((rc = rio_read(rp, &c, 1)) == 1) {
*bufp++ = c;
if (c == '\n') {
n++;
break;
}
} else if (rc == 0) {
if (n == 1)
return 0; /* EOF, no data read */
else
break; /* EOF, some data was read */
} else
return -1; /* Error */
}
*bufp = 0;
return n-1;
}
/**********************************
* Wrappers for robust I/O routines
**********************************/
ssize_t Rio_readn(int fd, void *ptr, size_t nbytes)
{
ssize_t n;
if ((n = rio_readn(fd, ptr, nbytes)) < 0)
unix_error("Rio_readn error");
return n;
}
void Rio_writen(int fd, void *usrbuf, size_t n)
{
if (rio_writen(fd, usrbuf, n) != n)
unix_error("Rio_writen error");
}
void Rio_readinitb(rio_t *rp, int fd)
{
rio_readinitb(rp, fd);
}
ssize_t Rio_readnb(rio_t *rp, void *usrbuf, size_t n)
{
ssize_t rc;
if ((rc = rio_readnb(rp, usrbuf, n)) < 0)
unix_error("Rio_readnb error");
return rc;
}
ssize_t Rio_readlineb(rio_t *rp, void *usrbuf, size_t maxlen)
{
ssize_t rc;
if ((rc = rio_readlineb(rp, usrbuf, maxlen)) < 0)
unix_error("Rio_readlineb error");
return rc;
}
TCP 连接函数包装
#include "rio.h"
/********************************
* Client/server helper functions
********************************/
/*
* open_clientfd - open connection to server at <hostname, port>
* and return a socket descriptor ready for reading and writing.
* Returns -1 and sets errno on Unix error.
* Returns -2 and sets h_errno on DNS (gethostbyname) error.
*/
/* $begin open_clientfd */
int open_clientfd(char *hostname, int port)
{
int clientfd;
struct hostent *hp;
struct sockaddr_in serveraddr;
if ((clientfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
return -1; /* Check errno for cause of error */
/* Fill in the server's IP address and port */
if ((hp = gethostbyname(hostname)) == NULL)
return -2; /* Check h_errno for cause of error */
bzero((char *) &serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
bcopy((char *)hp->h_addr_list[0],
(char *)&serveraddr.sin_addr.s_addr, hp->h_length);
serveraddr.sin_port = htons(port);
/* Establish a connection with the server */
if (connect(clientfd, (SA *) &serveraddr, sizeof(serveraddr)) < 0)
return -1;
return clientfd;
}
/*
* open_listenfd - open and return a listening socket on port
* Returns -1 and sets errno on Unix error.
*/
/* $begin open_listenfd */
int open_listenfd(int port)
{
int listenfd, optval=1;
struct sockaddr_in serveraddr;
/* Create a socket descriptor */
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
return -1;
/* Eliminates "Address already in use" error from bind */
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,
(const void *)&optval , sizeof(int)) < 0)
return -1;
/* Listenfd will be an endpoint for all requests to port
on any IP address for this host */
bzero((char *) &serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons((unsigned short)port);
if (bind(listenfd, (SA *)&serveraddr, sizeof(serveraddr)) < 0)
return -1;
/* Make it a listening socket ready to accept connection requests */
if (listen(listenfd, LISTENQ) < 0)
return -1;
return listenfd;
}
/******************************************
* Wrappers for the client/server helper routines
******************************************/
int Open_clientfd(char *hostname, int port)
{
int rc;
if ((rc = open_clientfd(hostname, port)) < 0) {
if (rc == -1)
unix_error("Open_clientfd Unix error");
else
dns_error("Open_clientfd DNS error");
}
return rc;
}
int Open_listenfd(int port)
{
int rc;
if ((rc = open_listenfd(port)) < 0)
unix_error("Open_listenfd error");
return rc;
}
错误处理函数 -- 包装
#include "rio.h"
/**************************
* Error-handling functions
**************************/
// Unix-style error
void unix_error(char *msg)
{
fprintf(stderr, "%s: %s\n", msg, strerror(errno));
exit(0);
}
// Posix-style error
void posix_error(int code, char *msg)
{
fprintf(stderr, "%s: %s\n", msg, strerror(code));
exit(0);
}
// DNS-style error
void dns_error(char *msg)
{
fprintf(stderr, "%s: DNS error %d\n", msg, h_errno);
exit(0);
}
// Application error
void app_error(char *msg)
{
fprintf(stderr, "%s\n", msg);
exit(0);
}
### 参考资料
- 《深入理解计算机系统 原书2th》(CSAPP)