boost asio tcp 多线程异步读写,服务器与客户端。

时间:2021-05-05 15:26:08
 // server.cpp

 #if 0
多个线程对同一个io_service 对象处理
用到第三方库:log4cplus, google::protobuf
用到 C++11 的特性:使用 Visual Studio 时需要 VS2013,使用 gcc 时需要 4.8。
#endif

// Standard headers for assert, int32_t, system and memcpy used further down.
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <thread>
#include <vector>

#include <boost/asio.hpp>
#include <boost/shared_array.hpp>
#include <boost/make_shared.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>

#include <common.pb.h>

// Forward declarations of the accept / read-header / read-body chain.
// Each handle_* function is the completion handler of the async_* function
// declared just above it.
void async_accept();
void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
                   const boost::system::error_code &ec);
void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn);
void handle_head(
    boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
    boost::shared_array<char> sa_len,
    const boost::system::error_code &ec,
    std::size_t bytes_transfered);
void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len);
void handle_proto(
    boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
    boost::shared_array<char> sa_data,
    const boost::system::error_code &ec,
    std::size_t bytes_transfered);

// The single io_service shared by every worker thread.
boost::asio::io_service io_svc;

// Default-constructed address: listen on all local IPv4 addresses.
boost::asio::ip::address_v4 lis_ip;

// NOTE(review): the port literal was missing from the original source
// (`lis_ep(lis_ip, )`); 8888 is a placeholder and must match the client.
static const unsigned short LISTEN_PORT = 8888;
boost::asio::ip::tcp::endpoint lis_ep(lis_ip, LISTEN_PORT);
boost::asio::ip::tcp::acceptor acceptor(io_svc, lis_ep); #include <Log.h> // log4cplus 的相关头文件,这里不再一个一个敲出来了。 log4cplus::Logger *gLog = nullptr; static const int PACKAGE_LENGTH = ; int main(int argc, char *argv[])
{
log4cplus::initialize(); static log4cplus::Logger s_log = log4cplus::Logger::getInstance("server");
gLog = &s_log; LOG4CPLUS_INFO_FMT(*gLog, "main begin..."); for (int i = ; i < ; ++i)
{
async_accept();
} // 捕获信号
boost::asio::signal_set signals_(io_svc);
signals_.add(SIGINT);
signals_.add(SIGTERM);
signals_.async_wait([](const boost::system::error_code &ec, int sig)
{
LOG4CPLUS_INFO_FMT(*gLog, "signal: %d, error_message: %s",
sig, ec.message().c_str());
io_svc.stop();
}); std::vector<std::thread> vecThread;
for (int i = ; i < ; ++i)
{
vecThread.emplace_back(std::thread([](){
LOG4CPLUS_INFO_FMT(*gLog, "thread start...");
io_svc.run();
LOG4CPLUS_INFO_FMT(*gLog, "thread finished.");
}));
} for (size_t i = ; i < vecThread.size(); ++i)
{
vecThread[i].join();
}
assert(io_svc.stopped(); #ifdef WIN32
system("pause");
#endif return ;
} // 标记异步监听,投放到指定io_service 对象中
void async_accept()
{
LOG4CPLUS_INFO_FMT(*gLog, "async_accept waitting..."); boost::shared_ptr<boost::asio::ip::tcp::socket> new_sock
= boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(ios_svc)); boost::function<void(const boost::system::error_code &> cb_accept;
cb_accept = boost::bind(handle_accept, new_sock, _1);
acceptor.async_accept(*new_sock, cb_accept);
} // 监听返回的处理
/**
 * Completion handler for async_accept().
 *
 * On success it starts reading the first header from the new connection and
 * posts another accept. On failure it logs and returns without posting another
 * accept.
 *
 * @param new_conn socket that was passed to the accept operation.
 * @param ec       result of the accept operation.
 */
void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
                   const boost::system::error_code &ec)
{
    if (ec)
    {
        LOG4CPLUS_INFO(*gLog, "accept failed: " << ec.message());
        return;
    }

    // Use the non-throwing overload: the peer may already have gone away, and
    // an exception escaping a handler would propagate out of io_service::run().
    // On failure `peer` stays default-constructed.
    boost::system::error_code ep_ec;
    const boost::asio::ip::tcp::endpoint peer = new_conn->remote_endpoint(ep_ec);
    LOG4CPLUS_INFO(*gLog, "a new client connected. " << peer);

    async_read_head(new_conn);

    // Accept the next connection. An accept must be posted again after each
    // one completes; otherwise the io_service handles a single accept and
    // listening ends. To stop listening, return here instead: run() then
    // returns once no other asynchronous operations remain outstanding.
    async_accept();
}

// Post an asynchronous read of the fixed-size header for the given connection.
/**
 * Posts an asynchronous read of one fixed-size header (PACKAGE_LENGTH bytes)
 * on the given connection.
 *
 * boost::asio::async_read() invokes handle_head() only after the supplied
 * buffer has been filled completely (or an error occurs). The buffer is owned
 * by a shared_array bound into the handler, so it outlives the operation.
 *
 * @param conn connection to read from.
 */
void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn)
{
    boost::shared_array<char> header_buf(new char[PACKAGE_LENGTH]);

    boost::asio::async_read(
        *conn,
        boost::asio::buffer(header_buf.get(), PACKAGE_LENGTH),
        boost::bind(handle_head, conn, header_buf, _1, _2));
}

// Handler invoked once the header has been read completely.
/**
 * Completion handler for async_read_head().
 *
 * Decodes the body length from the header (a 32-bit integer in network byte
 * order at the start of the buffer), validates it, and posts the read of the
 * body. On any error it logs and returns without posting another read.
 *
 * @param conn             connection the header was read from.
 * @param sa_len           buffer holding the PACKAGE_LENGTH header bytes.
 * @param ec               result of the read operation.
 * @param bytes_transfered number of bytes placed into sa_len.
 */
void handle_head(
    boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
    boost::shared_array<char> sa_len,
    const boost::system::error_code &ec,
    std::size_t bytes_transfered)
{
    // NOTE(review): safety cap on the peer-supplied body length; the value is
    // an arbitrary choice and should be tuned to the real protocol.
    static const int32_t MAX_BODY_LENGTH = 64 * 1024 * 1024;

    if (!conn->is_open())
    {
        LOG4CPLUS_INFO(*gLog, "socket was not opened.");
        return;
    }

    if (ec)
    {
        if (ec == boost::asio::error::eof)
        {
            // Non-throwing overload: the socket may no longer be connected.
            // On failure `peer` stays default-constructed.
            boost::system::error_code ep_ec;
            const boost::asio::ip::tcp::endpoint peer = conn->remote_endpoint(ep_ec);
            LOG4CPLUS_INFO(*gLog, "Disconnect from " << peer);
        }
        else
        {
            LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
        }
        return;
    }

    // Process the header.
    assert(bytes_transfered == static_cast<std::size_t>(PACKAGE_LENGTH));

    uint32_t len_net = 0; // body length, network byte order
    memcpy(&len_net, sa_len.get(), sizeof(len_net));

    // Body length, host byte order.
    const int32_t len_loc = static_cast<int32_t>(
        boost::asio::detail::socket_ops::network_to_host_long(len_net));
    LOG4CPLUS_INFO_FMT(*gLog, "nLenLoc: %d", len_loc);

    // The length comes from the peer and sizes an allocation; reject values
    // that are negative or unreasonably large instead of trusting them.
    if (len_loc < 0 || len_loc > MAX_BODY_LENGTH)
    {
        LOG4CPLUS_ERROR_FMT(*gLog, "invalid body length: %d", len_loc);
        return;
    }

    async_read_proto(conn, len_loc);
}

// Post an asynchronous read of the body for the given connection.
/**
 * Posts an asynchronous read of exactly `len` body bytes on the given
 * connection; handle_proto() is invoked on completion. The buffer is owned by
 * a shared_array bound into the handler.
 *
 * @param conn connection to read from.
 * @param len  body length in bytes; a negative value is rejected and no read
 *             is posted.
 */
void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len)
{
    // A negative length would turn into an enormous array size below.
    if (len < 0)
    {
        LOG4CPLUS_ERROR_FMT(*gLog, "async_read_proto: negative length %d", len);
        return;
    }
    const std::size_t body_size = static_cast<std::size_t>(len);

    // Zero-initialised body buffer.
    boost::shared_array<char> sa_data(new char[body_size]());

    // Completion callback.
    boost::function<void(const boost::system::error_code &, std::size_t)> cb_proto;
    cb_proto = boost::bind(handle_proto, conn, sa_data, _1, _2);

    boost::asio::async_read(*conn,
                            boost::asio::buffer(sa_data.get(), body_size), cb_proto);
}

// Handler invoked once the body has been read completely.
/**
 * Completion handler for async_read_proto().
 *
 * Parses the received bytes as a MessageHead, logs it together with the
 * peer's port, and posts the read of the next header. On a read error or a
 * parse failure it logs and returns without posting another read.
 *
 * @param conn             connection the body was read from.
 * @param sa_data          buffer holding the body bytes.
 * @param ec               result of the read operation.
 * @param bytes_transfered number of bytes placed into sa_data.
 */
void handle_proto(
    boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
    boost::shared_array<char> sa_data,
    const boost::system::error_code &ec,
    std::size_t bytes_transfered)
{
    if (!conn->is_open())
    {
        LOG4CPLUS_INFO(*gLog, "socket was not opened.");
        return;
    }

    if (ec)
    {
        if (ec == boost::asio::error::eof)
        {
            // Non-throwing overload: the socket may no longer be connected.
            // On failure `peer` stays default-constructed.
            boost::system::error_code ep_ec;
            const boost::asio::ip::tcp::endpoint peer = conn->remote_endpoint(ep_ec);
            LOG4CPLUS_INFO(*gLog, "Disconnect from " << peer);
        }
        else
        {
            LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
        }
        return;
    }

    // Turn the raw bytes into a protobuf message and process it.
    MessageHead pro;
    if (!pro.ParseFromArray(sa_data.get(), static_cast<int>(bytes_transfered)))
    {
        LOG4CPLUS_ERROR_FMT(*gLog, "ParseFromArray() failed");
        return;
    }

    // Non-throwing overload; the port of a default-constructed endpoint is
    // logged if the peer is already gone.
    boost::system::error_code ep_ec;
    const int port = conn->remote_endpoint(ep_ec).port();
    LOG4CPLUS_INFO_FMT(*gLog, "port: %d\n%s", port, pro.DebugString().c_str());

    // As with accept, the next asynchronous read has to be posted explicitly
    // once this message is handled. To end a connection, return here instead;
    // calling close() or shutdown() on the socket also closes it.
    async_read_head(conn);
}
 // client.cpp

 #include <iostream>
#include <thread>
#include <vector>

// Standard headers for assert, uint32_t, system, memcpy, time, shared_ptr,
// bad_alloc / placement new, and std::string used below.
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <memory>
#include <new>
#include <string>

#include <boost/asio.hpp>
#include <boost/shared_array.hpp>
#include <boost/make_shared.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>

#include <boost/pool/pool.hpp>
#include <boost/pool/singleton_pool.hpp>

// Header generated by protocol buffers.
#include <common.pb.h>

// Forward declarations of the connect / write-header / write-body chain.
void async_connect();
void handle_connect(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
                    const boost::system::error_code &ec);
void async_write(boost::shared_ptr<boost::asio::ip::tcp::socket> conn);
void handle_write_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
                       const std::shared_ptr<std::string> sp_data_proto,
                       const boost::system::error_code &ec,
                       std::size_t bytes_transfered);
void handle_write_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
                        const std::shared_ptr<std::string> sp_data_proto,
                        const boost::system::error_code &ec, std::size_t bytes_transfered);

// The single io_service shared by every worker thread.
boost::asio::io_service io_svc;

// NOTE(review): the port literal was missing from the original source;
// 8888 is a placeholder and must match the server's listening port.
static const unsigned short SERVER_PORT = 8888;
boost::asio::ip::tcp::endpoint svr_ep(
    boost::asio::ip::address_v4::from_string("127.0.0.1"), SERVER_PORT);

#include <Log.h> // log4cplus headers (not listed one by one here)

// Should really be a Logger object rather than a pointer; kept as in the original.
log4cplus::Logger *gLog = nullptr;

// Fixed size, in bytes, of the header that precedes every protobuf body.
// NOTE(review): the original value was missing from the source; 4 is the
// smallest value that can hold the 32-bit length field and must match the server.
static const int PACKAGE_LENGTH = 4;
static_assert(PACKAGE_LENGTH >= static_cast<int>(sizeof(uint32_t)),
              "header must be able to hold the 32-bit body length");

using pool_head = boost::singleton_pool<struct struHead, PACKAGE_LENGTH>;
using pool_string = boost::singleton_pool<struct struString, sizeof(std::string)>;

/**
 * Creates an empty std::string whose object storage comes from pool_string.
 *
 * The returned shared_ptr's deleter runs the string's destructor and gives the
 * storage back to the pool.
 *
 * @return owning pointer to a default-constructed string.
 * @throws std::bad_alloc if the pool cannot supply a chunk.
 */
std::shared_ptr<std::string> createSharedString()
{
    void *raw = pool_string::malloc();
    if (raw == nullptr)
    {
        // Placement-new on a null pointer would be undefined behaviour.
        throw std::bad_alloc();
    }

    std::string *str = new (raw) std::string;
    return std::shared_ptr<std::string>(str, [](std::string *tp)
    {
        tp->~basic_string();
        pool_string::free(tp);
    });
}

/**
 * Client entry point.
 *
 * Initialises logging, posts the asynchronous connects, then runs the
 * io_service on several threads and waits for all of them to finish.
 *
 * @return 0 after every worker thread has returned from io_service::run().
 */
int main(int argc, char *argv[])
{
    // NOTE(review): both counts were missing from the original source; the
    // values below are placeholders.
    const int connection_count = 4;
    const int worker_threads = 4;

    log4cplus::initialize();

    static log4cplus::Logger s_log = log4cplus::Logger::getInstance("client");
    gLog = &s_log;
    assert(gLog != nullptr);

    LOG4CPLUS_INFO_FMT(*gLog, "main begin...");

    for (int i = 0; i < connection_count; ++i)
    {
        async_connect();
    }

    // Every thread calls run() on the same io_service object.
    std::vector<std::thread> vecThread;
    for (int i = 0; i < worker_threads; ++i)
    {
        vecThread.emplace_back([]() {
            LOG4CPLUS_INFO_FMT(*gLog, "thread start...");
            io_svc.run();
            LOG4CPLUS_INFO_FMT(*gLog, "thread finished.");
        });
    }

    for (std::thread &worker : vecThread)
    {
        worker.join();
    }
    assert(io_svc.stopped());

#ifdef WIN32
    system("pause");
#endif
    return 0;
}

/**
 * Creates a socket on the global io_service and posts an asynchronous connect
 * to svr_ep; handle_connect() is invoked on completion.
 */
void async_connect()
{
    LOG4CPLUS_INFO_FMT(*gLog, "async_connect waitting...");

    boost::shared_ptr<boost::asio::ip::tcp::socket> new_sock
        = boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(io_svc));

    new_sock->async_connect(svr_ep, boost::bind(
        handle_connect, new_sock,
        boost::asio::placeholders::error));
}

/**
 * Completion handler for async_connect(): on success starts sending a message.
 *
 * @param new_conn socket the connect was issued on.
 * @param ec       result of the connect operation.
 */
void handle_connect(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
                    const boost::system::error_code &ec)
{
    if (ec)
    {
        LOG4CPLUS_INFO(*gLog, "connect failed: " << ec.message());
        return;
    }

    // Non-throwing overload: an exception escaping a handler would propagate
    // out of io_service::run(). On failure `peer` stays default-constructed.
    boost::system::error_code ep_ec;
    const boost::asio::ip::tcp::endpoint peer = new_conn->remote_endpoint(ep_ec);
    LOG4CPLUS_INFO(*gLog, "connect success, server: " << peer);

    async_write(new_conn);
}

// Reference copy of the protobuf message definition.
// NOTE(review): the field numbers were missing from the original source.
#if 0
message messageHead
{
optional uint32 FunCode = ;
optional uint32 RequestID = ;
optional uint32 AccountId = ;
optional uint32 AccessId = ;
optional int64 ClientTime = ;
optional uint32 GoodsId = ;
optional bytes UUID = ;
}
#endif

/**
 * Builds one MessageHead, serialises it, and posts the write of the
 * fixed-size header (the body length as a 32-bit integer in network byte
 * order). handle_write_head() then sends the body.
 *
 * @param conn connected socket to write to.
 */
void async_write(boost::shared_ptr<boost::asio::ip::tcp::socket> conn)
{
    MessageHead pro;
    // NOTE(review): the argument values of the numeric setters were missing
    // from the original source; 0 is a placeholder.
    pro.set_funcode(0);
    pro.set_requestid(0);
    pro.set_accountid(0);
    pro.set_clienttime(std::time(nullptr));
    pro.set_goodsid(0);
    pro.set_uuid(std::string("uuid_500384"));

    std::shared_ptr<std::string> sp_data = createSharedString();
    if (!pro.SerializeToString(sp_data.get()))
    {
        LOG4CPLUS_ERROR_FMT(*gLog, "SerializeToString failed.");
        return;
    }

    // Cast so the argument matches %lld regardless of the width of size_t.
    LOG4CPLUS_INFO_FMT(*gLog, "data.size() = %lld",
                       static_cast<long long>(sp_data->size()));

    if (sp_data->empty())
    {
        return;
    }

    // The header buffer must stay valid until the asynchronous write has
    // completed, so it lives on the heap and is owned by the handler below
    // rather than by this function's stack frame.
    boost::shared_array<char> sa_head(new char[PACKAGE_LENGTH]());
    const uint32_t len_net = static_cast<uint32_t>(
        boost::asio::detail::socket_ops::host_to_network_long(
            static_cast<uint32_t>(sp_data->size())));
    memcpy(sa_head.get(), &len_net, sizeof(len_net));

    boost::asio::async_write(
        *conn, boost::asio::buffer(sa_head.get(), PACKAGE_LENGTH),
        [conn, sp_data, sa_head](const boost::system::error_code &ec,
                                 std::size_t bytes_transfered)
        {
            (void)sa_head; // captured only to keep the header bytes alive
            handle_write_head(conn, sp_data, ec, bytes_transfered);
        });
}

/**
 * Completion handler for the header write: posts the write of the body.
 *
 * @param conn             connection being written to.
 * @param sp_data_proto    serialised message body; bound into the next
 *                         handler so it outlives the write.
 * @param ec               result of the header write.
 * @param bytes_transfered number of header bytes written.
 */
void handle_write_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
                       const std::shared_ptr<std::string> sp_data_proto,
                       const boost::system::error_code &ec,
                       std::size_t bytes_transfered)
{
    if (!conn->is_open())
    {
        LOG4CPLUS_INFO(*gLog, "socket was not opened.");
        return;
    }

    if (ec)
    {
        if (ec == boost::asio::error::eof)
        {
            // Non-throwing overload: the socket may no longer be connected.
            boost::system::error_code ep_ec;
            const boost::asio::ip::tcp::endpoint peer = conn->remote_endpoint(ep_ec);
            LOG4CPLUS_INFO(*gLog, "Disconnect from " << peer);
        }
        else
        {
            // This is a write completion, so report a send error.
            LOG4CPLUS_INFO(*gLog, "Error on send: " << ec.message());
        }
        return;
    }

    boost::function<void(const boost::system::error_code &, std::size_t)> cb_write_proto;
    cb_write_proto = boost::bind(handle_write_proto, conn, sp_data_proto, _1, _2);
    boost::asio::async_write(*conn, boost::asio::buffer(*sp_data_proto), cb_write_proto);
}

// sp_data_proto is not used inside the function below; it is a parameter only to extend the buffer's lifetime,
/**
 * Completion handler for the body write.
 *
 * sp_data_proto is not used by this function. It is a parameter only so that
 * the bound handler keeps the serialised message alive until the write has
 * finished; if the string were destroyed while async_write was still in
 * progress, the peer would receive corrupted data.
 *
 * @param conn             connection that was written to.
 * @param sp_data_proto    serialised message body (kept alive, not read).
 * @param ec               result of the body write.
 * @param bytes_transfered number of body bytes written.
 */
void handle_write_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
                        const std::shared_ptr<std::string> sp_data_proto,
                        const boost::system::error_code &ec, std::size_t bytes_transfered)
{
    if (!conn->is_open())
    {
        LOG4CPLUS_INFO(*gLog, "socket was not opened.");
        return;
    }

    if (ec)
    {
        if (ec == boost::asio::error::eof)
        {
            // Non-throwing overload: the socket may no longer be connected.
            // On failure `peer` stays default-constructed.
            boost::system::error_code ep_ec;
            const boost::asio::ip::tcp::endpoint peer = conn->remote_endpoint(ep_ec);
            LOG4CPLUS_INFO(*gLog, "Disconnect from " << peer);
        }
        else
        {
            // This is a write completion, so report a send error.
            LOG4CPLUS_INFO(*gLog, "Error on send: " << ec.message());
        }
        return;
    }

    LOG4CPLUS_INFO(*gLog, "write proto finished.");
    // Once the data is written, data sent by the peer could be read here.
    // If nothing further is read from the peer, the socket is disconnected.
    //async_read_head(conn);
}