boost asio tcp 多线程异步读写,服务器与客户端。

时间:2022-09-08 22:34:45
boost asio tcp 多线程异步读写,服务器与客户端。boost asio tcp 多线程异步读写,服务器与客户端。
  1 // server.cpp
2
3 #if 0
4 多个线程对同一个io_service 对象处理
5 用到第三方库:log4cplus, google::protobuf
6 用到C++11的特性,Windows 需要用到vs2013 gcc 4.8
7 #endif
8
9 #include <iostream>
10 #include <thread>
11 #include <vector>
12
13 #include <boost/asio.hpp>
14
15 #include <boost/shared_array.hpp>
16 #include <boost/make_shared.hpp>
17 #include <boost/function.hpp>
18 #include <boost/bind.hpp>
19
20 #include <common.pb.h>
21
22 void async_accept();
23 void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
24 const boost::system::error_code &ec);
25 void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn);
26 void handle_head(
27 boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
28 boost::shared_array<char> sa_len,
29 const boost::system::error_code &ec,
30 std::size_t bytes_transfered);
31 void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len);
32 void handle_proto(
33 boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
34 boost::shared_array<char> sa_data,
35 const boost::system::error_code &ec,
36 std::size_t bytes_transfered);
37
38 boost::asio::io_service io_svc;
39 boost::asio::ip::address_v4 lis_ip; // 默认监听本机所有IP
40 boost::asio::ip::tcp::endpoint lis_ep(lis_ip, 20017);
41 boost::asio::ip::tcp::acceptor acceptor(io_svc, lis_ep);
42
43 #include <Log.h> // log4cplus 的相关头文件,这里不再一个一个敲出来了。
44
45 log4cplus::Logger *gLog = nullptr;
46
47 static const int PACKAGE_LENGTH = 16;
48
49 int main(int argc, char *argv[])
50 {
51 log4cplus::initialize();
52
53 static log4cplus::Logger s_log = log4cplus::Logger::getInstance("server");
54 gLog = &s_log;
55
56 LOG4CPLUS_INFO_FMT(*gLog, "main begin...");
57
58 for (int i = 0; i < 5; ++i)
59 {
60 async_accept();
61 }
62
63 // 捕获信号
64 boost::asio::signal_set signals_(io_svc);
65 signals_.add(SIGINT);
66 signals_.add(SIGTERM);
67 signals_.async_wait([](const boost::system::error_code &ec, int sig)
68 {
69 LOG4CPLUS_INFO_FMT(*gLog, "signal: %d, error_message: %s",
70 sig, ec.message().c_str());
71 io_svc.stop();
72 });
73
74 std::vector<std::thread> vecThread;
75 for (int i = 0; i < 10; ++i)
76 {
77 vecThread.emplace_back(std::thread([](){
78 LOG4CPLUS_INFO_FMT(*gLog, "thread start...");
79 io_svc.run();
80 LOG4CPLUS_INFO_FMT(*gLog, "thread finished.");
81 }));
82 }
83
84 for (size_t i = 0; i < vecThread.size(); ++i)
85 {
86 vecThread[i].join();
87 }
88 assert(io_svc.stopped();
89
90 #ifdef WIN32
91 system("pause");
92 #endif
93
94 return 0;
95 }
96
97 // 标记异步监听,投放到指定io_service 对象中
98 void async_accept()
99 {
100 LOG4CPLUS_INFO_FMT(*gLog, "async_accept waitting...");
101
102 boost::shared_ptr<boost::asio::ip::tcp::socket> new_sock
103 = boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(ios_svc));
104
105 boost::function<void(const boost::system::error_code &> cb_accept;
106 cb_accept = boost::bind(handle_accept, new_sock, _1);
107 acceptor.async_accept(*new_sock, cb_accept);
108 }
109
110 // 监听返回的处理
111 void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
112 const boost::system::error_code &ec)
113 {
114 if (ec != 0)
115 {
116 LOG4CPLUS_INFO(*gLog, "accept failed: " << ec.message());
117
118 return;
119 }
120 LOG4CPLUS_INFO(*gLog, "a new client connected. " << new_conn->remote_endpoint());
121
122 async_read_head(new_conn);
123
124 // 处理下一个连接,每次处理完了之后,需要再次accept.
125 // 否则io_service 将只处理一次,然后结束监听。
126 // 所以这里可以处理一个情况,就是当你要结束监听的时候,人要在这里return
127 // 那么io_service 的run() 函数就会stop. 但如果还有其他的异步操作被记录,
128 // run() 函数还是会继续运行,以处理其他的异步操作。
129 async_accept();
130 }
131
132 // 对一个指定的连接标记异步读头部,然后投放到io_service 对象
133 void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn)
134 {
135 // 固定报文头长度为${PACKAGE_LENGTH} 个字节
136 boost::shared_array<char> sa_len(new char[PACKAGE_LENGTH]);
137
138 // 回调函数
139 boost::function<void(const boost::system::error_code &, std::size_t)> cb_msg_len;
140 cb_msg_len = boost::bind(handle_head, conn, sa_len, _1, _2);
141
142 // 异步读,读一个报文的长度,boost::asio::async_read() 函数有个特点,
143 // 它会将这里指定的buffer 缓冲区读满了才会回调handle_head 函数。
144 boost::asio::async_read(
145 *conn, boost::asio::buffer(sa_len.get(), PACKAGE_LENGTH), cb_msg_len);
146 }
147
148 // 头部数据完整读取后的处理函数
149 void handle_head(
150 boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
151 boost::shared_array<char> sa_len,
152 const boost::system::error_code &ec,
153 std::size_t bytes_transfered)
154 {
155 if (!conn->is_open())
156 {
157 LOG4CPLUS_INFO(*gLog, "socket was not opened.");
158 return ;
159 }
160
161 if (ec != 0)
162 {
163 if (ec == boost::asio::error::eof)
164 {
165 LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
166 }
167 else
168 {
169 LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
170 }
171
172 return ;
173 }
174
175 // 这里对的数据做处理
176 assert(bytes_transfered == PACKAGE_LENGTH);
177 int32_t len_net = 0; // 网络字节序:数据部分长度
178 int32_t len_loc = 0; // 本地字节序:数据部分长度
179 memcpy(&len_net, sa_len.get(), sizeof(len_net));
180 len_loc = boost::asio::detail::socket_ops::network_to_host_long(len_net);
181 LOG4CPLUS_INFO_FMT(*gLog, "nLenLoc: %d", len_loc);
182
183 async_read_proto(conn, len_loc);
184 }
185
186 // 对一个指定的连接标记异步读数据部,然后投放到io_service 对象
187 void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len)
188 {
189 // 数据部分
190 boost::shared_array<char> sa_data(new char[len]());
191
192 // 回调函数
193 boost::function<void(const boost::system::error_code &, std::size_t)> cb_proto;
194 cb_proto = boost::bind(handle_proto, conn, sa_data, _1, _2);
195
196 boost::asio::async_read(*conn,
197 boost::asio::buffer(sa_data.get(), len), cb_proto);
198 }
199
200 // 数据部分读完整后的处理函数
201 void handle_proto(
202 boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
203 boost::shared_array<char> sa_data,
204 const boost::system::error_code &ec,
205 std::size_t bytes_transfered)
206 {
207 if (!conn->is_open())
208 {
209 LOG4CPLUS_INFO(*gLog, "socket was not opened.");
210 return ;
211 }
212
213 if (ec != 0)
214 {
215 if (ec == boost::asio::error::eof)
216 {
217 LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
218 }
219 else
220 {
221 LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
222 }
223 return ;
224 }
225
226 // 处理这个proto 数据
227 // 这里将这个数组转换成一个proto, 然后处理这个proto
228 MessageHead pro;
229 if (!pro.ParseFromArray(sa_data.get(), (int32_t)bytes_transfered))
230 {
231 LOG4CPLUS_ERROR_FMT(*gLog, "ParseFromArray() failed");
232 return ;
233 }
234
235 int port = conn->remote_endpoint().port();
236 LOG4CPLUS_INFO_FMT(*gLog, "port: %d\n%s", port, pro.DebugString().c_str()0;
237
238 // 处理完了之后,类似accept 的异步调用一样,需要继续调用异步的读数据
239 // 同样的,如果要结束一个连接,正常的结算应该在这里return 调用。
240 // 当然了,使用socket 的close(), shut_down() 函数也可以关闭这个连接。
241 async_read_head(conn);
242 }
View Code

 

boost asio tcp 多线程异步读写,服务器与客户端。boost asio tcp 多线程异步读写,服务器与客户端。
  1 // client.cpp
2
#include <cassert>
#include <cstring>
#include <ctime>
#include <iostream>
#include <memory>
#include <new>
#include <string>
#include <thread>
#include <vector>

#include <boost/asio.hpp>

#include <boost/shared_array.hpp>
#include <boost/make_shared.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>

#include <boost/pool/pool.hpp>
#include <boost/pool/singleton_pool.hpp>

// Header generated by Protocol Buffers
#include <common.pb.h>
19
20 void async_connect();
21 void handle_connect(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
22 const boost::system::error_code &ec);
23 void async_write(boost::shared_ptr<boost::asio::ip::tcp::socket> conn);
24 void handle_write_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
25 const std::shared_ptr<std::string> sp_data_proto,
26 const boost::system::error_code &ec,
27 std::size_t bytes_transfered);
28 void handle_write_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
29 const std::shared_ptr<std::string> sp_data_proto,
30 const boost::system::error_code &ec, std::size_t bytes_transfered);
31
32
33
34 boost::asio::io_service io_svc;
35 boost::asio::ip::tcp::endpoint svr_ep(
36 boost::asio::ip::address_v4::from_string("127.0.0.1"), 20017);
37
38 #include <Log.h> // log4cplus 的相关头文件,这里不再一个一个敲出来了。
39
40 log4cplus::Logger *gLog = nullptr; // 应该是直接使用对象,但是懒得改就保留了。
41
42 // 包头固定长度
43 static const int PACKAGE_LENGTH = 16;
44
45 using pool_head = boost::singleton_pool<struct struHead, PACKAGE_LENGTH>;
46 using pool_string = boost::singleton_pool<struct struString, sizeof(std::string)>;
47
48 std::shared_ptr<std::string> createSharedString()
49 {
50 std::shared_ptr<std::string> spTp(new (pool_string::malloc()) std::string,
51 [](std::string *tp)
52 {
53 tp->~basic_string();
54 pool_string::free(tp);
55 });
56
57 return spTp;
58 }
59
60 int main(int argc, char *argv[])
61 {
62 log4cplus::initialize();
63
64 static log4cplus::Logger s_log = log4cplus::Logger::getInstance("client");
65 gLog = &s_log;
66 assert(gLog != nullptr);
67
68 LOG4CPLUS_INFO_FMT(*gLog, "main begin...");
69
70 for (int i = 0; i < 50; ++i)
71 {
72 async_connect();
73 }
74
75 std::vector<std::thread> vecThread;
76 for (int i = 0; i < 5; ++i)
77 {
78 vecThread.emplace_back(std::thread([]() {
79 LOG4CPLUS_INFO_FMT(*gLog, "thread start...");
80 io_svc.run();
81 LOG4CPLUS_INFO_FMT(*gLog, "thread finished.");
82 }));
83 }
84
85 for (size_t i = 0; i < vecThread.size(); ++i)
86 {
87 vecThread[i].join();
88 }
89 assert(io_svc.stopped());
90
91 #ifdef WIN32
92 system("pause");
93 #endif
94
95
96 return 0;
97 }
98
99 void async_connect()
100 {
101 LOG4CPLUS_INFO_FMT(*gLog, "async_connect waitting...");
102
103 boost::shared_ptr<boost::asio::ip::tcp::socket> new_sock
104 = boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(io_svc));
105
106 new_sock->async_connect(svr_ep, boost::bind(
107 handle_connect, new_sock,
108 boost::asio::placeholders::error));
109 }
110
111 void handle_connect(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
112 const boost::system::error_code &ec)
113 {
114 if (ec != 0)
115 {
116 LOG4CPLUS_INFO(*gLog, "connect failed: " << ec.message());
117 return ;
118 }
119
120 LOG4CPLUS_INFO(*gLog, "connect success, server: " << new_conn->remote_endpoint());
121
122 async_write(new_conn);
123 }
124
125 #if 0
126 message MessageHead
127 {
128 optional uint32 FunCode = 1;
129 optional uint32 RequestID = 2;
130 optional uint32 AccountId = 3;
131 optional uint32 AccessId = 4;
132 optional int64 ClientTime = 5;
133 optional uint32 GoodsId = 6;
134 optional bytes UUID = 7;
135 }
136 #endif
137
138 void async_write(boost::shared_ptr<boost::asio::ip::tcp::socket> conn)
139 {
140 MessageHead pro;
141 pro.set_funcode(9527);
142 pro.set_requestid(10081);
143 pro.set_accountid(49005);
144 pro.set_clienttime(time(NULL));
145 pro.set_goodsid(35023);
146 pro.set_uuid(std::string("uuid_500384"));
147
148 std::shared_ptr<std::string> sp_data = createSharedString();
149 if (!pro.SerializeToString(sp_data.get())
150 {
151 LOG4CPLUS_ERROR_FMT(*gLOg, "SerializeToString failed.");
152
153 return ;
154 }
155
156 LOG4CPLUS_INFO_FMT(*gLog, "data.size() = %lld", sp_data->size());
157
158 char ch_head[PACKAGE_LENGTH] = {};
159 int32_t len_net = boost::asio::detail::socket_ops::host_to_network_long((int32_t)sp_data->size());
160 memcpy(ch_head, &len_net, sizeof(len_net));
161
162 if (sp_data->size() == 0)
163 {
164 return ;
165 }
166
167 boost::function<void(const boost::system::error_code&, std::size_t)> cb_write_head;
168 cb_write_head = boost::bind(handle_write_head, conn, sp_data, _1, _2);
169 boost::asio::async_write(*conn, boost::asio::buffer(ch_head, PACKAGE_LENGTH), cb_write_head);
170 }
171
172 void handle_write_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
173 const std::shared_ptr<std::string> sp_data_proto,
174 const boost::system::error_code &ec,
175 std::size_t bytes_transfered)
176 {
177 if (!conn->is_open())
178 {
179 LOG4CPLUS_INFO(*gLog, "socket was not opened.");
180 return;
181 }
182
183 if (ec != 0)
184 {
185 if (ec == boost::asio::error::eof)
186 {
187 LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
188 }
189 else
190 {
191 LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
192 }
193
194 return ;
195 }
196
197 boost::function<void(const boost::system::error_code&, std::size_t)> cb_write_proto;
198 cb_write_proto = boost::bind(handle_write_proto, conn, sp_data_proto, _1, _2);
199 boost::asio::async_write(*conn, boost::asio::buffer(*sp_data_proto), cb_write_proto);
200 }
201
202 // 这里的sp_data_proto 在该函数中并不需要使用,用它作参数的唯一作用,就是保留它的生命周期,
203 // 保证在数据写完之前它不会被析构。
204 // 因为,如果该对象在async_write 还未写完之前就被析构的话, 就会造成数据的错乱,最终导致对端的数据是错误的。
205 void handle_write_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
206 const std::shared_ptr<std::string> sp_data_proto,
207 const boost::system::error_code &ec, std::size_t bytes_transfered)
208 {
209 if (!conn->is_open())
210 {
211 LOG4CPLUS_INFO(*gLog, "socket was not opened.");
212 return ;
213 }
214
215 if (ec != 0)
216 {
217 if (ec == boost::asio::error::eof)
218 {
219 LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
220 }
221 else
222 {
223 LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
224 }
225
226 return ;
227 }
228
229 LOG4CPLUS_INFO(*gLog, "write proto finished.");
230 // 数据写完了之后,可以读对端发送过来的数据。
231 // 如果不再读对端的数据,直接该socket 将会被断开。
232 //async_read_head(conn);
233 }
View Code