简介:
半同步/半反应堆线程池是通过一个线程往工作队列添加任务T,然后工作线程竞争工作队列获得任务T。HTTP请求解析服务端程序:逐行解析客户端发送来的HTTP请求然后作出HTTP回答。采用线程池就是:服务端创建一个线程池,然后有HTTP请求到达就将HTTP请求的处理添加到线程池任务队列中去,线程池工作线程通过竞态机制(信号量)竞争任务T(HTTP请求处理)。
HTTP请求内容:
GET http://www.baidu.com/index.html HTTP/1.1 //该行是HTTP请求行,GET是请求方法表示以只读方式获取资源 http那一串是url HTTP那个是客户端HTTP版本
User-Agent:Wget/1.12 (linux-gnu) //客户端使用的程序
Host: www.baidu.com //目的主机名,必须有
Connection: close //用于告诉服务器处理完这个请求后关闭连接,可选其它
注意:第一行为请求行,后面3行是HTTP头部,HTTP请求每行均以回车符和换行符结束,所有头部字段结束后必须包含一个空行,该空行只能有一个 回车和换行符。HTTP回答类似。可见HTTP解析分为请求行和头部,每行按照\r\t提取
半同步/半异步线程池:threadpool.h
- #ifndef THREADPOOL_H
- #define THREADPOOL_H
- #include <list>
- #include <cstdio>
- #include <exception>
- #include <pthread.h>
- #include "locker.h"//简单封装了互斥量和信号量的接口
- template< typename T >
- class threadpool//线程池类模板参数T是任务类型,T中必须有接口process
- {
- public:
- threadpool( int thread_number = 8, int max_requests = 10000 );//线程数目和最大连接处理数
- ~threadpool();
- bool append( T* request );
- private:
- static void* worker( void* arg );//线程工作函数
- void run();//启动线程池
- private:
- int m_thread_number;//线程数量
- int m_max_requests;//最大连接数目
- pthread_t* m_threads;//线程id
- std::list< T* > m_workqueue;//工作队列:各线程竞争该队列并处理相应的任务逻辑T
- locker m_queuelocker;//工作队列互斥量
- sem m_queuestat;//信号量:用于工作队列
- bool m_stop;//终止标志
- };
- template< typename T >
- threadpool< T >::threadpool( int thread_number, int max_requests ) :
- m_thread_number( thread_number ), m_max_requests( max_requests ), m_stop( false ), m_threads( NULL )
- {
- if( ( thread_number <= 0 ) || ( max_requests <= 0 ) )
- {
- throw std::exception();
- }
- m_threads = new pthread_t[ m_thread_number ];//工作线程数组
- if( ! m_threads )
- {
- throw std::exception();
- }
- for ( int i = 0; i < thread_number; ++i )//创建工作线程
- {
- printf( "create the %dth thread\n", i );
- if( pthread_create( m_threads + i, NULL, worker, this ) != 0 )//注意C++调用pthread_create函数的第三个参数必须是一个静态函数,一个静态成员使用动态成员的方式:通过类静态对象、将类对象作为参数传给静态函数。这里使用了后者所以有this
- {
- delete [] m_threads;
- throw std::exception();
- }
- if( pthread_detach( m_threads[i] ) )//线程分离后其它线程无法pthread_join等待
- {
- delete [] m_threads;
- throw std::exception();
- }
- }
- }
- template< typename T >
- threadpool< T >::~threadpool()
- {
- delete [] m_threads;
- m_stop = true;
- }
- template< typename T >
- bool threadpool< T >::append( T* request )//向工作队列添加任务T
- {
- m_queuelocker.lock();//非原子操作需要互斥量保护
- if ( m_workqueue.size() > m_max_requests )//任务队列满了
- {
- m_queuelocker.unlock();
- return false;
- }
- m_workqueue.push_back( request );
- m_queuelocker.unlock();
- m_queuestat.post();//信号量的V操作,即信号量+1多了个工作任务T
- return true;
- }
- template< typename T >
- void* threadpool< T >::worker( void* arg )//工作线程函数
- {
- threadpool* pool = ( threadpool* )arg;//获取进程池对象
- pool->run();//调用线程池run函数
- return pool;
- }
- template< typename T >
- void threadpool< T >::run()//工作线程真正工作逻辑:从任务队列领取任务T并执行任务T
- {
- while ( ! m_stop )
- {
- m_queuestat.wait();//信号量P操作,申请信号量获取任务T
- m_queuelocker.lock();//互斥量保护任务队列,和前面的信号量顺序不能呼唤。。。你懂的
- if ( m_workqueue.empty() )
- {
- m_queuelocker.unlock();//任务队列空那就没任务呗
- continue;
- }
- T* request = m_workqueue.front();//获取任务T
- m_workqueue.pop_front();
- m_queuelocker.unlock();
- if ( ! request )
- {
- continue;
- }
- request->process();//执行任务T的相应逻辑,任务T中必须有process接口
- }
- }
- #endif
HTTP请求任务T:http_conn.h接收到一个HTTP请求后解析该请求,然后作出应答
- #ifndef HTTPCONNECTION_H
- #define HTTPCONNECTION_H
- #include <unistd.h>
- #include <signal.h>
- #include <sys/types.h>
- #include <sys/epoll.h>
- #include <fcntl.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <assert.h>
- #include <sys/stat.h>
- #include <string.h>
- #include <pthread.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <sys/mman.h>
- #include <stdarg.h>
- #include <errno.h>
- #include "locker.h"//互斥量和信号量的简单封装
- class http_conn//HTTP连接任务类型,用于线程池工作队列的任务类型T
- {
- public:
- static const int FILENAME_LEN = 200;//文件名最大长度,文件是HTTP请求的资源页文件
- static const int READ_BUFFER_SIZE = 2048;//读缓冲区,用于读取HTTP请求
- static const int WRITE_BUFFER_SIZE = 1024;//写缓冲区,用于HTTP回答
- enum METHOD { GET = 0, POST, HEAD, PUT, DELETE, TRACE, OPTIONS, CONNECT, PATCH };//HTTP请求方法,本程序只定义了GET逻辑
- enum CHECK_STATE { CHECK_STATE_REQUESTLINE = 0, CHECK_STATE_HEADER, CHECK_STATE_CONTENT };//HTTP请求状态:正在解析请求行、正在解析头部、解析中
- enum HTTP_CODE { NO_REQUEST, GET_REQUEST, BAD_REQUEST, NO_RESOURCE, FORBIDDEN_REQUEST, FILE_REQUEST, INTERNAL_ERROR, CLOSED_CONNECTION };//HTTP请求结果:未完整的请求(客户端仍需要提交请求)、完整的请求、错误请求...只用了前三个
- enum LINE_STATUS { LINE_OK = 0, LINE_BAD, LINE_OPEN };//HTTP每行解析状态:改行解析完毕、错误的行、正在解析行
- public:
- http_conn(){}
- ~http_conn(){}
- public:
- void init( int sockfd, const sockaddr_in& addr );//初始化新的HTTP连接
- void close_conn( bool real_close = true );
- void process();//处理客户请求
- bool read();//读取客户发送来的数据(HTTP请求)
- bool write();//将请求结果返回给客户端
- private:
- void init();//重载init初始化连接,用于内部调用
- HTTP_CODE process_read();//解析HTTP请求,内部调用parse_系列函数
- bool process_write( HTTP_CODE ret );//填充HTTP应答,通常是将客户请求的资源页发送给客户,内部调用add_系列函数
- HTTP_CODE parse_request_line( char* text );//解析HTTP请求的请求行
- HTTP_CODE parse_headers( char* text );//解析HTTP头部数据
- HTTP_CODE parse_content( char* text );//获取解析结果
- HTTP_CODE do_request();//处理HTTP连接:内部调用process_read(),process_write()
- char* get_line() { return m_read_buf + m_start_line; }//获取HTTP请求数据中的一行数据
- LINE_STATUS parse_line();//解析行内部调用parse_request_line和parse_headers
- //下面的函数被process_write填充HTTP应答
- void unmap();//解除内存映射,这里内存映射是指将客户请求的资源页文件映射通过mmap映射到内存
- bool add_response( const char* format, ... );
- bool add_content( const char* content );
- bool add_status_line( int status, const char* title );
- bool add_headers( int content_length );
- bool add_content_length( int content_length );
- bool add_linger();
- bool add_blank_line();
- public:
- static int m_epollfd;//所有socket上的事件都注册到一个epoll事件表中所以用static
- static int m_user_count;//用户数量
- private:
- int m_sockfd;//HTTP连接对应的客户在服务端的描述符m_sockfd和地址m_address
- sockaddr_in m_address;
- char m_read_buf[ READ_BUFFER_SIZE ];//读缓冲区,读取HTTP请求
- int m_read_idx;//已读入的客户数据最后一个字节的下一个位置,即未读数据的第一个位置
- int m_checked_idx;//当前已经解析的字节(HTTP请求需要逐个解析)
- int m_start_line;//当前解析行的起始位置
- char m_write_buf[ WRITE_BUFFER_SIZE ];//写缓冲区
- int m_write_idx;//写缓冲区待发送的数据
- CHECK_STATE m_check_state;//HTTP解析的状态:请求行解析、头部解析
- METHOD m_method;//HTTP请求方法,只实现了GET
- char m_real_file[ FILENAME_LEN ];//HTTP请求的资源页对应的文件名称,和服务端的路径拼接就形成了资源页的路径
- char* m_url;//请求的具体资源页名称,如:www.baidu.com/index.html
- char* m_version;//HTTP协议版本号,一般是:HTTP/1.1
- char* m_host;//主机名,客户端要在HTTP请求中的目的主机名
- int m_content_length;//HTTP消息体的长度,简单的GET请求这个为空
- bool m_linger;//HTTP请求是否保持连接
- char* m_file_address;//资源页文件内存映射后的地址
- struct stat m_file_stat;//资源页文件的状态,stat文件结构体
- struct iovec m_iv[2];//调用writev集中写函数需要m_iv_count表示被写内存块的数量,iovec结构体存放了一段内存的起始位置和长度,
- int m_iv_count;//m_iv_count是指iovec结构体数组的长度即多少个内存块
- };
- #endif
任务T:http_conn.cpp
- #include "http_conn.h"
- //定义了HTTP请求的返回状态信息,类似大家都熟悉的404
- const char* ok_200_title = "OK";
- const char* error_400_title = "Bad Request";
- const char* error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.\n";
- const char* error_403_title = "Forbidden";
- const char* error_403_form = "You do not have permission to get file from this server.\n";
- const char* error_404_title = "Not Found";
- const char* error_404_form = "The requested file was not found on this server.\n";
- const char* error_500_title = "Internal Error";
- const char* error_500_form = "There was an unusual problem serving the requested file.\n";
- const char* doc_root = "/var/www/html";//服务端资源页的路径,将其和HTTP请求中解析的m_url拼接形成资源页的位置
- int setnonblocking( int fd )//将fd设置为非阻塞
- {
- int old_option = fcntl( fd, F_GETFL );
- int new_option = old_option | O_NONBLOCK;
- fcntl( fd, F_SETFL, new_option );
- return old_option;
- }
- void addfd( int epollfd, int fd, bool one_shot )//将fd添加到事件表epollfd
- {
- epoll_event event;
- event.data.fd = fd;
- event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
- if( one_shot )//采用EPOLLONESHOT事件避免了同一事件被多次触发,因为一个事件只被触发一次且需要重置事件才能侦听下次是否发生
- {
- event.events |= EPOLLONESHOT;
- }
- epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
- setnonblocking( fd );
- }
- void removefd( int epollfd, int fd )//将fd从事件表epollfd中移除
- {
- epoll_ctl( epollfd, EPOLL_CTL_DEL, fd, 0 );
- close( fd );
- }
- void modfd( int epollfd, int fd, int ev )//EPOLLONESHOT需要重置事件后事件才能进行下次侦听
- {
- epoll_event event;
- event.data.fd = fd;
- event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP;
- epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );//注意是EPOLL_CTL_MOD修改
- }
- int http_conn::m_user_count = 0;//连接数
- int http_conn::m_epollfd = -1;//事件表,注意是static故所有http_con类对象共享一个事件表
- void http_conn::close_conn( bool real_close )//关闭连接,从事件表中移除描述符
- {
- if( real_close && ( m_sockfd != -1 ) )//m_sockfd是该HTTP连接对应的描述符
- {
- //modfd( m_epollfd, m_sockfd, EPOLLIN );
- removefd( m_epollfd, m_sockfd );
- m_sockfd = -1;
- m_user_count--;
- }
- }
- void http_conn::init( int sockfd, const sockaddr_in& addr )//初始化连接
- {
- m_sockfd = sockfd;//sockfd是http连接对应的描述符用于接收http请求和http回答
- m_address = addr;//客户端地址
- int error = 0;
- socklen_t len = sizeof( error );
- getsockopt( m_sockfd, SOL_SOCKET, SO_ERROR, &error, &len );
- int reuse = 1;
- setsockopt( m_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof( reuse ) );//获取描述符状态,可以在调试时用
- addfd( m_epollfd, sockfd, true );
- m_user_count++;//多了一个http用户
- init();//调用重载函数
- }
- void http_conn::init()//重载init函数进行些连接前的初始化操作
- {
- m_check_state = CHECK_STATE_REQUESTLINE;
- m_linger = false;
- m_method = GET;
- m_url = 0;
- m_version = 0;
- m_content_length = 0;
- m_host = 0;
- m_start_line = 0;
- m_checked_idx = 0;
- m_read_idx = 0;
- m_write_idx = 0;
- memset( m_read_buf, '\0', READ_BUFFER_SIZE );
- memset( m_write_buf, '\0', WRITE_BUFFER_SIZE );
- memset( m_real_file, '\0', FILENAME_LEN );
- }
- http_conn::LINE_STATUS http_conn::parse_line()//解析HTTP数据:将HTTP数据的每行数据提取出来,每行以回车\r和换行符\n结束
- {
- char temp;
- for ( ; m_checked_idx < m_read_idx; ++m_checked_idx )//m_checked_idx是当前正在解析的字节,m_read_idx是读缓冲区中已有的数据(客户端发送了多少HTTP请求数据到来),解析到m_read_idx号字节
- {
- temp = m_read_buf[ m_checked_idx ];//当前解析字节
- if ( temp == '\r' )//若为回车符:
- {
- if ( ( m_checked_idx + 1 ) == m_read_idx )//若为回车符:若此回车符是已读取数据的最后一个则仍需要解析改行(即该行数据还没有接收完整)
- {
- return LINE_OPEN;
- }
- else if ( m_read_buf[ m_checked_idx + 1 ] == '\n' )//若回车符的下一个是换行符\r则表明该行解析完毕(回车+换行是HTTP请求每行固定结束规则)
- {
- m_read_buf[ m_checked_idx++ ] = '\0';//将该行数据送给缓冲区
- m_read_buf[ m_checked_idx++ ] = '\0';
- return LINE_OK;//返回状态:该行解析成功
- }
- return LINE_BAD;//否则解析失败
- }
- else if( temp == '\n' )//解析的字符是换行符则前一个必须是回车才解析成功
- {
- if( ( m_checked_idx > 1 ) && ( m_read_buf[ m_checked_idx - 1 ] == '\r' ) )
- {
- m_read_buf[ m_checked_idx-1 ] = '\0';
- m_read_buf[ m_checked_idx++ ] = '\0';
- return LINE_OK;
- }
- return LINE_BAD;
- }
- }
- return LINE_OPEN;//正在解析,还有HTTP请求数据没有接收到....
- }
- bool http_conn::read()//读取HTTP请求数据
- {
- if( m_read_idx >= READ_BUFFER_SIZE )//读缓冲区已满
- {
- return false;
- }
- int bytes_read = 0;//记录接收的字节数
- while( true )//循环读取的原因是EPOLLONESHOT一个事件只触发一次所以需要一次性读取完全否则数据丢失
- {
- bytes_read = recv( m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0 );//接收客户端的HTTP请求
- if ( bytes_read == -1 )
- {
- if( errno == EAGAIN || errno == EWOULDBLOCK )//非阻塞描述符这两个errno不是网络出错而是设备当前不可得,在这里就是一次事件的数据读取完毕
- {
- break;
- }
- return false;//否则recv出错
- }
- else if ( bytes_read == 0 )//客户端关闭了连接
- {
- return false;
- }
- m_read_idx += bytes_read;//更新读缓冲区的已读大小(用于解析行函数)
- }
- return true;
- }
- http_conn::HTTP_CODE http_conn::parse_request_line( char* text )//解析HTTP的请求行部分
- {
- m_url = strpbrk( text, " \t" );//在text搜索\t的位置
- if ( ! m_url )
- {
- return BAD_REQUEST;
- }
- *m_url++ = '\0';
- char* method = text;
- if ( strcasecmp( method, "GET" ) == 0 )//忽略大小写比较mehtod和GET的大小返回值和strcmp定义相同
- {
- m_method = GET;
- }
- else
- {
- return BAD_REQUEST;
- }
- m_url += strspn( m_url, " \t" );//strspn函数是在m_url找到第一个\t位置,拼接资源页文件路径
- m_version = strpbrk( m_url, " \t" );
- if ( ! m_version )
- {
- return BAD_REQUEST;
- }
- *m_version++ = '\0';
- m_version += strspn( m_version, " \t" );
- if ( strcasecmp( m_version, "HTTP/1.1" ) != 0 )//HTTP版本
- {
- return BAD_REQUEST;
- }
- if ( strncasecmp( m_url, "http://", 7 ) == 0 )
- {
- m_url += 7;
- m_url = strchr( m_url, '/' );
- }
- if ( ! m_url || m_url[ 0 ] != '/' )
- {
- return BAD_REQUEST;
- }
- m_check_state = CHECK_STATE_HEADER;//将HTTP解析状态更新为解析头部,那么HTTP解析进入解析HTTP头部。这是有限状态机
- return NO_REQUEST;
- }
- http_conn::HTTP_CODE http_conn::parse_headers( char* text )//解析HTTP头部
- {
- if( text[ 0 ] == '\0' )
- {
- if ( m_method == HEAD )
- {
- return GET_REQUEST;//已经获取了一个完整的HTTP请求
- }
- if ( m_content_length != 0 )//若HTTP请求消息长度不为空
- {
- m_check_state = CHECK_STATE_CONTENT;//则解析头部后还要解析消息体,所以HTTP解析状态仍为正在解析中...GET请求不会出现这个...
- return NO_REQUEST;
- }
- return GET_REQUEST;
- }
- else if ( strncasecmp( text, "Connection:", 11 ) == 0 )
- {
- text += 11;
- text += strspn( text, " \t" );
- if ( strcasecmp( text, "keep-alive" ) == 0 )
- {
- m_linger = true;
- }
- }
- else if ( strncasecmp( text, "Content-Length:", 15 ) == 0 )
- {
- text += 15;
- text += strspn( text, " \t" );
- m_content_length = atol( text );
- }
- else if ( strncasecmp( text, "Host:", 5 ) == 0 )
- {
- text += 5;
- text += strspn( text, " \t" );
- m_host = text;
- }
- else
- {
- printf( "oop! unknow header %s\n", text );
- }
- return NO_REQUEST;
- }
- http_conn::HTTP_CODE http_conn::parse_content( char* text )//解析结果
- {
- if ( m_read_idx >= ( m_content_length + m_checked_idx ) )//若解析到缓冲区的最后位置则获得一个一个完整的连接请求
- {
- text[ m_content_length ] = '\0';
- return GET_REQUEST;
- }
- return NO_REQUEST;//请求不完整
- }
- http_conn::HTTP_CODE http_conn::process_read()//完整的HTTP解析
- {
- LINE_STATUS line_status = LINE_OK;
- HTTP_CODE ret = NO_REQUEST;
- char* text = 0;
- while ( ( ( m_check_state == CHECK_STATE_CONTENT ) && ( line_status == LINE_OK ) )
- || ( ( line_status = parse_line() ) == LINE_OK ) ){//满足条件:正在进行HTTP解析、读取一个完整行
- text = get_line();//从读缓冲区(HTTP请求数据)获取一行数据
- m_start_line = m_checked_idx;//行的起始位置等于正在每行解析的第一个字节
- printf( "got 1 http line: %s\n", text );
- switch ( m_check_state )//HTTP解析状态跳转
- {
- case CHECK_STATE_REQUESTLINE://正在分析请求行
- {
- ret = parse_request_line( text );//分析请求行
- if ( ret == BAD_REQUEST )
- {
- return BAD_REQUEST;
- }
- break;
- }
- case CHECK_STATE_HEADER://正在分析请求头部
- {
- ret = parse_headers( text );//分析头部
- if ( ret == BAD_REQUEST )
- {
- return BAD_REQUEST;
- }
- else if ( ret == GET_REQUEST )
- {
- return do_request();//当获得一个完整的连接请求则调用do_request分析处理资源页文件
- }
- break;
- }
- case CHECK_STATE_CONTENT://HTTP解析状态仍为正在解析...没有办法只好继续解析呗....解析消息体
- {
- ret = parse_content( text );
- if ( ret == GET_REQUEST )
- {
- return do_request();
- }
- line_status = LINE_OPEN;
- break;
- }
- default:
- {
- return INTERNAL_ERROR;//内部错误
- }
- }
- }
- return NO_REQUEST;
- }
- http_conn::HTTP_CODE http_conn::do_request()//用于获取资源页文件的状态
- {
- strcpy( m_real_file, doc_root );
- int len = strlen( doc_root );
- strncpy( m_real_file + len, m_url, FILENAME_LEN - len - 1 );
- if ( stat( m_real_file, &m_file_stat ) < 0 )
- {
- return NO_RESOURCE;//若资源页不存在则HTTP解析结果为:没有资源...万恶的404
- }
- if ( ! ( m_file_stat.st_mode & S_IROTH ) )
- {
- return FORBIDDEN_REQUEST;//资源没有权限获取
- }
- if ( S_ISDIR( m_file_stat.st_mode ) )
- {
- return BAD_REQUEST;//请求有错
- }
- int fd = open( m_real_file, O_RDONLY );
- m_file_address = ( char* )mmap( 0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0 );//将资源页文件映射到内存
- close( fd );
- return FILE_REQUEST;//资源页请求成功
- }
- void http_conn::unmap()//解除资源页文件映射的内存
- {
- if( m_file_address )
- {
- munmap( m_file_address, m_file_stat.st_size );//解除映射
- m_file_address = 0;
- }
- }
- bool http_conn::write()//将资源页文件发送给客户端
- {
- int temp = 0;
- int bytes_have_send = 0;
- int bytes_to_send = m_write_idx;
- if ( bytes_to_send == 0 )
- {
- modfd( m_epollfd, m_sockfd, EPOLLIN );//EPOLLONESHOT事件每次需要重置事件
- init();
- return true;
- }
- while( 1 )//
- {
- temp = writev( m_sockfd, m_iv, m_iv_count );//集中写,m_sockfd是http连接对应的描述符,m_iv是iovec结构体数组表示内存块地址,m_iv_count是数组的长度即多少个内存块将一次集中写到m_sockfd
- if ( temp <= -1 )//集中写失败
- {
- if( errno == EAGAIN )
- {
- modfd( m_epollfd, m_sockfd, EPOLLOUT );//重置EPOLLONESHOT事件,注册可写事件表示若m_sockfd没有写失败则关闭连接
- return true;
- }
- unmap();//解除内存映射
- return false;
- }
- bytes_to_send -= temp;//待发送数据
- bytes_have_send += temp;//已发送数据
- if ( bytes_to_send <= bytes_have_send )
- {
- unmap();//该资源页已经发送完毕该解除映射
- if( m_linger )//若要保持该http连接
- {
- init();//初始化http连接
- modfd( m_epollfd, m_sockfd, EPOLLIN );
- return true;
- }
- else
- {
- modfd( m_epollfd, m_sockfd, EPOLLIN );
- return false;
- }
- }
- }
- }
- bool http_conn::add_response( const char* format, ... )//HTTP应答主要是将应答数据添加到写缓冲区m_write_buf
- {
- if( m_write_idx >= WRITE_BUFFER_SIZE )
- {
- return false;
- }
- va_list arg_list;
- va_start( arg_list, format );
- int len = vsnprintf( m_write_buf + m_write_idx, WRITE_BUFFER_SIZE - 1 - m_write_idx, format, arg_list );//将fromat内容输出到m_write_buf
- if( len >= ( WRITE_BUFFER_SIZE - 1 - m_write_idx ) )
- {
- return false;
- }
- m_write_idx += len;
- va_end( arg_list );
- return true;
- }
- bool http_conn::add_status_line( int status, const char* title )
- {
- return add_response( "%s %d %s\r\n", "HTTP/1.1", status, title );//
- }
- bool http_conn::add_headers( int content_len )
- {
- add_content_length( content_len );
- add_linger();
- add_blank_line();//加空行:回车+换行
- }
- bool http_conn::add_content_length( int content_len )
- {
- return add_response( "Content-Length: %d\r\n", content_len );//
- }
- bool http_conn::add_linger()
- {
- return add_response( "Connection: %s\r\n", ( m_linger == true ) ? "keep-alive" : "close" );//
- }
- bool http_conn::add_blank_line()
- {
- return add_response( "%s", "\r\n" );//
- }
- bool http_conn::add_content( const char* content )
- {
- return add_response( "%s", content );
- }
- bool http_conn::process_write( HTTP_CODE ret )//填充HTTP应答
- {
- switch ( ret )
- {
- case INTERNAL_ERROR:
- {
- add_status_line( 500, error_500_title );
- add_headers( strlen( error_500_form ) );
- if ( ! add_content( error_500_form ) )
- {
- return false;
- }
- break;
- }
- case BAD_REQUEST:
- {
- add_status_line( 400, error_400_title );
- add_headers( strlen( error_400_form ) );
- if ( ! add_content( error_400_form ) )
- {
- return false;
- }
- break;
- }
- case NO_RESOURCE:
- {
- add_status_line( 404, error_404_title );
- add_headers( strlen( error_404_form ) );
- if ( ! add_content( error_404_form ) )
- {
- return false;
- }
- break;
- }
- case FORBIDDEN_REQUEST:
- {
- add_status_line( 403, error_403_title );
- add_headers( strlen( error_403_form ) );
- if ( ! add_content( error_403_form ) )
- {
- return false;
- }
- break;
- }
- case FILE_REQUEST://资源页文件可用
- {
- add_status_line( 200, ok_200_title );
- if ( m_file_stat.st_size != 0 )
- {
- add_headers( m_file_stat.st_size );//m_file_stat资源页文件状态
- m_iv[ 0 ].iov_base = m_write_buf;//写缓冲区
- m_iv[ 0 ].iov_len = m_write_idx;//长度
- m_iv[ 1 ].iov_base = m_file_address;//资源页数据内存映射后在m_file_address地址
- m_iv[ 1 ].iov_len = m_file_stat.st_size;//文件长度就是该块内存长度
- m_iv_count = 2;
- return true;
- }
- else
- {
- const char* ok_string = "<html><body></body></html>";//请求页位空白
- add_headers( strlen( ok_string ) );
- if ( ! add_content( ok_string ) )
- {
- return false;
- }
- }
- }
- default:
- {
- return false;
- }
- }
- m_iv[ 0 ].iov_base = m_write_buf;
- m_iv[ 0 ].iov_len = m_write_idx;
- m_iv_count = 1;
- return true;
- }
- void http_conn::process()//处理HTTP请求
- {
- HTTP_CODE read_ret = process_read();//读取HTTP请求数据
- if ( read_ret == NO_REQUEST )
- {
- modfd( m_epollfd, m_sockfd, EPOLLIN );
- return;
- }
- bool write_ret = process_write( read_ret );//发送资源页给客户端
- if ( ! write_ret )
- {
- close_conn();
- }
- modfd( m_epollfd, m_sockfd, EPOLLOUT );
- }
服务端程序:接收HTTP请求并交给线程池处理这些请求
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <stdio.h>
- #include <unistd.h>
- #include <errno.h>
- #include <string.h>
- #include <fcntl.h>
- #include <stdlib.h>
- #include <cassert>
- #include <sys/epoll.h>
- #include "locker.h"//该头文件封装了信号量和互斥量
- #include "threadpool.h"//半同步/半反应堆线程池
- #include "http_conn.h"//HTTP连接任务类T
- #define MAX_FD 65536//最大文件数目
- #define MAX_EVENT_NUMBER 10000//最大事件数目
- extern int addfd( int epollfd, int fd, bool one_shot );//采用http_conn.h的addfd函数
- extern int removefd( int epollfd, int fd );//这也是http_conn.h中的函数
- void addsig( int sig, void( handler )(int), bool restart = true )//安装信号,用于统一事件源(将信号和IO事件统一监听)
- {
- struct sigaction sa;
- memset( &sa, '\0', sizeof( sa ) );
- sa.sa_handler = handler;
- if( restart )
- {
- sa.sa_flags |= SA_RESTART;
- }
- sigfillset( &sa.sa_mask );
- assert( sigaction( sig, &sa, NULL ) != -1 );
- }
- void show_error( int connfd, const char* info )
- {
- printf( "%s", info );
- send( connfd, info, strlen( info ), 0 );
- close( connfd );
- }
- int main( int argc, char* argv[] )
- {
- if( argc <= 2 )
- {
- printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
- return 1;
- }
- const char* ip = argv[1];
- int port = atoi( argv[2] );
- addsig( SIGPIPE, SIG_IGN );
- threadpool< http_conn >* pool = NULL;
- try
- {
- pool = new threadpool< http_conn >;//创建线程池
- }
- catch( ... )
- {
- return 1;
- }
- http_conn* users = new http_conn[ MAX_FD ];//创建超大的用户HTTP连接任务数组,给定一个http连接的描述符作为下标即可索引到这个任务,空间换时间
- assert( users );
- int user_count = 0;
- int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
- assert( listenfd >= 0 );
- struct linger tmp = { 1, 0 };
- setsockopt( listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof( tmp ) );
- int ret = 0;
- struct sockaddr_in address;
- bzero( &address, sizeof( address ) );
- address.sin_family = AF_INET;
- inet_pton( AF_INET, ip, &address.sin_addr );
- address.sin_port = htons( port );
- ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
- assert( ret >= 0 );
- ret = listen( listenfd, 5 );
- assert( ret >= 0 );
- epoll_event events[ MAX_EVENT_NUMBER ];
- int epollfd = epoll_create( 5 );//创建事件表
- assert( epollfd != -1 );
- addfd( epollfd, listenfd, false );//将监听端口添加到事件表,false表示不注册EPOLLONESHOT事件,注意不能将监听端口注册为EPOLLONESHOT事件因为该事件每次发生只触发一次,而accept每次只能连接一个客户,那么多个客户连接请求到来,则必然丢失客户连接请求
- http_conn::m_epollfd = epollfd;
- while( true )
- {
- int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );//无限期等待sockfd上的注册事件
- if ( ( number < 0 ) && ( errno != EINTR ) )//若epoll_wait不是因中断EINTR是出错
- {
- printf( "epoll failure\n" );
- break;
- }
- for ( int i = 0; i < number; i++ )
- {
- int sockfd = events[i].data.fd;//获取就绪事件描述符
- if( sockfd == listenfd )//监听端口有可读事件则表明有HTTP请求
- {
- struct sockaddr_in client_address;
- socklen_t client_addrlength = sizeof( client_address );
- int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );//建立客户连接
- if ( connfd < 0 )
- {
- printf( "errno is: %d\n", errno );
- continue;
- }
- if( http_conn::m_user_count >= MAX_FD )//HTTP客户数超过MAX_FD
- {
- show_error( connfd, "Internal server busy" );
- continue;
- }
- users[connfd].init( connfd, client_address );//利用connfd快速索引到http_conn任务类
- }
- else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) )
- {
- users[sockfd].close_conn();
- }
- else if( events[i].events & EPOLLIN )//数据可读:
- {
- if( users[sockfd].read() )
- {
- pool->append( users + sockfd );
- }
- else
- {
- users[sockfd].close_conn();
- }
- }
- else if( events[i].events & EPOLLOUT )//数据可写,哪里注册了可写EPOLLOUT事件?http_conn工作任务类中write函数将那个http连接的描述符m_sockfd注册了可写事件
- {
- if( !users[sockfd].write() )//若该http_conn任务对应的http连接写失败了则关闭该http连接
- {
- users[sockfd].close_conn();
- }
- }
- else
- {}
- }
- }
- close( epollfd );
- close( listenfd );//这里要提醒的是listenfd由创建它的函数关闭,谁污染谁治理的原则
- delete [] users;
- delete pool;
- return 0;
- }
互斥量和信号量的简单封装:locker.h
- #ifndef LOCKER_H
- #define LOCKER_H
- #include <exception>
- #include <pthread.h>
- #include <semaphore.h>
- class sem
- {
- public:
- sem()
- {
- if( sem_init( &m_sem, 0, 0 ) != 0 )
- {
- throw std::exception();
- }
- }
- ~sem()
- {
- sem_destroy( &m_sem );
- }
- bool wait()
- {
- return sem_wait( &m_sem ) == 0;
- }
- bool post()
- {
- return sem_post( &m_sem ) == 0;
- }
- private:
- sem_t m_sem;
- };
- class locker
- {
- public:
- locker()
- {
- if( pthread_mutex_init( &m_mutex, NULL ) != 0 )
- {
- throw std::exception();
- }
- }
- ~locker()
- {
- pthread_mutex_destroy( &m_mutex );
- }
- bool lock()
- {
- return pthread_mutex_lock( &m_mutex ) == 0;
- }
- bool unlock()
- {
- return pthread_mutex_unlock( &m_mutex ) == 0;
- }
- private:
- pthread_mutex_t m_mutex;
- };
- class cond
- {
- public:
- cond()
- {
- if( pthread_mutex_init( &m_mutex, NULL ) != 0 )
- {
- throw std::exception();
- }
- if ( pthread_cond_init( &m_cond, NULL ) != 0 )
- {
- pthread_mutex_destroy( &m_mutex );
- throw std::exception();
- }
- }
- ~cond()
- {
- pthread_mutex_destroy( &m_mutex );
- pthread_cond_destroy( &m_cond );
- }
- bool wait()
- {
- int ret = 0;
- pthread_mutex_lock( &m_mutex );
- ret = pthread_cond_wait( &m_cond, &m_mutex );
- pthread_mutex_unlock( &m_mutex );
- return ret == 0;
- }
- bool signal()
- {
- return pthread_cond_signal( &m_cond ) == 0;
- }
- private:
- pthread_mutex_t m_mutex;
- pthread_cond_t m_cond;
- };
- #endif
压力测试程序:通常有IO复用、多线程、多进程实现压力测试,其中IO复用施压程度最高其它的要切换CPU
本程序是客户端通过命令行参数指定num个客户连接,并向服务端发送HTTP请求,服务端HTTP应答,客户端输出是请求和应答数据交替出现
- #include <stdlib.h>
- #include <stdio.h>
- #include <assert.h>
- #include <unistd.h>
- #include <sys/types.h>
- #include <sys/epoll.h>
- #include <fcntl.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <string.h>
- //向服务器发送HTTP请求内容
- static const char* request = "GET http://localhost/index.html HTTP/1.1\r\nConnection: keep-alive\r\n\r\nxxxxxxxxxxxx";
- int setnonblocking( int fd )//设置非阻塞描述符
- {
- int old_option = fcntl( fd, F_GETFL );
- int new_option = old_option | O_NONBLOCK;
- fcntl( fd, F_SETFL, new_option );
- return old_option;
- }
- void addfd( int epoll_fd, int fd )//添加描述符到事件表
- {
- epoll_event event;
- event.data.fd = fd;
- event.events = EPOLLOUT | EPOLLET | EPOLLERR;//可写事件
- epoll_ctl( epoll_fd, EPOLL_CTL_ADD, fd, &event );
- setnonblocking( fd );
- }
- bool write_nbytes( int sockfd, const char* buffer, int len )//向服务器写函数即发送HTTP请求
- {
- int bytes_write = 0;
- printf( "write out %d bytes to socket %d\n", len, sockfd );
- while( 1 ) //循环写直至写完一次buffer也就是HTTP requst
- {
- bytes_write = send( sockfd, buffer, len, 0 );
- if ( bytes_write == -1 )
- {
- return false;
- }
- else if ( bytes_write == 0 )
- {
- return false;
- }
- len -= bytes_write;
- buffer = buffer + bytes_write;
- if ( len <= 0 )
- {
- return true;
- }
- }
- }
- bool read_once( int sockfd, char* buffer, int len )//读一次,接收服务器发送来的HTTP应答
- {
- int bytes_read = 0;
- memset( buffer, '\0', len );
- bytes_read = recv( sockfd, buffer, len, 0 );
- if ( bytes_read == -1 )
- {
- return false;
- }
- else if ( bytes_read == 0 )
- {
- return false;
- }
- printf( "read in %d bytes from socket %d with content: %s\n", bytes_read, sockfd, buffer );
- return true;
- }
- void start_conn( int epoll_fd, int num, const char* ip, int port )//发起num个连接
- {
- int ret = 0;
- struct sockaddr_in address;
- bzero( &address, sizeof( address ) );
- address.sin_family = AF_INET;
- inet_pton( AF_INET, ip, &address.sin_addr );
- address.sin_port = htons( port );
- for ( int i = 0; i < num; ++i )
- {
- sleep( 1 );
- int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
- printf( "create 1 sock\n" );
- if( sockfd < 0 )
- {
- continue;
- }
- if ( connect( sockfd, ( struct sockaddr* )&address, sizeof( address ) ) == 0 )
- {
- printf( "build connection %d\n", i );
- addfd( epoll_fd, sockfd );//初始注册为可写事件
- }
- }
- }
- void close_conn( int epoll_fd, int sockfd )//关闭连接
- {
- epoll_ctl( epoll_fd, EPOLL_CTL_DEL, sockfd, 0 );
- close( sockfd );
- }
- int main( int argc, char* argv[] )
- {
- assert( argc == 4 );
- int epoll_fd = epoll_create( 100 );
- start_conn( epoll_fd, atoi( argv[ 3 ] ), argv[1], atoi( argv[2] ) );
- epoll_event events[ 10000 ];
- char buffer[ 2048 ];
- while ( 1 )
- {
- int fds = epoll_wait( epoll_fd, events, 10000, 2000 );//2000ms内等待最多10000个事件
- for ( int i = 0; i < fds; i++ )
- {
- int sockfd = events[i].data.fd;
- if ( events[i].events & EPOLLIN )//HTTP连接上可读事件即服务端发送给客户端HTTP回答报文
- {
- if ( ! read_once( sockfd, buffer, 2048 ) )//读取HTTP应答
- {
- close_conn( epoll_fd, sockfd );
- }
- struct epoll_event event;
- event.events = EPOLLOUT | EPOLLET | EPOLLERR;//更改为可写事件
- event.data.fd = sockfd;
- epoll_ctl( epoll_fd, EPOLL_CTL_MOD, sockfd, &event );//
- }
- else if( events[i].events & EPOLLOUT ) //可写事件初始就是可写
- {
- if ( ! write_nbytes( sockfd, request, strlen( request ) ) )//向服务端发送HTTP请求
- {
- close_conn( epoll_fd, sockfd );
- }
- struct epoll_event event;
- event.events = EPOLLIN | EPOLLET | EPOLLERR;//更改为可写事件
- event.data.fd = sockfd;
- epoll_ctl( epoll_fd, EPOLL_CTL_MOD, sockfd, &event );//这样做的目的是客户端发送HTTP请求和服务端HTTP回答交替出现
- }
- else if( events[i].events & EPOLLERR )
- {
- close_conn( epoll_fd, sockfd );
- }
- }
- }
- }