简介:
半同步/半反应堆线程池是通过一个线程往工作队列添加任务T,然后工作线程竞争工作队列获得任务T。HTTP请求解析服务端程序:逐行解析客户端发送来的HTTP请求然后作出HTTP回答。采用线程池就是:服务端创建一个线程池,然后有HTTP请求到达就将HTTP请求的处理添加到线程池任务队列中去,线程池工作线程通过竞态机制(信号量)竞争任务T(HTTP请求处理)。
HTTP请求内容:
GET http://www.baidu.com/index.html HTTP/1.1 //该行是HTTP请求行,GET是请求方法表示以只读方式获取资源 http那一串是url HTTP那个是客户端HTTP版本
User-Agent:Wget/1.12 (linux-gnu) //客户端使用的程序
Host: www.baidu.com //目的主机名,必须有
Connection: close //用于告诉服务器处理完这个请求后关闭连接,可选其它
注意:第一行为请求行,后面3行是HTTP头部,HTTP请求每行均以回车符和换行符结束,所有头部字段结束后必须包含一个空行,该空行只能有一个 回车和换行符。HTTP回答类似。可见HTTP解析分为请求行和头部,每行按照\r\t提取
半同步/半异步线程池:threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "locker.h"//简单封装了互斥量和信号量的接口
template< typename T >
class threadpool//线程池类模板参数T是任务类型,T中必须有接口process
{
public:
threadpool( int thread_number = 8, int max_requests = 10000 );//线程数目和最大连接处理数
~threadpool();
bool append( T* request );
private:
static void* worker( void* arg );//线程工作函数
void run();//启动线程池
private:
int m_thread_number;//线程数量
int m_max_requests;//最大连接数目
pthread_t* m_threads;//线程id
std::list< T* > m_workqueue;//工作队列:各线程竞争该队列并处理相应的任务逻辑T
locker m_queuelocker;//工作队列互斥量
sem m_queuestat;//信号量:用于工作队列
bool m_stop;//终止标志
};
template< typename T >
threadpool< T >::threadpool( int thread_number, int max_requests ) :
m_thread_number( thread_number ), m_max_requests( max_requests ), m_stop( false ), m_threads( NULL )
{
if( ( thread_number <= 0 ) || ( max_requests <= 0 ) )
{
throw std::exception();
}
m_threads = new pthread_t[ m_thread_number ];//工作线程数组
if( ! m_threads )
{
throw std::exception();
}
for ( int i = 0; i < thread_number; ++i )//创建工作线程
{
printf( "create the %dth thread\n", i );
if( pthread_create( m_threads + i, NULL, worker, this ) != 0 )//注意C++调用pthread_create函数的第三个参数必须是一个静态函数,一个静态成员使用动态成员的方式:通过类静态对象、将类对象作为参数传给静态函数。这里使用了后者所以有this
{
delete [] m_threads;
throw std::exception();
}
if( pthread_detach( m_threads[i] ) )//线程分离后其它线程无法pthread_join等待
{
delete [] m_threads;
throw std::exception();
}
}
}
template< typename T >
threadpool< T >::~threadpool()
{
delete [] m_threads;
m_stop = true;
}
template< typename T >
bool threadpool< T >::append( T* request )//向工作队列添加任务T
{
m_queuelocker.lock();//非原子操作需要互斥量保护
if ( m_workqueue.size() > m_max_requests )//任务队列满了
{
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back( request );
m_queuelocker.unlock();
m_queuestat.post();//信号量的V操作,即信号量+1多了个工作任务T
return true;
}
template< typename T >
void* threadpool< T >::worker( void* arg )//工作线程函数
{
threadpool* pool = ( threadpool* )arg;//获取进程池对象
pool->run();//调用线程池run函数
return pool;
}
template< typename T >
void threadpool< T >::run()//工作线程真正工作逻辑:从任务队列领取任务T并执行任务T
{
while ( ! m_stop )
{
m_queuestat.wait();//信号量P操作,申请信号量获取任务T
m_queuelocker.lock();//互斥量保护任务队列,和前面的信号量顺序不能呼唤。。。你懂的
if ( m_workqueue.empty() )
{
m_queuelocker.unlock();//任务队列空那就没任务呗
continue;
}
T* request = m_workqueue.front();//获取任务T
m_workqueue.pop_front();
m_queuelocker.unlock();
if ( ! request )
{
continue;
}
request->process();//执行任务T的相应逻辑,任务T中必须有process接口
}
}
#endif
HTTP请求任务T:http_conn.h接收到一个HTTP请求后解析该请求,然后作出应答
#ifndef HTTPCONNECTION_H
#define HTTPCONNECTION_H
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <stdarg.h>
#include <errno.h>
#include "locker.h"//互斥量和信号量的简单封装
class http_conn//HTTP连接任务类型,用于线程池工作队列的任务类型T
{
public:
static const int FILENAME_LEN = 200;//文件名最大长度,文件是HTTP请求的资源页文件
static const int READ_BUFFER_SIZE = 2048;//读缓冲区,用于读取HTTP请求
static const int WRITE_BUFFER_SIZE = 1024;//写缓冲区,用于HTTP回答
enum METHOD { GET = 0, POST, HEAD, PUT, DELETE, TRACE, OPTIONS, CONNECT, PATCH };//HTTP请求方法,本程序只定义了GET逻辑
enum CHECK_STATE { CHECK_STATE_REQUESTLINE = 0, CHECK_STATE_HEADER, CHECK_STATE_CONTENT };//HTTP请求状态:正在解析请求行、正在解析头部、解析中
enum HTTP_CODE { NO_REQUEST, GET_REQUEST, BAD_REQUEST, NO_RESOURCE, FORBIDDEN_REQUEST, FILE_REQUEST, INTERNAL_ERROR, CLOSED_CONNECTION };//HTTP请求结果:未完整的请求(客户端仍需要提交请求)、完整的请求、错误请求...只用了前三个
enum LINE_STATUS { LINE_OK = 0, LINE_BAD, LINE_OPEN };//HTTP每行解析状态:改行解析完毕、错误的行、正在解析行
public:
http_conn(){}
~http_conn(){}
public:
void init( int sockfd, const sockaddr_in& addr );//初始化新的HTTP连接
void close_conn( bool real_close = true );
void process();//处理客户请求
bool read();//读取客户发送来的数据(HTTP请求)
bool write();//将请求结果返回给客户端
private:
void init();//重载init初始化连接,用于内部调用
HTTP_CODE process_read();//解析HTTP请求,内部调用parse_系列函数
bool process_write( HTTP_CODE ret );//填充HTTP应答,通常是将客户请求的资源页发送给客户,内部调用add_系列函数
HTTP_CODE parse_request_line( char* text );//解析HTTP请求的请求行
HTTP_CODE parse_headers( char* text );//解析HTTP头部数据
HTTP_CODE parse_content( char* text );//获取解析结果
HTTP_CODE do_request();//处理HTTP连接:内部调用process_read(),process_write()
char* get_line() { return m_read_buf + m_start_line; }//获取HTTP请求数据中的一行数据
LINE_STATUS parse_line();//解析行内部调用parse_request_line和parse_headers
//下面的函数被process_write填充HTTP应答
void unmap();//解除内存映射,这里内存映射是指将客户请求的资源页文件映射通过mmap映射到内存
bool add_response( const char* format, ... );
bool add_content( const char* content );
bool add_status_line( int status, const char* title );
bool add_headers( int content_length );
bool add_content_length( int content_length );
bool add_linger();
bool add_blank_line();
public:
static int m_epollfd;//所有socket上的事件都注册到一个epoll事件表中所以用static
static int m_user_count;//用户数量
private:
int m_sockfd;//HTTP连接对应的客户在服务端的描述符m_sockfd和地址m_address
sockaddr_in m_address;
char m_read_buf[ READ_BUFFER_SIZE ];//读缓冲区,读取HTTP请求
int m_read_idx;//已读入的客户数据最后一个字节的下一个位置,即未读数据的第一个位置
int m_checked_idx;//当前已经解析的字节(HTTP请求需要逐个解析)
int m_start_line;//当前解析行的起始位置
char m_write_buf[ WRITE_BUFFER_SIZE ];//写缓冲区
int m_write_idx;//写缓冲区待发送的数据
CHECK_STATE m_check_state;//HTTP解析的状态:请求行解析、头部解析
METHOD m_method;//HTTP请求方法,只实现了GET
char m_real_file[ FILENAME_LEN ];//HTTP请求的资源页对应的文件名称,和服务端的路径拼接就形成了资源页的路径
char* m_url;//请求的具体资源页名称,如:www.baidu.com/index.html
char* m_version;//HTTP协议版本号,一般是:HTTP/1.1
char* m_host;//主机名,客户端要在HTTP请求中的目的主机名
int m_content_length;//HTTP消息体的长度,简单的GET请求这个为空
bool m_linger;//HTTP请求是否保持连接
char* m_file_address;//资源页文件内存映射后的地址
struct stat m_file_stat;//资源页文件的状态,stat文件结构体
struct iovec m_iv[2];//调用writev集中写函数需要m_iv_count表示被写内存块的数量,iovec结构体存放了一段内存的起始位置和长度,
int m_iv_count;//m_iv_count是指iovec结构体数组的长度即多少个内存块
};
#endif
任务T:http_conn.cpp
#include "http_conn.h"
//定义了HTTP请求的返回状态信息,类似大家都熟悉的404
const char* ok_200_title = "OK";
const char* error_400_title = "Bad Request";
const char* error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.\n";
const char* error_403_title = "Forbidden";
const char* error_403_form = "You do not have permission to get file from this server.\n";
const char* error_404_title = "Not Found";
const char* error_404_form = "The requested file was not found on this server.\n";
const char* error_500_title = "Internal Error";
const char* error_500_form = "There was an unusual problem serving the requested file.\n";
const char* doc_root = "/var/www/html";//服务端资源页的路径,将其和HTTP请求中解析的m_url拼接形成资源页的位置
int setnonblocking( int fd )//将fd设置为非阻塞
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
void addfd( int epollfd, int fd, bool one_shot )//将fd添加到事件表epollfd
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
if( one_shot )//采用EPOLLONESHOT事件避免了同一事件被多次触发,因为一个事件只被触发一次且需要重置事件才能侦听下次是否发生
{
event.events |= EPOLLONESHOT;
}
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}
void removefd( int epollfd, int fd )//将fd从事件表epollfd中移除
{
epoll_ctl( epollfd, EPOLL_CTL_DEL, fd, 0 );
close( fd );
}
void modfd( int epollfd, int fd, int ev )//EPOLLONESHOT需要重置事件后事件才能进行下次侦听
{
epoll_event event;
event.data.fd = fd;
event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP;
epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );//注意是EPOLL_CTL_MOD修改
}
int http_conn::m_user_count = 0;//连接数
int http_conn::m_epollfd = -1;//事件表,注意是static故所有http_con类对象共享一个事件表
void http_conn::close_conn( bool real_close )//关闭连接,从事件表中移除描述符
{
if( real_close && ( m_sockfd != -1 ) )//m_sockfd是该HTTP连接对应的描述符
{
//modfd( m_epollfd, m_sockfd, EPOLLIN );
removefd( m_epollfd, m_sockfd );
m_sockfd = -1;
m_user_count--;
}
}
void http_conn::init( int sockfd, const sockaddr_in& addr )//初始化连接
{
m_sockfd = sockfd;//sockfd是http连接对应的描述符用于接收http请求和http回答
m_address = addr;//客户端地址
int error = 0;
socklen_t len = sizeof( error );
getsockopt( m_sockfd, SOL_SOCKET, SO_ERROR, &error, &len );
int reuse = 1;
setsockopt( m_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof( reuse ) );//获取描述符状态,可以在调试时用
addfd( m_epollfd, sockfd, true );
m_user_count++;//多了一个http用户
init();//调用重载函数
}
void http_conn::init()//重载init函数进行些连接前的初始化操作
{
m_check_state = CHECK_STATE_REQUESTLINE;
m_linger = false;
m_method = GET;
m_url = 0;
m_version = 0;
m_content_length = 0;
m_host = 0;
m_start_line = 0;
m_checked_idx = 0;
m_read_idx = 0;
m_write_idx = 0;
memset( m_read_buf, '\0', READ_BUFFER_SIZE );
memset( m_write_buf, '\0', WRITE_BUFFER_SIZE );
memset( m_real_file, '\0', FILENAME_LEN );
}
http_conn::LINE_STATUS http_conn::parse_line()//解析HTTP数据:将HTTP数据的每行数据提取出来,每行以回车\r和换行符\n结束
{
char temp;
for ( ; m_checked_idx < m_read_idx; ++m_checked_idx )//m_checked_idx是当前正在解析的字节,m_read_idx是读缓冲区中已有的数据(客户端发送了多少HTTP请求数据到来),解析到m_read_idx号字节
{
temp = m_read_buf[ m_checked_idx ];//当前解析字节
if ( temp == '\r' )//若为回车符:
{
if ( ( m_checked_idx + 1 ) == m_read_idx )//若为回车符:若此回车符是已读取数据的最后一个则仍需要解析改行(即该行数据还没有接收完整)
{
return LINE_OPEN;
}
else if ( m_read_buf[ m_checked_idx + 1 ] == '\n' )//若回车符的下一个是换行符\r则表明该行解析完毕(回车+换行是HTTP请求每行固定结束规则)
{
m_read_buf[ m_checked_idx++ ] = '\0';//将该行数据送给缓冲区
m_read_buf[ m_checked_idx++ ] = '\0';
return LINE_OK;//返回状态:该行解析成功
}
return LINE_BAD;//否则解析失败
}
else if( temp == '\n' )//解析的字符是换行符则前一个必须是回车才解析成功
{
if( ( m_checked_idx > 1 ) && ( m_read_buf[ m_checked_idx - 1 ] == '\r' ) )
{
m_read_buf[ m_checked_idx-1 ] = '\0';
m_read_buf[ m_checked_idx++ ] = '\0';
return LINE_OK;
}
return LINE_BAD;
}
}
return LINE_OPEN;//正在解析,还有HTTP请求数据没有接收到....
}
bool http_conn::read()//读取HTTP请求数据
{
if( m_read_idx >= READ_BUFFER_SIZE )//读缓冲区已满
{
return false;
}
int bytes_read = 0;//记录接收的字节数
while( true )//循环读取的原因是EPOLLONESHOT一个事件只触发一次所以需要一次性读取完全否则数据丢失
{
bytes_read = recv( m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0 );//接收客户端的HTTP请求
if ( bytes_read == -1 )
{
if( errno == EAGAIN || errno == EWOULDBLOCK )//非阻塞描述符这两个errno不是网络出错而是设备当前不可得,在这里就是一次事件的数据读取完毕
{
break;
}
return false;//否则recv出错
}
else if ( bytes_read == 0 )//客户端关闭了连接
{
return false;
}
m_read_idx += bytes_read;//更新读缓冲区的已读大小(用于解析行函数)
}
return true;
}
http_conn::HTTP_CODE http_conn::parse_request_line( char* text )//解析HTTP的请求行部分
{
m_url = strpbrk( text, " \t" );//在text搜索\t的位置
if ( ! m_url )
{
return BAD_REQUEST;
}
*m_url++ = '\0';
char* method = text;
if ( strcasecmp( method, "GET" ) == 0 )//忽略大小写比较mehtod和GET的大小返回值和strcmp定义相同
{
m_method = GET;
}
else
{
return BAD_REQUEST;
}
m_url += strspn( m_url, " \t" );//strspn函数是在m_url找到第一个\t位置,拼接资源页文件路径
m_version = strpbrk( m_url, " \t" );
if ( ! m_version )
{
return BAD_REQUEST;
}
*m_version++ = '\0';
m_version += strspn( m_version, " \t" );
if ( strcasecmp( m_version, "HTTP/1.1" ) != 0 )//HTTP版本
{
return BAD_REQUEST;
}
if ( strncasecmp( m_url, "http://", 7 ) == 0 )
{
m_url += 7;
m_url = strchr( m_url, '/' );
}
if ( ! m_url || m_url[ 0 ] != '/' )
{
return BAD_REQUEST;
}
m_check_state = CHECK_STATE_HEADER;//将HTTP解析状态更新为解析头部,那么HTTP解析进入解析HTTP头部。这是有限状态机
return NO_REQUEST;
}
http_conn::HTTP_CODE http_conn::parse_headers( char* text )//解析HTTP头部
{
if( text[ 0 ] == '\0' )
{
if ( m_method == HEAD )
{
return GET_REQUEST;//已经获取了一个完整的HTTP请求
}
if ( m_content_length != 0 )//若HTTP请求消息长度不为空
{
m_check_state = CHECK_STATE_CONTENT;//则解析头部后还要解析消息体,所以HTTP解析状态仍为正在解析中...GET请求不会出现这个...
return NO_REQUEST;
}
return GET_REQUEST;
}
else if ( strncasecmp( text, "Connection:", 11 ) == 0 )
{
text += 11;
text += strspn( text, " \t" );
if ( strcasecmp( text, "keep-alive" ) == 0 )
{
m_linger = true;
}
}
else if ( strncasecmp( text, "Content-Length:", 15 ) == 0 )
{
text += 15;
text += strspn( text, " \t" );
m_content_length = atol( text );
}
else if ( strncasecmp( text, "Host:", 5 ) == 0 )
{
text += 5;
text += strspn( text, " \t" );
m_host = text;
}
else
{
printf( "oop! unknow header %s\n", text );
}
return NO_REQUEST;
}
http_conn::HTTP_CODE http_conn::parse_content( char* text )//解析结果
{
if ( m_read_idx >= ( m_content_length + m_checked_idx ) )//若解析到缓冲区的最后位置则获得一个一个完整的连接请求
{
text[ m_content_length ] = '\0';
return GET_REQUEST;
}
return NO_REQUEST;//请求不完整
}
http_conn::HTTP_CODE http_conn::process_read()//完整的HTTP解析
{
LINE_STATUS line_status = LINE_OK;
HTTP_CODE ret = NO_REQUEST;
char* text = 0;
while ( ( ( m_check_state == CHECK_STATE_CONTENT ) && ( line_status == LINE_OK ) )
|| ( ( line_status = parse_line() ) == LINE_OK ) ){//满足条件:正在进行HTTP解析、读取一个完整行
text = get_line();//从读缓冲区(HTTP请求数据)获取一行数据
m_start_line = m_checked_idx;//行的起始位置等于正在每行解析的第一个字节
printf( "got 1 http line: %s\n", text );
switch ( m_check_state )//HTTP解析状态跳转
{
case CHECK_STATE_REQUESTLINE://正在分析请求行
{
ret = parse_request_line( text );//分析请求行
if ( ret == BAD_REQUEST )
{
return BAD_REQUEST;
}
break;
}
case CHECK_STATE_HEADER://正在分析请求头部
{
ret = parse_headers( text );//分析头部
if ( ret == BAD_REQUEST )
{
return BAD_REQUEST;
}
else if ( ret == GET_REQUEST )
{
return do_request();//当获得一个完整的连接请求则调用do_request分析处理资源页文件
}
break;
}
case CHECK_STATE_CONTENT://HTTP解析状态仍为正在解析...没有办法只好继续解析呗....解析消息体
{
ret = parse_content( text );
if ( ret == GET_REQUEST )
{
return do_request();
}
line_status = LINE_OPEN;
break;
}
default:
{
return INTERNAL_ERROR;//内部错误
}
}
}
return NO_REQUEST;
}
http_conn::HTTP_CODE http_conn::do_request()//用于获取资源页文件的状态
{
strcpy( m_real_file, doc_root );
int len = strlen( doc_root );
strncpy( m_real_file + len, m_url, FILENAME_LEN - len - 1 );
if ( stat( m_real_file, &m_file_stat ) < 0 )
{
return NO_RESOURCE;//若资源页不存在则HTTP解析结果为:没有资源...万恶的404
}
if ( ! ( m_file_stat.st_mode & S_IROTH ) )
{
return FORBIDDEN_REQUEST;//资源没有权限获取
}
if ( S_ISDIR( m_file_stat.st_mode ) )
{
return BAD_REQUEST;//请求有错
}
int fd = open( m_real_file, O_RDONLY );
m_file_address = ( char* )mmap( 0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0 );//将资源页文件映射到内存
close( fd );
return FILE_REQUEST;//资源页请求成功
}
void http_conn::unmap()//解除资源页文件映射的内存
{
if( m_file_address )
{
munmap( m_file_address, m_file_stat.st_size );//解除映射
m_file_address = 0;
}
}
bool http_conn::write()//将资源页文件发送给客户端
{
int temp = 0;
int bytes_have_send = 0;
int bytes_to_send = m_write_idx;
if ( bytes_to_send == 0 )
{
modfd( m_epollfd, m_sockfd, EPOLLIN );//EPOLLONESHOT事件每次需要重置事件
init();
return true;
}
while( 1 )//
{
temp = writev( m_sockfd, m_iv, m_iv_count );//集中写,m_sockfd是http连接对应的描述符,m_iv是iovec结构体数组表示内存块地址,m_iv_count是数组的长度即多少个内存块将一次集中写到m_sockfd
if ( temp <= -1 )//集中写失败
{
if( errno == EAGAIN )
{
modfd( m_epollfd, m_sockfd, EPOLLOUT );//重置EPOLLONESHOT事件,注册可写事件表示若m_sockfd没有写失败则关闭连接
return true;
}
unmap();//解除内存映射
return false;
}
bytes_to_send -= temp;//待发送数据
bytes_have_send += temp;//已发送数据
if ( bytes_to_send <= bytes_have_send )
{
unmap();//该资源页已经发送完毕该解除映射
if( m_linger )//若要保持该http连接
{
init();//初始化http连接
modfd( m_epollfd, m_sockfd, EPOLLIN );
return true;
}
else
{
modfd( m_epollfd, m_sockfd, EPOLLIN );
return false;
}
}
}
}
bool http_conn::add_response( const char* format, ... )//HTTP应答主要是将应答数据添加到写缓冲区m_write_buf
{
if( m_write_idx >= WRITE_BUFFER_SIZE )
{
return false;
}
va_list arg_list;
va_start( arg_list, format );
int len = vsnprintf( m_write_buf + m_write_idx, WRITE_BUFFER_SIZE - 1 - m_write_idx, format, arg_list );//将fromat内容输出到m_write_buf
if( len >= ( WRITE_BUFFER_SIZE - 1 - m_write_idx ) )
{
return false;
}
m_write_idx += len;
va_end( arg_list );
return true;
}
bool http_conn::add_status_line( int status, const char* title )
{
return add_response( "%s %d %s\r\n", "HTTP/1.1", status, title );//
}
bool http_conn::add_headers( int content_len )
{
add_content_length( content_len );
add_linger();
add_blank_line();//加空行:回车+换行
}
bool http_conn::add_content_length( int content_len )
{
return add_response( "Content-Length: %d\r\n", content_len );//
}
bool http_conn::add_linger()
{
return add_response( "Connection: %s\r\n", ( m_linger == true ) ? "keep-alive" : "close" );//
}
bool http_conn::add_blank_line()
{
return add_response( "%s", "\r\n" );//
}
bool http_conn::add_content( const char* content )
{
return add_response( "%s", content );
}
bool http_conn::process_write( HTTP_CODE ret )//填充HTTP应答
{
switch ( ret )
{
case INTERNAL_ERROR:
{
add_status_line( 500, error_500_title );
add_headers( strlen( error_500_form ) );
if ( ! add_content( error_500_form ) )
{
return false;
}
break;
}
case BAD_REQUEST:
{
add_status_line( 400, error_400_title );
add_headers( strlen( error_400_form ) );
if ( ! add_content( error_400_form ) )
{
return false;
}
break;
}
case NO_RESOURCE:
{
add_status_line( 404, error_404_title );
add_headers( strlen( error_404_form ) );
if ( ! add_content( error_404_form ) )
{
return false;
}
break;
}
case FORBIDDEN_REQUEST:
{
add_status_line( 403, error_403_title );
add_headers( strlen( error_403_form ) );
if ( ! add_content( error_403_form ) )
{
return false;
}
break;
}
case FILE_REQUEST://资源页文件可用
{
add_status_line( 200, ok_200_title );
if ( m_file_stat.st_size != 0 )
{
add_headers( m_file_stat.st_size );//m_file_stat资源页文件状态
m_iv[ 0 ].iov_base = m_write_buf;//写缓冲区
m_iv[ 0 ].iov_len = m_write_idx;//长度
m_iv[ 1 ].iov_base = m_file_address;//资源页数据内存映射后在m_file_address地址
m_iv[ 1 ].iov_len = m_file_stat.st_size;//文件长度就是该块内存长度
m_iv_count = 2;
return true;
}
else
{
const char* ok_string = "<html><body></body></html>";//请求页位空白
add_headers( strlen( ok_string ) );
if ( ! add_content( ok_string ) )
{
return false;
}
}
}
default:
{
return false;
}
}
m_iv[ 0 ].iov_base = m_write_buf;
m_iv[ 0 ].iov_len = m_write_idx;
m_iv_count = 1;
return true;
}
void http_conn::process()//处理HTTP请求
{
HTTP_CODE read_ret = process_read();//读取HTTP请求数据
if ( read_ret == NO_REQUEST )
{
modfd( m_epollfd, m_sockfd, EPOLLIN );
return;
}
bool write_ret = process_write( read_ret );//发送资源页给客户端
if ( ! write_ret )
{
close_conn();
}
modfd( m_epollfd, m_sockfd, EPOLLOUT );
}
服务端程序:接收HTTP请求并交给线程池处理这些请求
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <cassert>
#include <sys/epoll.h>
#include "locker.h"//该头文件封装了信号量和互斥量
#include "threadpool.h"//半同步/半反应堆线程池
#include "http_conn.h"//HTTP连接任务类T
#define MAX_FD 65536//最大文件数目
#define MAX_EVENT_NUMBER 10000//最大事件数目
extern int addfd( int epollfd, int fd, bool one_shot );//采用http_conn.h的addfd函数
extern int removefd( int epollfd, int fd );//这也是http_conn.h中的函数
void addsig( int sig, void( handler )(int), bool restart = true )//安装信号,用于统一事件源(将信号和IO事件统一监听)
{
struct sigaction sa;
memset( &sa, '\0', sizeof( sa ) );
sa.sa_handler = handler;
if( restart )
{
sa.sa_flags |= SA_RESTART;
}
sigfillset( &sa.sa_mask );
assert( sigaction( sig, &sa, NULL ) != -1 );
}
void show_error( int connfd, const char* info )
{
printf( "%s", info );
send( connfd, info, strlen( info ), 0 );
close( connfd );
}
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
addsig( SIGPIPE, SIG_IGN );
threadpool< http_conn >* pool = NULL;
try
{
pool = new threadpool< http_conn >;//创建线程池
}
catch( ... )
{
return 1;
}
http_conn* users = new http_conn[ MAX_FD ];//创建超大的用户HTTP连接任务数组,给定一个http连接的描述符作为下标即可索引到这个任务,空间换时间
assert( users );
int user_count = 0;
int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );
struct linger tmp = { 1, 0 };
setsockopt( listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof( tmp ) );
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );
ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret >= 0 );
ret = listen( listenfd, 5 );
assert( ret >= 0 );
epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );//创建事件表
assert( epollfd != -1 );
addfd( epollfd, listenfd, false );//将监听端口添加到事件表,false表示不注册EPOLLONESHOT事件,注意不能将监听端口注册为EPOLLONESHOT事件因为该事件每次发生只触发一次,而accept每次只能连接一个客户,那么多个客户连接请求到来,则必然丢失客户连接请求
http_conn::m_epollfd = epollfd;
while( true )
{
int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );//无限期等待sockfd上的注册事件
if ( ( number < 0 ) && ( errno != EINTR ) )//若epoll_wait不是因中断EINTR是出错
{
printf( "epoll failure\n" );
break;
}
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;//获取就绪事件描述符
if( sockfd == listenfd )//监听端口有可读事件则表明有HTTP请求
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );//建立客户连接
if ( connfd < 0 )
{
printf( "errno is: %d\n", errno );
continue;
}
if( http_conn::m_user_count >= MAX_FD )//HTTP客户数超过MAX_FD
{
show_error( connfd, "Internal server busy" );
continue;
}
users[connfd].init( connfd, client_address );//利用connfd快速索引到http_conn任务类
}
else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) )
{
users[sockfd].close_conn();
}
else if( events[i].events & EPOLLIN )//数据可读:
{
if( users[sockfd].read() )
{
pool->append( users + sockfd );
}
else
{
users[sockfd].close_conn();
}
}
else if( events[i].events & EPOLLOUT )//数据可写,哪里注册了可写EPOLLOUT事件?http_conn工作任务类中write函数将那个http连接的描述符m_sockfd注册了可写事件
{
if( !users[sockfd].write() )//若该http_conn任务对应的http连接写失败了则关闭该http连接
{
users[sockfd].close_conn();
}
}
else
{}
}
}
close( epollfd );
close( listenfd );//这里要提醒的是listenfd由创建它的函数关闭,谁污染谁治理的原则
delete [] users;
delete pool;
return 0;
}
互斥量和信号量的简单封装:locker.h
#ifndef LOCKER_H
#define LOCKER_H
#include <exception>
#include <pthread.h>
#include <semaphore.h>
class sem
{
public:
sem()
{
if( sem_init( &m_sem, 0, 0 ) != 0 )
{
throw std::exception();
}
}
~sem()
{
sem_destroy( &m_sem );
}
bool wait()
{
return sem_wait( &m_sem ) == 0;
}
bool post()
{
return sem_post( &m_sem ) == 0;
}
private:
sem_t m_sem;
};
class locker
{
public:
locker()
{
if( pthread_mutex_init( &m_mutex, NULL ) != 0 )
{
throw std::exception();
}
}
~locker()
{
pthread_mutex_destroy( &m_mutex );
}
bool lock()
{
return pthread_mutex_lock( &m_mutex ) == 0;
}
bool unlock()
{
return pthread_mutex_unlock( &m_mutex ) == 0;
}
private:
pthread_mutex_t m_mutex;
};
class cond
{
public:
cond()
{
if( pthread_mutex_init( &m_mutex, NULL ) != 0 )
{
throw std::exception();
}
if ( pthread_cond_init( &m_cond, NULL ) != 0 )
{
pthread_mutex_destroy( &m_mutex );
throw std::exception();
}
}
~cond()
{
pthread_mutex_destroy( &m_mutex );
pthread_cond_destroy( &m_cond );
}
bool wait()
{
int ret = 0;
pthread_mutex_lock( &m_mutex );
ret = pthread_cond_wait( &m_cond, &m_mutex );
pthread_mutex_unlock( &m_mutex );
return ret == 0;
}
bool signal()
{
return pthread_cond_signal( &m_cond ) == 0;
}
private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif
压力测试程序:通常有IO复用、多线程、多进程实现压力测试,其中IO复用施压程度最高其它的要切换CPU
本程序是客户端通过命令行参数指定num个客户连接,并向服务端发送HTTP请求,服务端HTTP应答,客户端输出是请求和应答数据交替出现
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
//向服务器发送HTTP请求内容
static const char* request = "GET http://localhost/index.html HTTP/1.1\r\nConnection: keep-alive\r\n\r\nxxxxxxxxxxxx";
int setnonblocking( int fd )//设置非阻塞描述符
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
void addfd( int epoll_fd, int fd )//添加描述符到事件表
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLOUT | EPOLLET | EPOLLERR;//可写事件
epoll_ctl( epoll_fd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}
bool write_nbytes( int sockfd, const char* buffer, int len )//向服务器写函数即发送HTTP请求
{
int bytes_write = 0;
printf( "write out %d bytes to socket %d\n", len, sockfd );
while( 1 ) //循环写直至写完一次buffer也就是HTTP requst
{
bytes_write = send( sockfd, buffer, len, 0 );
if ( bytes_write == -1 )
{
return false;
}
else if ( bytes_write == 0 )
{
return false;
}
len -= bytes_write;
buffer = buffer + bytes_write;
if ( len <= 0 )
{
return true;
}
}
}
bool read_once( int sockfd, char* buffer, int len )//读一次,接收服务器发送来的HTTP应答
{
int bytes_read = 0;
memset( buffer, '\0', len );
bytes_read = recv( sockfd, buffer, len, 0 );
if ( bytes_read == -1 )
{
return false;
}
else if ( bytes_read == 0 )
{
return false;
}
printf( "read in %d bytes from socket %d with content: %s\n", bytes_read, sockfd, buffer );
return true;
}
void start_conn( int epoll_fd, int num, const char* ip, int port )//发起num个连接
{
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );
for ( int i = 0; i < num; ++i )
{
sleep( 1 );
int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
printf( "create 1 sock\n" );
if( sockfd < 0 )
{
continue;
}
if ( connect( sockfd, ( struct sockaddr* )&address, sizeof( address ) ) == 0 )
{
printf( "build connection %d\n", i );
addfd( epoll_fd, sockfd );//初始注册为可写事件
}
}
}
void close_conn( int epoll_fd, int sockfd )//关闭连接
{
epoll_ctl( epoll_fd, EPOLL_CTL_DEL, sockfd, 0 );
close( sockfd );
}
int main( int argc, char* argv[] )
{
assert( argc == 4 );
int epoll_fd = epoll_create( 100 );
start_conn( epoll_fd, atoi( argv[ 3 ] ), argv[1], atoi( argv[2] ) );
epoll_event events[ 10000 ];
char buffer[ 2048 ];
while ( 1 )
{
int fds = epoll_wait( epoll_fd, events, 10000, 2000 );//2000ms内等待最多10000个事件
for ( int i = 0; i < fds; i++ )
{
int sockfd = events[i].data.fd;
if ( events[i].events & EPOLLIN )//HTTP连接上可读事件即服务端发送给客户端HTTP回答报文
{
if ( ! read_once( sockfd, buffer, 2048 ) )//读取HTTP应答
{
close_conn( epoll_fd, sockfd );
}
struct epoll_event event;
event.events = EPOLLOUT | EPOLLET | EPOLLERR;//更改为可写事件
event.data.fd = sockfd;
epoll_ctl( epoll_fd, EPOLL_CTL_MOD, sockfd, &event );//
}
else if( events[i].events & EPOLLOUT ) //可写事件初始就是可写
{
if ( ! write_nbytes( sockfd, request, strlen( request ) ) )//向服务端发送HTTP请求
{
close_conn( epoll_fd, sockfd );
}
struct epoll_event event;
event.events = EPOLLIN | EPOLLET | EPOLLERR;//更改为可写事件
event.data.fd = sockfd;
epoll_ctl( epoll_fd, EPOLL_CTL_MOD, sockfd, &event );//这样做的目的是客户端发送HTTP请求和服务端HTTP回答交替出现
}
else if( events[i].events & EPOLLERR )
{
close_conn( epoll_fd, sockfd );
}
}
}
}