Swoole版本号:1.7.5-stable
Github地址:https://github.com/LinkedDestiny/swoole-src-analysis
最终能够正式进入Server.c模块了…… 在之前的分析中,能够看到非常多相关模块的声明都已经写在了Server.h中,就是由于这些模块构成了Server的核心部分。而Server本身,则是一个最上层的对象,它包含了核心的Reactor和Factory模块,存放了消息队列的key值,控制着所有的Connection。所有PHP层面的回调函数也在这里指定;同一时候。Server存放了大量的属性值,这些值决定了整个Swoole的具体特征。
直接上源代码。首先是swServer结构体。这个结构体定义了一个swoole_server对象所拥有的所有成员。其声明在Server.h文件的197 - 406 行。由于该结构体太长,因此我将分段贴出其成员并分析。
/**
* tcp socket listen backlog
*/
uint16_t backlog;
/**
* reactor thread/process num
*/
uint16_t reactor_num;
uint16_t writer_num;
/**
* worker process num
*/
uint16_t worker_num; /**
* The number of pipe per reactor maintenance
*/
uint16_t reactor_pipe_num; uint8_t factory_mode; /**
* run as a daemon process
*/
uint8_t daemonize; /**
* package dispatch mode
*/
uint8_t dispatch_mode; //分配模式。1平均分配。2按FD取摸固定分配。3,使用抢占式队列(IPC消息队列)分配 /**
* 1: unix socket, 2: message queue, 3: memory channel
*/
uint8_t ipc_mode;
backlog參数为socket监听时listen函数的參数。表示listen能够保持的最大连通数。
reactor_num指定了reactor进程的数量,worker_num指定了worker进程的数量。reactor_pipe_num指定了每一个reactor所拥有的管道数量。factory_mode指定了Swoole_server执行于哪种执行模式下(Base模式、单进程单线程、多进程、多线程)。daemonize指定是否作为守护进程执行(也就是后台执行)。dispatch_mode指定了收到消息时worker进程的分配模式,ipc_mode则指定了进程间的通信模式。
int worker_uid;
int worker_groupid; /**
* max connection num
*/
uint32_t max_connection; /**
* worker process max request
*/
uint32_t max_request;
/**
* task worker process max request
*/
uint32_t task_max_request; int timeout_sec;
int timeout_usec; int sock_client_buffer_size; //client的socket缓存区设置
int sock_server_buffer_size; //server的socket缓存区设置 char log_file[SW_LOG_FILENAME]; //日志文件 int signal_fd;
int event_fd; int ringbuffer_size;
worker_uid和worker_groupid是两个无用变量,预计是历史遗留……max_connection指定同意的最大连接数。max_request指定了每一个worker进程能处理的最大请求数,task_max_request指定了每一个task worker进程能处理的最大任务请求。timeout_sec和timeout_usec定义了超时时间。
signal_fd和event_fd相同是废弃变量。ringbuffer_size指定了RingBuffer的大小。
uint16_t reactor_round_i; //轮询调度
uint16_t reactor_next_i; //平均算法调度
uint16_t reactor_schedule_count; uint16_t worker_round_id; int udp_sock_buffer_size; //UDP暂时包数量。超过数量未处理将会被丢弃 /**
* reactor ringbuffer memory pool size
*/
size_t reactor_ringbuffer_size; /**
* have udp listen socket
*/
uint8_t have_udp_sock; /**
* have tcp listen socket
*/
uint8_t have_tcp_sock; /**
* oepn cpu affinity setting
*/
uint8_t open_cpu_affinity;
前三个属性用于Reactor的调度算法。而实际上reactor仅仅使用了轮询调度,因此后两个属性都没实用到……worker_round_id用于worker轮询调度,reactor_ringbuffer_size指定了reactor中的RingBuffer内存池的大小。
have_udp_sock和have_tcp_sock用于标记当前Server是否有udp和tcp的监听。open_cpu_affinity标记是否打开了cpu亲和性设置。
/**
* open tcp_defer_accept option
*/
uint8_t tcp_defer_accept; //TCP_DEFER_ACCEPT /* tcp keepalive */
uint8_t open_tcp_keepalive; //开启keepalive
uint16_t tcp_keepidle; //如该连接在规定时间内没有不论什么数据往来,则进行探測
uint16_t tcp_keepinterval; //探測时发包的时间间隔
uint16_t tcp_keepcount; //探測尝试的次数 /* heartbeat check time*/
uint16_t heartbeat_idle_time; //心跳存活时间
uint16_t heartbeat_check_interval; //心跳定时侦測时间, 必需小于heartbeat_idle_time /**
* 来自client的心跳侦測包
*/
char heartbeat_ping[SW_HEARTBEAT_PING_LEN];
uint8_t heartbeat_ping_length; /**
* server端对心跳包的响应
*/
char heartbeat_pong[SW_HEARTBEAT_PING_LEN];
uint8_t heartbeat_pong_length;
tcp_defer_accept指定是否打开TCP_DEFER_ACCEPT选项,open_tcp_keepalive用于指定是否打开TCP_KEEPALIVE探測,接下来三个属性则用于设置相应的socket选项,heartbeat_idle_time和heartbeat_check_interval用于控制心跳检測。 这里做一些简单说明。
TCP_DEFER_ACCEPT选项的意义在于,假设某个client发起连接后并没有发送数据,则server会先临时挂起这个连接。不Accept,不建立IO通道,可是保留port号;当client发送数据过来后,server会直接Accept这个port号,建立IO。
tcp_keepalive是tcp内部的一个连接保持机制,而heartbeat是应用层控制的连接检測。
/* one package: eof check */
uint8_t open_eof_check; //检測数据EOF
uint8_t package_eof_len; //数据缓存结束符长度
//int data_buffer_max_num; //数据缓存最大个数(超过此数值的连接会被当作坏连接。将清除缓存&关闭连接)
//uint8_t max_trunk_num; //每一个请求最大同意创建的trunk数
char package_eof[SW_DATA_EOF_MAXLEN]; //数据缓存结束符 /**
* built-in http protocol
*/
uint8_t open_http_protocol;
uint32_t http_max_post_size;
uint32_t http_max_websocket_size; /* one package: length check */
uint8_t open_length_check; //开启协议长度检測 char package_length_type; //length field type
uint8_t package_length_size;
uint16_t package_length_offset; //第几个字节開始表示长度
uint16_t package_body_offset; //第几个字节開始计算长度
uint32_t package_max_length;
这一组变量控制着swoole不同的分包机制。
第一组为eof检測,第二组为内建的http协议。第三组为包长检測。这里都有中文凝视。我就不再赘述。
/**
* Use data key as factory->dispatch() param
*/
uint8_t open_dispatch_key;
uint8_t dispatch_key_size;
uint16_t dispatch_key_offset;
uint16_t dispatch_key_type; /* buffer output/input setting*/
uint32_t buffer_output_size;
uint32_t buffer_input_size; #ifdef SW_USE_OPENSSL
uint8_t open_ssl;
char *ssl_cert_file;
char *ssl_key_file;
#endif
open_dispatch_key指定是否在dispatch时给数据加上key值,后面三个属性用于控制key的相关特性。
buffer_output_size和buffer_input_size指定了输入输出缓存区的大小。
最后一组參数用于设置SSL特性,指定了相应的私钥和证书。
void *ptr2; swReactor reactor;
swFactory factory; swListenList_node *listen_list; swReactorThread *reactor_threads;
swWorkerThread *writer_threads; swWorker *workers; swQueue read_queue;
swQueue write_queue; swConnection *connection_list; //连接列表
int connection_list_capacity; //超过此容量,会自己主动扩容 /**
* message queue key
*/
uint64_t message_queue_key; swReactor *reactor_ptr; //Main Reactor
swFactory *factory_ptr; //Factory
第一个ptr2变量存放了一个zval指针。存放的是一个server在PHP中的this指针(个人理解和推測,未验证)。接下来的几个属性我想大家应该都不陌生。在之前的分析中也多次提到了这些參数。须要说明的是。最后的两个指针是遗留变量,没有实际用处。
最后还有N个回调函数指针。我就不贴代码了,我想大家都看得懂他们是干嘛的……
然后swServer大概有……17个相关操作函数,这些函数声明在Server.h文件的435 - 449行,例如以下:
int swServer_onFinish(swFactory *factory, swSendData *resp);
int swServer_onFinish2(swFactory *factory, swSendData *resp); void swServer_init(swServer *serv);
void swServer_signal_init(void);
int swServer_start(swServer *serv);
int swServer_addListener(swServer *serv, int type, char *host,int port);
int swServer_create(swServer *serv);
int swServer_listen(swServer *serv, swReactor *reactor);
int swServer_free(swServer *serv);
int swServer_shutdown(swServer *serv);
int swServer_addTimer(swServer *serv, int interval);
int swServer_reload(swServer *serv);
int swServer_udp_send(swServer *serv, swSendData *resp);
int swServer_tcp_send(swServer *serv, int fd, void *data, int length);
int swServer_reactor_add(swServer *serv, int fd, int sock_type); //no use
int swServer_reactor_del(swServer *serv, int fd, int reacot_id); //no use
int swServer_get_manager_pid(swServer *serv);
然后我尽量依照调用的先后顺序来分析这些函数
首先是swServer_init函数。这个函数主要是设置swServer的相关属性。并通过swoole_init函数设置相关的全局变量,基本都是基础的赋值。这里须要重点分析swServerG结构体。该结构体声明在swoole.h的1052 - 1104行。其声明例如以下:
typedef struct
{
swTimer timer; // 全局定时器 int running; // 是否正在执行
int error; // 错误码errno
int process_type; // 标记当前进程的类型(manager or worker)
int signal_alarm; //for timer with message queue
int signal_fd; // 没有找到赋值语句,应该是已经废弃
int log_fd; // 日志文件的描写叙述符 uint8_t use_timerfd; // 是否使用Linux提供的timerfd功能
uint8_t use_signalfd; // 是否使用signalfd功能
/**
* Timer used pipe
*/
uint8_t use_timer_pipe; // timer是否使用管道
uint8_t task_ipc_mode; // timer的通知模式 /**
* task worker process num
*/
uint16_t task_worker_num; // task worker进程的数量 uint16_t cpu_num; // cpu的核数 uint32_t pagesize; // 当前进程的分页大小
uint32_t max_sockets; // 同意的最大socket fd /**
* Unix socket default buffer size
*/
uint32_t unixsock_buffer_size; swServer *serv; // 指向swServer
swFactory *factory; // 指向swServer里的factory
swLock lock; swProcessPool task_workers; // task_workers的进程池
swProcessPool *event_workers;// 用于BASE模式 swMemoryPool *memory_pool; // 内存池
swReactor *main_reactor; // 全局主reactor,在主进程中仅仅用于接收TCP连接,在worker进程中还须要额外处理管道的事件 /**
* for swoole_server->taskwait
*/
swPipe *task_notify;
swEventData *task_result; pthread_t heartbeat_pidt; // 心跳线程 } swServerG;
该结构体用于存放本地全局变量。这些变量会在不论什么地方被使用和訪问。我在每一个属性后加上了凝视,标明了每一个属性的功能。
接下来是swServer_create函数,该函数的实际功能事实上是……依据factory_mode创建相应的Factory。
该函数定义在Server.c的671 - 678行,例如以下:
if (serv->package_eof_len > sizeof(serv->package_eof))
{
serv->package_eof_len = sizeof(serv->package_eof);
} //初始化日志
if (serv->log_file[0] != 0)
{
swLog_init(serv->log_file);
} //保存指针到全局变量中去
//TODO 未来所有使用此方式訪问swServer/swFactory对象
SwooleG.serv = serv;
SwooleG.factory = &serv->factory; //单进程单线程模式
if (serv->factory_mode == SW_MODE_SINGLE)
{
return swReactorProcess_create(serv);
}
else
{
return swReactorThread_create(serv);
}
源代码解释:假设eof的长度超过了最大长度(8字节),则设置长度为8字节。
随后初始化日志,并设置全局变量SwooleG中的serv和factory,接着依据factory_mode调用详细的函数。(swReactorThread_create參考12章)
接着是swServer_start函数,该函数用于启动一个swServer。其定义在Server.c文件的456 - 620行,由于函数较长。再此做分段分析。
swFactory *factory = &serv->factory;
int ret; ret = swServer_start_check(serv);
if (ret < 0)
{
return SW_ERR;
} #if SW_WORKER_IPC_MODE == 2
serv->ipc_mode = SW_IPC_MSGQUEUE;
#endif if (serv->message_queue_key == 0)
{
char path_buf[128];
char *path_ptr = getcwd(path_buf, 128);
serv->message_queue_key = ftok(path_ptr, 1) + getpid();
} if (serv->ipc_mode == SW_IPC_MSGQUEUE)
{
SwooleG.use_timerfd = 0;
SwooleG.use_signalfd = 0;
SwooleG.use_timer_pipe = 0;
} #ifdef SW_USE_OPENSSL
if (serv->open_ssl)
{
if (swSSL_init(serv->ssl_cert_file, serv->ssl_key_file) < 0)
{
return SW_ERR;
}
}
#endif
源代码解释:首先通过swServer_start_check函数检測相关属性是否设置正确。
随后设置ipc_mode,假设消息队列key值为0,则生成一个key。假设使用了消息队列,则关闭timerfd,signalfd和timer管道。假设开启了SSL选项。则调用swSSL_init初始化SSL设置。
//run as daemon
if (serv->daemonize > 0)
{
/**
* redirect STDOUT to log file
*/
if (SwooleG.log_fd > STDOUT_FILENO)
{
if (dup2(SwooleG.log_fd, STDOUT_FILENO) < 0)
{
swWarn("dup2() failed. Error: %s[%d]", strerror(errno), errno);
}
}
/**
* redirect STDOUT_FILENO/STDERR_FILENO to /dev/null
*/
else
{
int null_fd = open("/dev/null", O_WRONLY);
if (null_fd > 0)
{
if (dup2(null_fd, STDOUT_FILENO) < 0)
{
swWarn("dup2(STDOUT_FILENO) failed. Error: %s[%d]", strerror(errno), errno);
}
if (dup2(null_fd, STDERR_FILENO) < 0)
{
swWarn("dup2(STDERR_FILENO) failed. Error: %s[%d]", strerror(errno), errno);
}
}
else
{
swWarn("open(/dev/null) failed. Error: %s[%d]", strerror(errno), errno);
}
} if (daemon(0, 1) < 0)
{
return SW_ERR;
}
}
源代码解释:假设设置为守护进程,假设设置了日志文件,则将输出定向到日志文件里,否则将输入定向到/dev/null中(抹去输出)
//master pid
SwooleGS->master_pid = getpid();
SwooleGS->start = 1;
SwooleGS->now = SwooleStats->start_time = time(NULL); serv->reactor_pipe_num = serv->worker_num / serv->reactor_num; //设置factory回调函数
serv->factory.ptr = serv;
serv->factory.onTask = serv->onReceive; if (serv->have_udp_sock == 1 && serv->factory_mode != SW_MODE_PROCESS)
{
serv->factory.onFinish = swServer_onFinish2;
}
else
{
serv->factory.onFinish = swServer_onFinish;
} serv->workers = SwooleG.memory_pool->alloc(SwooleG.memory_pool, serv->worker_num * sizeof(swWorker));
if (serv->workers == NULL)
{
swWarn("[Master] malloc[object->workers] failed");
return SW_ERR;
} /*
* For swoole_server->taskwait, create notify pipe and result shared memory.
*/
if (SwooleG.task_worker_num > 0 && serv->worker_num > 0)
{
int i;
SwooleG.task_result = sw_shm_calloc(serv->worker_num, sizeof(swEventData));
SwooleG.task_notify = sw_calloc(serv->worker_num, sizeof(swPipe));
for (i = 0; i < serv->worker_num; i++)
{
if (swPipeNotify_auto(&SwooleG.task_notify[i], 1, 0))
{
return SW_ERR;
}
}
}
源代码解释:设置master进程id为当前进程id。设置start标记并储存启动时间。
接下来设置reactor的管道数量,设置factory的onTask回调。假设有UDP监听而且没有使用多进程模式,则设置factory的onFinish回调为swServer_onFinish2回调(当前的swoole模式下基本没用)。否则,使用swServer_onFinish回调。接着。在全局内存池中分配worker结构体数组所须要的内存。最后。假设task_worker_num大于0。则在全局变量中为task_result分配共享内存,并创建task_notify管道数组。
//factory start
if (factory->start(factory) < 0)
{
return SW_ERR;
}
//Signal Init
swServer_signal_init(); //标识为主进程
SwooleG.process_type = SW_PROCESS_MASTER; //启动心跳检測
if (serv->heartbeat_check_interval >= 1 && serv->heartbeat_check_interval <= serv->heartbeat_idle_time)
{
swTrace("hb timer start, time: %d live time:%d", serv->heartbeat_check_interval, serv->heartbeat_idle_time);
swServer_heartbeat_start(serv);
} if (serv->factory_mode == SW_MODE_SINGLE)
{
ret = swReactorProcess_start(serv);
}
else
{
ret = swServer_start_proxy(serv);
} if (ret < 0)
{
SwooleGS->start = 0;
} //server stop
if (serv->onShutdown != NULL)
{
serv->onShutdown(serv);
}
swServer_free(serv);
return SW_OK;
源代码解释:调用factory的start函数启动factory,调用swServer_signal_init函数初始化信号回调,并设置当前进程类型为master类型。
假设设置了心跳检測,则通过swServer_heartbeat_start函数启动心跳检測。
假设使用单进程模式,则直接调用swReactorProcess_start函数启动。否则,调用swServer_start_proxy函数启动Server。Server关闭后,调用onShutdown回调。并释放内存。
那么这里须要分析swServer_start_proxy函数。该函数定义了一个proxy模式。在单独的n个线程(进程)中接受维持TCP连接。该函数定义在Server.c文件的405 - 454行,例如以下:
int ret;
swReactor *main_reactor = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swReactor)); #ifdef SW_MAINREACTOR_USE_POLL
ret = swReactorPoll_create(main_reactor, 10);
#else
ret = swReactorSelect_create(main_reactor);
#endif if (ret < 0)
{
swWarn("Reactor create failed");
return SW_ERR;
}
ret = swReactorThread_start(serv, main_reactor);
if (ret < 0)
{
swWarn("ReactorThread start failed");
return SW_ERR;
}
SwooleG.main_reactor = main_reactor;
main_reactor->id = serv->reactor_num; //设为一个特别的ID
main_reactor->ptr = serv;
main_reactor->setHandle(main_reactor, SW_FD_LISTEN, swServer_master_onAccept); main_reactor->onFinish = swServer_master_onReactorFinish;
main_reactor->onTimeout = swServer_master_onReactorTimeout; #ifdef HAVE_SIGNALFD
if (SwooleG.use_signalfd)
{
swSignalfd_setup(main_reactor);
}
#endif
//SW_START_SLEEP;
if (serv->onStart != NULL)
{
serv->onStart(serv);
}
struct timeval tmo;
tmo.tv_sec = SW_MAINREACTOR_TIMEO;
tmo.tv_usec = 0; //先更新一次时间
swServer_update_time(); return main_reactor->wait(main_reactor, &tmo);
源代码解释:在全局内存池中创建一个主reactor,调用swReactorThread_start函数启动这个reactor,并设置全局变量SwooleG中的main_reactor为该reactor。接着设置reactor的相关属性和回调函数,设置该reactor监听LISTEN事件并设置回调为swServer_master_onAccept。
假设serv设置了onStart回调,调用它。随后,调用swServer_update_time函数更新当前时间,然后直接进入reactor的wait函数開始监听连接。
至此,一个完整的swServer启动流程的相关函数以及分析完毕。下一章将分析余下的函数以及相关的回调函数。