当前我在写一个豆瓣相册的下载器,单机的已经完成了。为了加快下载速度同时规避豆瓣的爬虫封禁,准备把这个下载器扩充到多机版,即利用多个vps实现同时下载。本机为server端,那些vps为client端。server端启动之后建立一个异步定时器,client端启动之后连接server端,成功之后发送一个注册请求。如果定时器已经过期,则注册失败,server端发送一个注册失败信息给client端,然后该client自己断开连接。如果定时器未过期,则注册成功,server端发送注册成功信息,然后该client持续监听。当定时器过期时,server端会向用户请求豆瓣相册的地址,然后获得该相册的网页内容并分析其中的链接,并加入链接队列中。分析完成之后,将所有的链接均分到各个注册了的vps上,发送页面请求。所有链接发送完成之后,发送页面结束请求。当client接收到页面请求时,将页面uri放到待处理队列;当client接收到页面结束请求时,停止监听并开始处理uri队列。处理过程大概就是将图片下载下来放到本地文件夹,然后再把文件夹中所有文件发送到server端,发送完成之后再给server端发送一个文件发送完成请求,然后断开连接。
当前我正在完成其中的c/s通信部分,文件部分和网页部分已经完成但还没有加入到通信部分。
当前的主要问题是通信部分有错误,首先server端无法收到注册信息,client也无法收到注册确认信息,但是uri页面请求信息能够收到,而uri页面结束请求信息却又无法收到。同时更诡异的是对于同一个页面请求信息,client端会显示收到多次。
该项目代码根据asio 官方example的chat 示例修改而来的,代码不长,git地址在此,https://github.com/huangfeidian/C-S_Chat。该代码相对于官方示例最大的改动在于server端能够主动的向client端发信息,感觉这里可能会出问题。个人刚接触asio,实在是不知道哪里错了,恳求指点。
13 个解决方案
#1
这里贴一下client的代码
typedef std::queue<chat_message> chat_message_queue;

/// Asynchronous TCP client that registers with the server, collects the page
/// URIs the server assigns to it, and reports completion.
///
/// All socket operations run on the io_service passed to the constructor.
/// write() and close() only post work to that io_service, so they may be
/// called from another thread.
class tcp_message_client
{
public:
    tcp::resolver::iterator endpoint_iterator;
    /// URIs received from the server through page_request messages.
    std::vector<std::string> all_uri;

    /// Resolves host/msg_port synchronously and starts an asynchronous connect.
    tcp_message_client(boost::asio::io_service& io_service,
        const std::string& host, const std::string& msg_port)
        : io_service_(io_service),
        socket_(io_service)
    {
        tcp::resolver resolver(io_service);
        endpoint_iterator = resolver.resolve({ host, msg_port });
        do_connect(endpoint_iterator);
    }

    // The class has a virtual member (do_uri), so it needs a virtual destructor.
    virtual ~tcp_message_client() = default;

    /// Queues msg for sending. Messages queued before the connection is
    /// established are kept and flushed once the connect completes.
    void write(const chat_message& msg)
    {
        io_service_.post(
            [this, msg]()
        {
            const bool write_in_progress = !write_msgs_.empty();
            write_msgs_.push(msg);
            // Starting async_write on a socket that is not connected yet fails
            // and used to close the socket; the connect handler starts the
            // write chain instead.
            if (connected_ && !write_in_progress)
            {
                do_write();
            }
        });
    }

    /// Closes the socket immediately (from the io_service thread). Any write
    /// still in flight is cancelled.
    void close()
    {
        io_service_.post([this]()
        {
            socket_.close();
        });
    }

private:
    void do_connect(tcp::resolver::iterator endpoint_iterator)
    {
        boost::asio::async_connect(socket_, endpoint_iterator,
            [this](boost::system::error_code ec, tcp::resolver::iterator)
        {
            if (ec)
            {
                std::cerr << "connect failed: " << ec.message() << std::endl;
                return;
            }
            connected_ = true;
            // Flush whatever was queued while the connect was pending.
            if (!write_msgs_.empty())
            {
                do_write();
            }
            do_read_header();
        });
    }

    /// Reads the fixed-size header, then hands over to do_read_body().
    void do_read_header()
    {
        boost::asio::async_read(socket_,
            boost::asio::buffer(read_msg_.data(), chat_message::header_length),
            [this](boost::system::error_code ec, std::size_t /*length*/)
        {
            if (!ec && read_msg_.decode_header())
            {
                do_read_body();
            }
            else
            {
                socket_.close();
            }
        });
    }

    /// Reads the message body and dispatches on the decoded instruction.
    void do_read_body()
    {
        boost::asio::async_read(socket_,
            boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
            [this](boost::system::error_code ec, std::size_t /*length*/)
        {
            if (ec)
            {
                socket_.close();
                return;
            }
            std::string cur_uri;
            chat_instructions instru_type;
            read_msg_.decode_instruction(instru_type);
            switch (instru_type)
            {
            case chat_instructions::reject_register:
                std::cout << "request rejected fuck " << std::endl;
                socket_.close();
                break;
            case chat_instructions::admit_register:
                std::cout << "request admitted yeah " << std::endl;
                do_read_header();
                break;
            case chat_instructions::page_request:
                read_msg_.decode_uri(cur_uri);
                std::cout << "get uri from server " << cur_uri << std::endl;
                all_uri.push_back(cur_uri);
                do_read_header();
                break;
            case chat_instructions::page_request_end:
                // No further read is started: the client stops listening and
                // processes the collected URIs.
                std::cout << "this is the end of page" << std::endl;
                do_uri();
                break;
            default:
                socket_.close();
                break;
            }
        });
    }

    /// Hook for processing all_uri; the default only reports completion.
    virtual void do_uri()
    {
        std::cout << "sending eof" << std::endl;
        send_eof();
    }

    /// Sends file_send_end and closes the socket once it has been written.
    void send_eof()
    {
        chat_message end_of_file;
        end_of_file.encode_instuction(chat_instructions::file_send_end);
        write(end_of_file);
        // Calling close() here would run right after the write was started and
        // cancel it, so the server would never see file_send_end. Defer the
        // close until the write queue has drained.
        io_service_.post([this]()
        {
            if (write_msgs_.empty())
            {
                socket_.close();
            }
            else
            {
                close_after_write_ = true;
            }
        });
    }

    /// Writes the front of the queue; the completion handler continues with
    /// the next message so that only one async_write is outstanding at a time.
    void do_write()
    {
        boost::asio::async_write(socket_,
            boost::asio::buffer(write_msgs_.front().data(),
            write_msgs_.front().length()),
            [this](boost::system::error_code ec, std::size_t /*length*/)
        {
            if (ec)
            {
                socket_.close();
                return;
            }
            write_msgs_.pop();
            if (!write_msgs_.empty())
            {
                do_write();
            }
            else if (close_after_write_)
            {
                socket_.close();
            }
        });
    }

private:
    boost::asio::io_service& io_service_;
    tcp::socket socket_;
    chat_message read_msg_;
    chat_message_queue write_msgs_;
    bool connected_ = false;          // set once async_connect succeeded
    bool close_after_write_ = false;  // close the socket when write_msgs_ drains
};
/// Client entry point: connects to <host> <port>, sends the registration
/// request and waits until the io_service runs out of work.
int main(int argc, char** argv)
{
    try
    {
        if (argc != 3)
        {
            std::cerr << "Usage: chat_client <host> <port>\n";
            return 1;
        }
        boost::asio::io_service io_service;
        // Use the command-line arguments that the usage text asks for instead
        // of a hard-coded address.
        tcp_message_client c(io_service, argv[1], argv[2]);
        std::thread t([&io_service]()
        {
            // An exception escaping a thread function terminates the process.
            try
            {
                io_service.run();
            }
            catch (std::exception& e)
            {
                std::cerr << "Exception: " << e.what() << "\n";
            }
        });
        chat_message reg_msg;
        reg_msg.encode_instuction(chat_instructions::request_register);
        c.write(reg_msg);
        // run() has already returned when join() completes, so there is
        // nothing left to run afterwards.
        t.join();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    return 0;
}
#2
这里贴一下server的代码
// Set to true by the server when the registration timer fires; registration
// requests arriving afterwards are rejected.
bool is_expired = false;
using chat_message_queue = std::queue < chat_message >;
// Number of accepted connections that have not finished or been rejected.
// Direct-initialized: std::atomic is not copyable, so "= 0" is ill-formed
// before C++17.
std::atomic<int> connection_count{ 0 };
class session
: public std::enable_shared_from_this<session>
{
public:
session(tcp::socket socket, boost::asio::io_service& IN_io)
: socket_(std::move(socket)), io_service_(IN_io)
{
}
void start()
{
std::cout << "connection from " << socket_.remote_endpoint().address() << std::endl;
do_read_header();
}
void write(const chat_message& msg)
{
write_msgs_ = msg;
do_write(msg.length());
}
void send_uri(const std::string& uri)
{
temp_msg.encode_uri(uri);
write(temp_msg);
}
void send_end()
{
temp_msg.encode_instuction(chat_instructions::page_request_end);
write(temp_msg);
}
private:
void do_read_header()
{
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.data(), chat_message::header_length),
[this](boost::system::error_code ec, std::uint32_t /*length*/)
{
if (!ec && read_msg_.decode_header())
{
do_read_body();
}
else
{
socket_.close();
}
});
}
void do_read_body()
{
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
[this](boost::system::error_code ec, std::uint32_t /*length*/)
{
if (!ec)
{
std::string hello, origin;
chat_instructions cur_inst;
std::cout << "message from " << socket_.remote_endpoint().address() << ": " << socket_.remote_endpoint().port()<<": " ;
//std::cout.write(read_msg_.body(), read_msg_.body_length());
read_msg_.decode_instruction(cur_inst);
//std::cout << "\n";
//do_read_header();
switch (cur_inst)
{
case chat_instructions::request_register:
std::cout << "request register" << std::endl;
if (is_expired)
{
temp_msg.encode_instuction(chat_instructions::reject_register);
write(temp_msg);
//write(temp_msg);
connection_count--;
socket_.close();
}
else
{
temp_msg.encode_instuction(chat_instructions::admit_register);
write(temp_msg);
//do_read_header();
}
break;
case chat_instructions::file_send_end:
connection_count--;
std::cout << "file_end" << std::endl;
if (connection_count == 0)
{
std::cout << " everything has finished" << std::endl;
socket_.close();
io_service_.stop();
}
else
{
socket_.close();
}
break;
default:
socket_.close();
break;
}
}
else
{
socket_.close();
}
});
}
void do_write(std::uint32_t length)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(write_msgs_.data(), length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
do_read_header();
}
});
}
tcp::socket socket_;
boost::asio::io_service& io_service_;
enum
{
max_length = 1024
};
char data_[max_length];
chat_message read_msg_;
chat_message temp_msg;
chat_message write_msgs_;
};
class tcp_message_server
{
public:
tcp_message_server(boost::asio::io_service& io_service, short port)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service), wait_windows(io_service, boost::posix_time::minutes(1)), io_service_(io_service)
{
wait_windows.async_wait(std::bind(&tcp_message_server::get_douban_album, this,std::placeholders::_1));
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
if (!ec)
{
connection_count++;
all_session.push_back(std::make_shared<session>(std::move(socket_), io_service_));
all_session.back()->start();
}
do_accept();
});
}
virtual void analyze_uri(const std::string& IN_uri)
{
all_uri.push_back(IN_uri);
//dispatch_uri();
}
void dispatch_uri()
{
int session_size = all_session.size();
for (int i = 0; i < all_uri.size(); i++)
{
all_session[i%session_size]->send_uri(all_uri[i]);
}
for (int i = 0; i < session_size; i++)
{
all_session[i]->send_end();
}
}
void get_douban_album(const boost::system::error_code& error)
{
is_expired = true;
std::cout << "please enter the douban album uri " << std::endl;
std::string album_uri;
std::cin >> album_uri;
analyze_uri(album_uri);
dispatch_uri();
}
std::vector<std::shared_ptr<session>> all_session;
tcp::acceptor acceptor_;
tcp::socket socket_;
boost::asio::deadline_timer wait_windows;
std::vector<std::string> all_uri;
boost::asio::io_service& io_service_;
};
/// Server entry point: validates the port argument, starts the server and
/// runs the io_service until it is stopped.
int main(int argc, char* argv[])
{
    try
    {
        if (argc != 2)
        {
            std::cerr << "Usage: douban chat_server <port>\n";
            return 1;
        }
        // std::atoi silently yields 0 for non-numeric input; parse strictly.
        char* parse_end = nullptr;
        const long port = std::strtol(argv[1], &parse_end, 10);
        if (parse_end == argv[1] || *parse_end != '\0' || port < 1 || port > 65535)
        {
            std::cerr << "Invalid port: " << argv[1] << "\n";
            return 1;
        }
        boost::asio::io_service io_service;
        // The constructor takes a short; go through unsigned short so ports
        // above 32767 keep their bit pattern.
        tcp_message_server s(io_service, static_cast<short>(static_cast<unsigned short>(port)));
        io_service.run();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    return 0;
}
#3
你的clients和server用长连接就可以啦
server有任务后轮排给client,client做完后发回给server
server有任务后轮排给client,client做完后发回给server
#4
当前的server只涉及到命令传递,还没涉及到核心的网页抓取和文件传输,网页抓取和文件传输都会放到另外一个io_service里面。
现在的问题是:我当前的代码有什么问题,为什么有些消息收不到,有些消息会被重复接收。。。
现在的问题是:我当前的代码有什么问题,为什么有些消息收不到,有些消息会被重复接收。。。
#5
首先一点,你的message使用TLV格式,你的length字段要注意大小端转换啊
#6
我看你的client应该是参考了asio的example写的吧
自己加日志调吧,或者多去看看那些example,理解人家的思想
自己加日志调吧,或者多去看看那些example,理解人家的思想
#7
额,原来在网络上发消息这么坑啊。之前就被size_t坑了一次,本机windows编译为server,64位linux当client,两者size_t长度不一样。。。 谢谢你的建议,我去看看相关资料,晚上准备继续改改。
#8
对,就是改的官方的chat示例。把c++11的asio示例都看了,不过没有提到server端读和写都同时发生的情况。同时读写出现在chat 的client端,可是在server端我不知道怎么改。。。asio的文档好少。
晚上抓下包,看看到底什么问题。
晚上抓下包,看看到底什么问题。
#9
重新看了一下encode和decode相关的代码,发现我不需要处理大小端。因为length字段encode的时候是先转换为一个4位的字符串然后再发送的,decode的时候也是采取atoi的形式获得整数值。
真没想到我这照着抄也能规避大小端的问题。。。
真没想到我这照着抄也能规避大小端的问题。。。
#10
不知道有多少前人掉在TCP Socket
send(人多)send(病少)send(财富)
recv(人多病)recv(少财富)
陷阱里面啊!
http://bbs.csdn.net/topics/380167545
send(人多)send(病少)send(财富)
recv(人多病)recv(少财富)
陷阱里面啊!
http://bbs.csdn.net/topics/380167545
#11
赵老师啊,我当前的程序还没跳进这个坑里,你现在就把坑给我挖好了!!!
#12
#13
#1
这里贴一下client的代码
typedef std::queue<chat_message> chat_message_queue;

/// Asynchronous TCP client that registers with the server, collects the page
/// URIs the server assigns to it, and reports completion.
///
/// All socket operations run on the io_service passed to the constructor.
/// write() and close() only post work to that io_service, so they may be
/// called from another thread.
class tcp_message_client
{
public:
    tcp::resolver::iterator endpoint_iterator;
    /// URIs received from the server through page_request messages.
    std::vector<std::string> all_uri;

    /// Resolves host/msg_port synchronously and starts an asynchronous connect.
    tcp_message_client(boost::asio::io_service& io_service,
        const std::string& host, const std::string& msg_port)
        : io_service_(io_service),
        socket_(io_service)
    {
        tcp::resolver resolver(io_service);
        endpoint_iterator = resolver.resolve({ host, msg_port });
        do_connect(endpoint_iterator);
    }

    // The class has a virtual member (do_uri), so it needs a virtual destructor.
    virtual ~tcp_message_client() = default;

    /// Queues msg for sending. Messages queued before the connection is
    /// established are kept and flushed once the connect completes.
    void write(const chat_message& msg)
    {
        io_service_.post(
            [this, msg]()
        {
            const bool write_in_progress = !write_msgs_.empty();
            write_msgs_.push(msg);
            // Starting async_write on a socket that is not connected yet fails
            // and used to close the socket; the connect handler starts the
            // write chain instead.
            if (connected_ && !write_in_progress)
            {
                do_write();
            }
        });
    }

    /// Closes the socket immediately (from the io_service thread). Any write
    /// still in flight is cancelled.
    void close()
    {
        io_service_.post([this]()
        {
            socket_.close();
        });
    }

private:
    void do_connect(tcp::resolver::iterator endpoint_iterator)
    {
        boost::asio::async_connect(socket_, endpoint_iterator,
            [this](boost::system::error_code ec, tcp::resolver::iterator)
        {
            if (ec)
            {
                std::cerr << "connect failed: " << ec.message() << std::endl;
                return;
            }
            connected_ = true;
            // Flush whatever was queued while the connect was pending.
            if (!write_msgs_.empty())
            {
                do_write();
            }
            do_read_header();
        });
    }

    /// Reads the fixed-size header, then hands over to do_read_body().
    void do_read_header()
    {
        boost::asio::async_read(socket_,
            boost::asio::buffer(read_msg_.data(), chat_message::header_length),
            [this](boost::system::error_code ec, std::size_t /*length*/)
        {
            if (!ec && read_msg_.decode_header())
            {
                do_read_body();
            }
            else
            {
                socket_.close();
            }
        });
    }

    /// Reads the message body and dispatches on the decoded instruction.
    void do_read_body()
    {
        boost::asio::async_read(socket_,
            boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
            [this](boost::system::error_code ec, std::size_t /*length*/)
        {
            if (ec)
            {
                socket_.close();
                return;
            }
            std::string cur_uri;
            chat_instructions instru_type;
            read_msg_.decode_instruction(instru_type);
            switch (instru_type)
            {
            case chat_instructions::reject_register:
                std::cout << "request rejected fuck " << std::endl;
                socket_.close();
                break;
            case chat_instructions::admit_register:
                std::cout << "request admitted yeah " << std::endl;
                do_read_header();
                break;
            case chat_instructions::page_request:
                read_msg_.decode_uri(cur_uri);
                std::cout << "get uri from server " << cur_uri << std::endl;
                all_uri.push_back(cur_uri);
                do_read_header();
                break;
            case chat_instructions::page_request_end:
                // No further read is started: the client stops listening and
                // processes the collected URIs.
                std::cout << "this is the end of page" << std::endl;
                do_uri();
                break;
            default:
                socket_.close();
                break;
            }
        });
    }

    /// Hook for processing all_uri; the default only reports completion.
    virtual void do_uri()
    {
        std::cout << "sending eof" << std::endl;
        send_eof();
    }

    /// Sends file_send_end and closes the socket once it has been written.
    void send_eof()
    {
        chat_message end_of_file;
        end_of_file.encode_instuction(chat_instructions::file_send_end);
        write(end_of_file);
        // Calling close() here would run right after the write was started and
        // cancel it, so the server would never see file_send_end. Defer the
        // close until the write queue has drained.
        io_service_.post([this]()
        {
            if (write_msgs_.empty())
            {
                socket_.close();
            }
            else
            {
                close_after_write_ = true;
            }
        });
    }

    /// Writes the front of the queue; the completion handler continues with
    /// the next message so that only one async_write is outstanding at a time.
    void do_write()
    {
        boost::asio::async_write(socket_,
            boost::asio::buffer(write_msgs_.front().data(),
            write_msgs_.front().length()),
            [this](boost::system::error_code ec, std::size_t /*length*/)
        {
            if (ec)
            {
                socket_.close();
                return;
            }
            write_msgs_.pop();
            if (!write_msgs_.empty())
            {
                do_write();
            }
            else if (close_after_write_)
            {
                socket_.close();
            }
        });
    }

private:
    boost::asio::io_service& io_service_;
    tcp::socket socket_;
    chat_message read_msg_;
    chat_message_queue write_msgs_;
    bool connected_ = false;          // set once async_connect succeeded
    bool close_after_write_ = false;  // close the socket when write_msgs_ drains
};
/// Client entry point: connects to <host> <port>, sends the registration
/// request and waits until the io_service runs out of work.
int main(int argc, char** argv)
{
    try
    {
        if (argc != 3)
        {
            std::cerr << "Usage: chat_client <host> <port>\n";
            return 1;
        }
        boost::asio::io_service io_service;
        // Use the command-line arguments that the usage text asks for instead
        // of a hard-coded address.
        tcp_message_client c(io_service, argv[1], argv[2]);
        std::thread t([&io_service]()
        {
            // An exception escaping a thread function terminates the process.
            try
            {
                io_service.run();
            }
            catch (std::exception& e)
            {
                std::cerr << "Exception: " << e.what() << "\n";
            }
        });
        chat_message reg_msg;
        reg_msg.encode_instuction(chat_instructions::request_register);
        c.write(reg_msg);
        // run() has already returned when join() completes, so there is
        // nothing left to run afterwards.
        t.join();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    return 0;
}
#2
这里贴一下server的代码
// Set to true by the server when the registration timer fires; registration
// requests arriving afterwards are rejected.
bool is_expired = false;
using chat_message_queue = std::queue < chat_message >;
// Number of accepted connections that have not finished or been rejected.
// Direct-initialized: std::atomic is not copyable, so "= 0" is ill-formed
// before C++17.
std::atomic<int> connection_count{ 0 };
class session
: public std::enable_shared_from_this<session>
{
public:
session(tcp::socket socket, boost::asio::io_service& IN_io)
: socket_(std::move(socket)), io_service_(IN_io)
{
}
void start()
{
std::cout << "connection from " << socket_.remote_endpoint().address() << std::endl;
do_read_header();
}
void write(const chat_message& msg)
{
write_msgs_ = msg;
do_write(msg.length());
}
void send_uri(const std::string& uri)
{
temp_msg.encode_uri(uri);
write(temp_msg);
}
void send_end()
{
temp_msg.encode_instuction(chat_instructions::page_request_end);
write(temp_msg);
}
private:
void do_read_header()
{
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.data(), chat_message::header_length),
[this](boost::system::error_code ec, std::uint32_t /*length*/)
{
if (!ec && read_msg_.decode_header())
{
do_read_body();
}
else
{
socket_.close();
}
});
}
void do_read_body()
{
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
[this](boost::system::error_code ec, std::uint32_t /*length*/)
{
if (!ec)
{
std::string hello, origin;
chat_instructions cur_inst;
std::cout << "message from " << socket_.remote_endpoint().address() << ": " << socket_.remote_endpoint().port()<<": " ;
//std::cout.write(read_msg_.body(), read_msg_.body_length());
read_msg_.decode_instruction(cur_inst);
//std::cout << "\n";
//do_read_header();
switch (cur_inst)
{
case chat_instructions::request_register:
std::cout << "request register" << std::endl;
if (is_expired)
{
temp_msg.encode_instuction(chat_instructions::reject_register);
write(temp_msg);
//write(temp_msg);
connection_count--;
socket_.close();
}
else
{
temp_msg.encode_instuction(chat_instructions::admit_register);
write(temp_msg);
//do_read_header();
}
break;
case chat_instructions::file_send_end:
connection_count--;
std::cout << "file_end" << std::endl;
if (connection_count == 0)
{
std::cout << " everything has finished" << std::endl;
socket_.close();
io_service_.stop();
}
else
{
socket_.close();
}
break;
default:
socket_.close();
break;
}
}
else
{
socket_.close();
}
});
}
void do_write(std::uint32_t length)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(write_msgs_.data(), length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
do_read_header();
}
});
}
tcp::socket socket_;
boost::asio::io_service& io_service_;
enum
{
max_length = 1024
};
char data_[max_length];
chat_message read_msg_;
chat_message temp_msg;
chat_message write_msgs_;
};
class tcp_message_server
{
public:
tcp_message_server(boost::asio::io_service& io_service, short port)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service), wait_windows(io_service, boost::posix_time::minutes(1)), io_service_(io_service)
{
wait_windows.async_wait(std::bind(&tcp_message_server::get_douban_album, this,std::placeholders::_1));
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
if (!ec)
{
connection_count++;
all_session.push_back(std::make_shared<session>(std::move(socket_), io_service_));
all_session.back()->start();
}
do_accept();
});
}
virtual void analyze_uri(const std::string& IN_uri)
{
all_uri.push_back(IN_uri);
//dispatch_uri();
}
void dispatch_uri()
{
int session_size = all_session.size();
for (int i = 0; i < all_uri.size(); i++)
{
all_session[i%session_size]->send_uri(all_uri[i]);
}
for (int i = 0; i < session_size; i++)
{
all_session[i]->send_end();
}
}
void get_douban_album(const boost::system::error_code& error)
{
is_expired = true;
std::cout << "please enter the douban album uri " << std::endl;
std::string album_uri;
std::cin >> album_uri;
analyze_uri(album_uri);
dispatch_uri();
}
std::vector<std::shared_ptr<session>> all_session;
tcp::acceptor acceptor_;
tcp::socket socket_;
boost::asio::deadline_timer wait_windows;
std::vector<std::string> all_uri;
boost::asio::io_service& io_service_;
};
/// Server entry point: validates the port argument, starts the server and
/// runs the io_service until it is stopped.
int main(int argc, char* argv[])
{
    try
    {
        if (argc != 2)
        {
            std::cerr << "Usage: douban chat_server <port>\n";
            return 1;
        }
        // std::atoi silently yields 0 for non-numeric input; parse strictly.
        char* parse_end = nullptr;
        const long port = std::strtol(argv[1], &parse_end, 10);
        if (parse_end == argv[1] || *parse_end != '\0' || port < 1 || port > 65535)
        {
            std::cerr << "Invalid port: " << argv[1] << "\n";
            return 1;
        }
        boost::asio::io_service io_service;
        // The constructor takes a short; go through unsigned short so ports
        // above 32767 keep their bit pattern.
        tcp_message_server s(io_service, static_cast<short>(static_cast<unsigned short>(port)));
        io_service.run();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    return 0;
}
#3
你的clients和server用长连接就可以啦
server有任务后轮排给client,client做完后发回给server
server有任务后轮排给client,client做完后发回给server
#4
当前的server只涉及到命令传递,还没涉及到核心的网页抓取和文件传输,网页抓取和文件传输都会放到另外一个io_service里面。
现在的问题是:我当前的代码有什么问题,为什么有些消息收不到,有些消息会被重复接收。。。
现在的问题是:我当前的代码有什么问题,为什么有些消息收不到,有些消息会被重复接收。。。
你的clients和server用长连接就可以啦
server有任务后轮排给client,client做完后发回给server
#5
首先一点,你的message使用TLV格式,你的length字段要注意大小端转换啊
#6
我看你的client应该是参考了asio的example写的吧
自己加日志调吧,或者多去看看那些example,理解人家的思想
自己加日志调吧,或者多去看看那些example,理解人家的思想
#7
额,原来在网络上发消息这么坑啊。之前就被size_t坑了一次,本机windows编译为server,64位linux当client,两者size_t长度不一样。。。 谢谢你的建议,我去看看相关资料,晚上准备继续改改。
首先一点,你的message使用TLV格式,你的length字段要注意大小端转换啊
#8
对,就是改的官方的chat示例。把c++11的asio示例都看了,不过没有提到server端读和写都同时发生的情况。同时读写出现在chat 的client端,可是在server端我不知道怎么改。。。asio的文档好少。
晚上抓下包,看看到底什么问题。
晚上抓下包,看看到底什么问题。
我看你的client应该是参考了asio的example写的吧
自己加日志调吧,或者多去看看那些example,理解人家的思想
#9
重新看了一下encode和decode相关的代码,发现我不需要处理大小端。因为length字段encode的时候是先转换为一个4位的字符串然后再发送的,decode的时候也是采取atoi的形式获得整数值。
真没想到我这照着抄也能规避大小端的问题。。。
真没想到我这照着抄也能规避大小端的问题。。。
首先一点,你的message使用TLV格式,你的length字段要注意大小端转换啊
#10
不知道有多少前人掉在TCP Socket
send(人多)send(病少)send(财富)
recv(人多病)recv(少财富)
陷阱里面啊!
http://bbs.csdn.net/topics/380167545
send(人多)send(病少)send(财富)
recv(人多病)recv(少财富)
陷阱里面啊!
http://bbs.csdn.net/topics/380167545
#11
赵老师啊,我当前的程序还没跳进这个坑里,你现在就把坑给我挖好了!!!
不知道有多少前人掉在TCP Socket
send(人多)send(病少)send(财富)
recv(人多病)recv(少财富)
陷阱里面啊!
http://bbs.csdn.net/topics/380167545
#12
赵老师啊,我当前的程序还没跳进这个坑里,你现在就把坑给我挖好了!!!
不知道有多少前人掉在TCP Socket
send(人多)send(病少)send(财富)
recv(人多病)recv(少财富)
陷阱里面啊!
http://bbs.csdn.net/topics/380167545