1、高性能server
对于高性能server对于。处理速度和占用空间小是典型特性。特别是当server经验C10K问题的时候(网络server在处理数以万计的client连接时。往往出现效率低下甚至全然瘫痪,这被称为C10K问题)。
要做到处理速度足够快。其并发模型的设计相当关键,而要做到资源尤其是内存资源的占用少,就要依赖于其资源分配和资源管理的方案设计。
server的并发模型设计是网络编程中非常关键的一个部分,server的并发量取决于两个因素,一个是提供服务的进程数量,另外一个是每一个进程可同一时候处理的并发连接数量。对应的。server的并发模型也由两个部分构成:进程模型和连接处理机制。进程模型主要有下面3种模型:
(1)单进程模式:这样的模式的server称为迭代server,实现最简单。也没有进程控制的开销。cpu利用率最高,可是全部的客户连接请求排队等待处理,假设有一条连接时长过大,则其它请求就会被堵塞甚至被丢弃,这样的模型也非常easy被攻击,一般非常少使用这样的模型。
(2)多进程并发模式:这样的模式由master进程启动监听并接受连接。然后为每一个客户连接请求fork一个worker子进程处理客户请求。这样的模式也比較简单,可是为每一个客户连接请求fork一个子进程比較耗费cpu时间。并且子进程过多的情况下可能会用尽内存。导致開始对换。总体性能急降。这样的模型在小并发的情况下比較经常使用,比方每天处理几千个客户请求的情况;
(3)prefork模式:master进程监听client连接请求并持续监视可用子进程数量,低于阀值则fork额外的子进程,高于阀值则kill掉一些过剩的子进程。
这样的模式依据accept的详细情形又能够分为三种变体:
master 负责listen,每一个worker子进程独自accept,accept无上锁。全部worker堵塞于同一个监听套接字上睡眠,当有新的客户连接请求时,内核会唤醒全部等待该事件的睡眠worker子进程。最先运行的worker将获得连接套接字并处理请求。这样的模型会导致惊群问题,虽然仅仅有一个子进程将获得连接,可是全部子进程都会被唤醒,子进程越多,惊群问题对于性能的影响就越大。
还有一方面。假设每一个worker不是堵塞于accept而是堵塞于select,则非常easy造成select冲突问题,这样的情况的性能损耗更大。所以这样的模型一般都是直接堵塞于accept,不堵塞于select。
master 负责listen。每一个worker子进程独自accpet。accept有上锁。这样的模型攻克了惊群问题,仅仅有一个worker堵塞于accpet,其余worker都堵塞于获取锁资源,上锁能够使用文件上锁或者使用共享内存的相互排斥锁,这样的模型的cpu耗时略高于第一种模型。这两种模型都是由内核负责把client连接请求交由某个worker,客户连接请求的处理比較均匀。由于内核使用了公平的进程切换方式。
master负责listen和accpet。通过某种方式把获得的连接套接字交给一个空暇worker。这样的模型下的master必须管理全部worker子进程的状态,而且要使用某种方式的进程间通信方式传递套接字给子进程,比方採用socketpair创建字节流管道用于传递。
相对于上面两种模型而言。这样的模型复杂度高一些,cpu耗时也更高,而且子进程的分配也由master负责。是否均匀取决于master。
以上的进程模型都假定了两个条件,即套接字是堵塞的。而且每一个客户连接请求相应一个子进程。这也就意味着假设同一时候并发量非常高的时候。比方超过1万的并发量。就要有1万个worker子进程同一时候服务。内存耗光后,server性能急剧下降。
这些模型基本上仅仅能服务于并发量非常低的情况,一般在1千以内勉强过得去(还依赖于每一个处理的消耗)。
一个自然的解决的方法就是把进程与连接的比例从1:1变成m:n。m=1、n>1的情况下,一个worker进程能够处理多个连接请求。这样对于每一个client连接的处理就不能是全程堵塞的了。能够把每一个client连接的处理分为若干过程,每一个过程都是一个状态,这样就能够把对一个客户的连接请求处理分解成若干步骤。
假设把每一个客户请求的处理分开为不同的阶段。就能够在一个子进程内或者一批子进程间并发的处理很多其它的连接请求了,而且能够更好的控制资源的分配和管理,将资源的消耗降到一定的低水平,这样也就等于提高了server的总体并发能力。
以下介绍一下并发模型的连接处理机制。这个机制的关键是IO模型。一般有五种典型的IO模型:
(1)堵塞IO模型:当套接口是堵塞的,全部的输入操作(调用connect, accept, read, recvfrom, recv, recvmsg等输入函数)发起后会堵塞到两个步骤完毕才会返回;
(2)非堵塞IO模型:当套接口是非堵塞的,全部的输入操作在第一个步骤马上返回。这个时候一般须要轮询检查(循环调用输入函数),当数据准备好或者连接已经建立进入第二步的情况下,调用的输入函数将堵塞到第二步完毕为止;
(3)IO复用模型:当在等待多个套接口的输入时,能够调用select、poll等IO复用函数监听这些套接口的输入事件。进程会堵塞在这些调用上。直到有一个或者多个套接口的输入事件发生。也即完毕了第一步,IO复用函数会返回这些套接口,接着进程能够调用输入函数完毕这些套接口的第二步;
(4)信号驱动IO模型:创建套接口的时候,开启套接口的信号驱动IO功能,并安装一个信号处理函数。处理SIGIO信号。当套接口完毕了第一步时,会发送SIGIO信号通知进程处理,进程在信号处理函数中完毕第二步;
(5)异步IO模型:告诉内核启动某个输入操作,并让内核在完毕了输入操作之后(两个步骤都完毕)通知进程。
前3种模型在全部的操作系统都支持,而后两种模型非常少操作系统实现,前4种IO模型都会导致进程堵塞,直到IO操作完毕,属于同步IO操作,仅仅有异步IO模型不导致进程堵塞,是异步IO操作。
IO 复用模型中,select和poll一般全部的操作系统都会支持,可是每次等待都要设置须要等待的套接口,而且内部的实现不够高效。非常难支持监听高并发量的套接口集。不同的操作系统使用了不同的高级轮询技术来支持高性能的监听。一般这些方式都不是可移植的。比方freebsd上实现了
kqueue,solaris实现了/dev/poll。linux实现了epoll等等。
nginx针对不同的操作系统,定制了不同的IO处理机制。一般都会採用操作系统的高性能接口。
2、基本思想
为了追求高并发和高速响应,并发连接是不论什么服务端程序都逃不掉的重要性能指标,怎样处理大量并发连接无疑是server端程序设计时所要考虑的第一问题。
nginx採用的是大部分HTTPserver的做法,即master-worker模型。一个master进程管理一个或者多个worker进程,主要的事件处理都是放在worker进程,master负责一些全局初始化。以及对worker进程的管理。
在nginx中。master进程和worker进程的通信主要是通过socketpair来实现的。每当fork完一个子进程之后,就将这个子进程的socketpair句柄传递给前面已经存在的子进程。这样子进程之间也就能够通信了。
Nginx中fork子进程的函数是ngx_spawn_process()。
主要实现代码在ngx_process.h和ngx_process.c文件里。
3、数据结构
ngx_process_t
typedef struct {
ngx_pid_t pid; //进程id
int status; //进程的退出状态(主要在waitpid中进行处理)
ngx_socket_t channel[2]; //socketpair创建的一对socket句柄
ngx_spawn_proc_pt proc; //进程的运行函数
void *data; //proc的參数
char *name;
unsigned respawn:1; //进程的状态,又一次创建
unsigned just_respawn:1; //进程的状态。第一次创建
unsigned detached:1; //进程的状态。分离
unsigned exiting:1; //进程的状态。正在退出
unsigned exited:1; //进程的状态,已经退出
} ngx_process_t;
ngx_channel_t
typedef struct {
ngx_uint_t command; //要发送的命令
ngx_pid_t pid; //发送方的进程id
ngx_int_t slot; //发送方进程在进程表中的偏移位置
ngx_fd_t fd; //发送给对方的句柄
} ngx_channel_t;
ngx_cycle_s
struct ngx_cycle_s {
void ****conf_ctx;
ngx_pool_t *pool;
ngx_log_t *log;
ngx_log_t new_log;
ngx_connection_t **files;
ngx_connection_t *free_connections;
ngx_uint_t free_connection_n;
ngx_queue_t reusable_connections_queue;
ngx_array_t listening;
ngx_array_t pathes;
ngx_list_t open_files;
ngx_list_t shared_memory;
ngx_uint_t connection_n;
ngx_uint_t files_n;
ngx_connection_t *connections;
ngx_event_t *read_events;
ngx_event_t *write_events;
ngx_cycle_t *old_cycle;
ngx_str_t conf_file;
ngx_str_t conf_param;
ngx_str_t conf_prefix;
ngx_str_t prefix;
ngx_str_t lock_file;
ngx_str_t hostname;
};
备注:在nginx中,一个cycle代表一个进程,全部进程相关变量(包含连接)都在这个结构体中。
ngx_listening_s
struct ngx_listening_s {
ngx_socket_t fd; //监听套接字的套接字描写叙述符
struct sockaddr *sockaddr; //监听套接口地址结构
socklen_t socklen;
size_t addr_text_max_len;
ngx_str_t addr_text;
int type; //SOCK_STREAM
int backlog;
int rcvbuf; //
监听套接口的接收缓冲区长度
int sndbuf; //监听套接口的发送缓冲区长度
ngx_connection_handler_pt handler;
void *servers;
ngx_log_t log;
ngx_log_t *logp;
size_t pool_size;
size_t post_accept_buffer_size;
ngx_msec_t post_accept_timeout;
ngx_listening_t *previous;
ngx_connection_t *connection; //监听也是一个连接。要分配给监听一个连接资源
unsigned open:1;
unsigned remain:1;
unsigned ignore:1;
unsigned bound:1;
unsigned inherited:1;
unsigned nonblocking_accept:1;
unsigned listen:1;
unsigned nonblocking:1;
unsigned shared:1;
unsigned addr_ntop:1;
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
unsigned ipv6only:2;
#endif
#if (NGX_HAVE_DEFERRED_ACCEPT)
unsigned deferred_accept:1;
unsigned delete_deferred:1;
unsigned add_deferred:1;
#ifdef SO_ACCEPTFILTER
char *accept_filter;
#endif
#endif
#if (NGX_HAVE_SETFIB)
int setfib;
#endif
};
ngx_connection_s
struct ngx_connection_s {
void *data;
ngx_event_t *read; //读事件
ngx_event_t *write; //写事件
ngx_socket_t fd; //连接套接口的套接口描写叙述字
ngx_recv_pt recv;
ngx_send_pt send;
ngx_recv_chain_pt recv_chain;
ngx_send_chain_pt send_chain;
ngx_listening_t *listening; //该连接相应的监听
off_t sent;
ngx_log_t *log;
ngx_pool_t *pool;
struct sockaddr *sockaddr;
socklen_t socklen;
ngx_str_t addr_text;
#if (NGX_SSL)
ngx_ssl_connection_t *ssl;
#endif
struct sockaddr *local_sockaddr;
ngx_buf_t *buffer;
ngx_queue_t queue;
ngx_atomic_uint_t number;
ngx_uint_t requests;
unsigned buffered:8;
unsigned log_error:3; /* ngx_connection_log_error_e */
unsigned single_connection:1;
unsigned unexpected_eof:1;
unsigned timedout:1;
unsigned error:1;
unsigned destroyed:1;
unsigned idle:1;
unsigned reusable:1;
unsigned close:1;
unsigned sendfile:1;
unsigned sndlowat:1;
unsigned tcp_nodelay:2; /* ngx_connection_tcp_nodelay_e */
unsigned tcp_nopush:2; /* ngx_connection_tcp_nopush_e */
#if (NGX_HAVE_IOCP)
unsigned accept_context_updated:1;
#endif
#if (NGX_HAVE_AIO_SENDFILE)
unsigned aio_sendfile:1;
ngx_buf_t *busy_sendfile;
#endif
#if (NGX_THREADS)
ngx_atomic_t lock;
#endif
};
4、详细实现
nginx的进程启动过程是在ngx_master_process_cycle()函数中实现的。单进程是通过ngx_single_process_cycle()函数中完毕,在多进程模型中,会依据配置文件的worker_processes值创建多个子进程,即一个master和多个worker子进程。
进程之间、进程与外部之间保持通信。进程之间是通过socketpair进行通信的,进程与外部之间是通过信号通信的。
master进程主要进行一些全局性的初始化工作和管理worker子进程的工作。事件处理是在worker子进程中进行的。
进程启动过程中。有些全局数据会被设置。最重要的是进程表ngx_processes。master进程没创建一个worker子进程,都会把一个设置好的ngx_process_t结构变量放入ngx_processes中。进程表长度是1024。
ngx_open_listening_sockets(cycle)
主要功能:读取配置文件。绑定、监听服务port。
ngx_int_t
ngx_open_listening_sockets(ngx_cycle_t *cycle)
{
。
。。
for (tries = 5; tries; tries--) {
failed = 0;
/* for each listening socket */
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
s = ngx_socket(ls[i].sockaddr->sa_family, ls[i].type, 0); //socket
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const void *) &reuseaddr, sizeof(int)) //setsockopt
== -1)
{
}
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
if (ls[i].sockaddr->sa_family == AF_INET6 && ls[i].ipv6only) {
}
#endif
if (bind(s, ls[i].sockaddr, ls[i].socklen) == -1) { //bind
}
#if (NGX_HAVE_UNIX_DOMAIN)
if (ls[i].sockaddr->sa_family == AF_UNIX) {
}
#endif
if (listen(s, ls[i].backlog) == -1) { //listen
}
ls[i].listen = 1;
ls[i].fd = s;
}//for cycle
if (!failed) {
break;
}
ngx_msleep(500);
} //for tries=5
if (failed) {
ngx_log_error(NGX_LOG_EMERG, log, 0, "still could not bind()");
return NGX_ERROR;
}
return NGX_OK;
}
备注:能够看到ngx_init_cycle()里的ngx_open_listening_sockets()主要功能是socket、bind和listen函数的调用。终于创建完的监听套接字就在cycle结构体的listening域里。
ngx_master_process_cycle(ngx_cycle_t *cycle)
void
ngx_master_process_cycle(ngx_cycle_t *cycle)
{
//加入信号集
sigemptyset(&set);
sigaddset(&set, SIGCHLD);
sigaddset(&set, SIGALRM);
sigaddset(&set, SIGIO);
sigaddset(&set, SIGINT);
sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL));
if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigprocmask() failed");
}
sigemptyset(&set);
。
。
。
//依据配置文件。启动子进程,子进程进入自己的事件循环
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_RESPAWN);
//master进程进入自己的事件循环。即接收信号、管理worker进程
for ( ;; ) {
。。
。
}
}
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
主要功能:创建worker子进程。
static void
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
for (i = 0; i < n; i++) {
cpu_affinity = ngx_get_cpu_affinity(i);
ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
"worker process", type);
ch.pid = ngx_processes[ngx_process_slot].pid;
ch.slot = ngx_process_slot;
ch.fd = ngx_processes[ngx_process_slot].channel[0];
ngx_pass_open_channel(cycle, &ch);
}
}
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,char *name, ngx_int_t respawn)
ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
char *name, ngx_int_t respawn)
{
。。
。
pid = fork();
switch (pid) {
case -1:
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fork() failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
case 0:
//子进程
ngx_pid = ngx_getpid();
proc(cycle, data); //子进程进入自己的事件循环
break;
default:
//父进程
break;
}
。。。
}
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_uint_t i;
ngx_connection_t *c;
ngx_process = NGX_PROCESS_WORKER;
//初始化,并设置子进程title
ngx_worker_process_init(cycle, 1);
ngx_setproctitle("worker process");
。。
。
//子进程自己的事件循环
for ( ;; ) {
//退出状态已设置。关闭全部连接
if (ngx_exiting) {
c = cycle->connections;
for (i = 0; i < cycle->connection_n; i++) {
if (c[i].fd != -1 && c[i].idle) {
c[i].close = 1;
c[i].read->handler(c[i].read);
}
}
if (ngx_event_timer_rbtree.root == ngx_event_timer_rbtree.sentinel)
{
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
//处理事件和计时
ngx_process_events_and_timers(cycle);
。
。。
}
}
备注:worker进程的事件循环就是监听网络事件并处理(如新建连接、断开连接、处理请求、发送响应等)。所以真正的连接终于是连到了worker进程。可是worker进程之间是怎么调用accept()函数呢?
全部的worker进程都有监听套接字,都可以accept一个连接,可是nginx准备了一个accept锁。因此全部的子进程在走到处理新连接这一步的时候都要争下accept锁,争到锁的worker进程可以调用accept()并接受新连接。
这样做的目的就是为了防止多个进程同一时候accept,当一个连接来的时候多个进程同一时候被唤起,即惊群。
ngx_process_events_and_timers(ngx_cycle_t *cycle)
函数功能:事件循环的核心。
void ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
// 假设配置文件里设置了时间精度
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
...
}
// ngx_use_accept_mutex变量代表是否使用accept相互排斥体,默认使用,accept_mutex
off,指令关闭。accept mutex的作用就是避免惊群,同一时候实现负载均衡。
if (ngx_use_accept_mutex) {
。就表示该进程接受的连接过多,因此就放弃一次争抢accept
mutex的机会,同一时候将
。
然后,继续处理已有连接上的事件。
nginx就借用此变量实现了进程关于连接的基本负载均衡。
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
// 尝试加锁accept mutex,仅仅有成功获取锁的进程,才会将listen套接字放入epool中,因此保证了仅仅有一个进程拥有监听套接口,故全部进程堵塞在epool_wait时,不会出现惊群现象。
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
//获取锁的进程,将加入一个NGX_POST_EVENTS标志,此标志的作用是将全部产生的事件放入一个队列中。等释放锁后。再慢慢来处理事件。
由于,处理事件可能会非常耗时。假设不先释放锁再处理的话,该进程就长时间霸占了锁。导致其它进程无法获取锁,这样accept
的效率就低了。
flags |= NGX_POST_EVENTS;
} else {
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
//设置最长延迟多久。再去争抢锁
timer = ngx_accept_mutex_delay;
}
}
}
}
delta = ngx_current_msec;
// 调用process_events钩子轮询事件,有些事件即时调用事件处理函数处理,有些事件放入延迟队列等待后面处理。ngx_process_events的详细实现是相应到epoll模块中的ngx_epoll_process_events函数
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
// 有须要延迟处理的监听套接口事件
if (ngx_posted_accept_events) {
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
}
// 释放锁
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
//delta是上文对epool wait事件的耗时统计。存在毫秒级的耗时就对全部事件的timer进行检查。假设time
out就从timer rbtree中删除到期的timer。同一时候调用对应事件的handler函数完毕处理
if (delta) {
ngx_event_expire_timers();
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted events %p", ngx_posted_events);
// 有须要延迟处理的数据套接口事件
if (ngx_posted_events) {
// 处理
if (ngx_threaded) {
ngx_wakeup_worker_thread(cycle);
} else {
ngx_event_process_posted(cycle, &ngx_posted_events);
}
}
}