// server.cpp
#if 0
多个线程对同一个io_service 对象处理
用到第三方库:log4cplus, google::protobuf
用到C++11的特性,Windows 需要用到vs2013 gcc 4.8
#endif
#include <iostream>
#include <thread>
#include <vector>
#include <boost/asio.hpp>
#include <boost/shared_array.hpp>
#include <boost/make_shared.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <common.pb.h>
void async_accept();
void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
const boost::system::error_code &ec);
void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn);
void handle_head(
boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
boost::shared_array<char> sa_len,
const boost::system::error_code &ec,
std::size_t bytes_transfered);
void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len);
void handle_proto(
boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
boost::shared_array<char> sa_data,
const boost::system::error_code &ec,
std::size_t bytes_transfered);
boost::asio::io_service io_svc;
boost::asio::ip::address_v4 lis_ip; // 默认监听本机所有IP
boost::asio::ip::tcp::endpoint lis_ep(lis_ip, );
boost::asio::ip::tcp::acceptor acceptor(io_svc, lis_ep);
#include <Log.h> // log4cplus 的相关头文件,这里不再一个一个敲出来了。
log4cplus::Logger *gLog = nullptr;
static const int PACKAGE_LENGTH = ;
int main(int argc, char *argv[])
{
log4cplus::initialize();
static log4cplus::Logger s_log = log4cplus::Logger::getInstance("server");
gLog = &s_log;
LOG4CPLUS_INFO_FMT(*gLog, "main begin...");
for (int i = ; i < ; ++i)
{
async_accept();
}
// 捕获信号
boost::asio::signal_set signals_(io_svc);
signals_.add(SIGINT);
signals_.add(SIGTERM);
signals_.async_wait([](const boost::system::error_code &ec, int sig)
{
LOG4CPLUS_INFO_FMT(*gLog, "signal: %d, error_message: %s",
sig, ec.message().c_str());
io_svc.stop();
});
std::vector<std::thread> vecThread;
for (int i = ; i < ; ++i)
{
vecThread.emplace_back(std::thread([](){
LOG4CPLUS_INFO_FMT(*gLog, "thread start...");
io_svc.run();
LOG4CPLUS_INFO_FMT(*gLog, "thread finished.");
}));
}
for (size_t i = ; i < vecThread.size(); ++i)
{
vecThread[i].join();
}
assert(io_svc.stopped();
#ifdef WIN32
system("pause");
#endif
return ;
}
// 标记异步监听,投放到指定io_service 对象中
void async_accept()
{
LOG4CPLUS_INFO_FMT(*gLog, "async_accept waitting...");
boost::shared_ptr<boost::asio::ip::tcp::socket> new_sock
= boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(ios_svc));
boost::function<void(const boost::system::error_code &> cb_accept;
cb_accept = boost::bind(handle_accept, new_sock, _1);
acceptor.async_accept(*new_sock, cb_accept);
}
// 监听返回的处理
void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
const boost::system::error_code &ec)
{
if (ec != )
{
LOG4CPLUS_INFO(*gLog, "accept failed: " << ec.message());
return;
}
LOG4CPLUS_INFO(*gLog, "a new client connected. " << new_conn->remote_endpoint());
async_read_head(new_conn);
// 处理下一个连接,每次处理完了之后,需要再次accept.
// 否则io_service 将只处理一次,然后结束监听。
// 所以这里可以处理一个情况,就是当你要结束监听的时候,人要在这里return
// 那么io_service 的run() 函数就会stop. 但如果还有其他的异步操作被记录,
// run() 函数还是会继续运行,以处理其他的异步操作。
async_accept();
}
// 对一个指定的连接标记异步读头部,然后投放到io_service 对象
void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn)
{
// 固定报文头长度为${PACKAGE_LENGTH} 个字节
boost::shared_array<char> sa_len(new char[PACKAGE_LENGTH]);
// 回调函数
boost::function<void(const boost::system::error_code &, std::size_t)> cb_msg_len;
cb_msg_len = boost::bind(handle_head, conn, sa_len, _1, _2);
// 异步读,读一个报文的长度,boost::asio::async_read() 函数有个特点,
// 它会将这里指定的buffer 缓冲区读满了才会回调handle_head 函数。
boost::asio::async_read(
*conn, boost::asio::buffer(sa_len.get(), PACKAGE_LENGTH), cb_msg_len);
}
// 头部数据完整读取后的处理函数
void handle_head(
boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
boost::shared_array<char> sa_len,
const boost::system::error_code &ec,
std::size_t bytes_transfered)
{
if (!conn->is_open())
{
LOG4CPLUS_INFO(*gLog, "socket was not opened.");
return ;
}
if (ec != )
{
if (ec == boost::asio::error::eof)
{
LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
}
else
{
LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
}
return ;
}
// 这里对的数据做处理
assert(bytes_transfered == PACKAGE_LENGTH);
int32_t len_net = ; // 网络字节序:数据部分长度
int32_t len_loc = ; // 本地字节序:数据部分长度
memcpy(&len_net, sa_len.get(), sizeof(len_net));
len_loc = boost::asio::detail::socket_ops::network_to_host_long(len_net);
LOG4CPLUS_INFO_FMT(*gLog, "nLenLoc: %d", len_loc);
async_read_proto(conn, len_loc);
}
// 对一个指定的连接标记异步读数据部,然后投放到io_service 对象
void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len)
{
// 数据部分
boost::shared_array<char> sa_data(new char[len]());
// 回调函数
boost::function<void(const boost::system::error_code &, std::size_t)> cb_proto;
cb_proto = boost::bind(handle_proto, conn, sa_data, _1, _2);
boost::asio::async_read(*conn,
boost::asio::buffer(sa_data.get(), len), cb_proto);
}
// 数据部分读完整后的处理函数
void handle_proto(
boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
boost::shared_array<char> sa_data,
const boost::system::error_code &ec,
std::size_t bytes_transfered)
{
if (!conn->is_open())
{
LOG4CPLUS_INFO(*gLog, "socket was not opened.");
return ;
}
if (ec != )
{
if (ec == boost::asio::error::eof)
{
LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
}
else
{
LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
}
return ;
}
// 处理这个proto 数据
// 这里将这个数组转换成一个proto, 然后处理这个proto
MessageHead pro;
if (!pro.ParseFromArray(sa_data.get(), (int32_t)bytes_transfered))
{
LOG4CPLUS_ERROR_FMT(*gLog, "ParseFromArray() failed");
return ;
}
int port = conn->remote_endpoint().port();
LOG4CPLUS_INFO_FMT(*gLog, "port: %d\n%s", port, pro.DebugString().c_str();
// 处理完了之后,类似accept 的异步调用一样,需要继续调用异步的读数据
// 同样的,如果要结束一个连接,正常的结算应该在这里return 调用。
// 当然了,使用socket 的close(), shut_down() 函数也可以关闭这个连接。
async_read_head(conn);
}