boost.asio异步模式没有并发执行

时间:2022-07-02 05:54:55
//同时启动三个客户端进程时,发现服务端并没有并发执行,大家帮忙看看是什么原因

//服务端代码 server_asynchronize.h
#ifndef SERVER_ASYNCHRONIZE_H
#define SERVER_ASYNCHRONIZE_H


#include <ctime>
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include<stdlib.h>

using boost::asio::ip::tcp;

std::string make_daytime_string();

class tcp_connection
    : public boost::enable_shared_from_this<tcp_connection>
{
public:
    typedef boost::shared_ptr<tcp_connection> pointer;


    static pointer create(boost::asio::io_service& io_service)
    {
        return pointer(new tcp_connection(io_service));
    }

    tcp::socket& socket();

    void start();

private:
    tcp_connection(boost::asio::io_service& io_service);

    void handle_write(const boost::system::error_code& /*error*/,   size_t /*bytes_transferred*/);

    tcp::socket socket_;
    std::string _message;
};


class tcp_server
{
public:
    tcp_server(boost::asio::io_service& io_service);

private:
    void start_accept();
    void handle_accept(tcp_connection::pointer new_connection_ptr,   const boost::system::error_code& error);

    tcp::acceptor _acceptor;
};

#endif // SERVER_ASYNCHRONIZE_H

//服务端代码 server_asynchronize.cpp
#include "server_asynchronize.h"

using boost::asio::ip::tcp;
using namespace std;


int main()
{
  try
  {
    boost::asio::io_service io_service;
    tcp_server server(io_service);
    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << e.what() << std::endl;
  }

  return 0;
}

std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}


tcp_connection::tcp_connection(boost::asio::io_service& io_service)
    : socket_(io_service)
{
}


tcp::socket& tcp_connection::socket()
{
    return socket_;
}

void tcp_connection::start()
{
    //这里假设正在执行某项耗时的任务
    //-----------------------
    int randnumber=rand();
    int endnumber=randnumber/3;
    for(int j=0; j<endnumber; ++j)
    {
        ++randnumber;
        --randnumber;
    }
    //-----------------------

    _message = make_daytime_string();

    boost::asio::async_write(
        socket_,
        boost::asio::buffer(_message),
        boost::bind(
            &tcp_connection::handle_write, shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred)
    );
}


void tcp_connection::handle_write(const boost::system::error_code& /*error*/,  size_t /*bytes_transferred*/)
{
}


//---------------------------------------------------------------------------------------------
//---------------------------------------------------------------------------------------------
//---------------------------------------------------------------------------------------------

tcp_server::tcp_server(boost::asio::io_service& io_service)
    : _acceptor(io_service, tcp::endpoint(tcp::v4(), 10000))
{
    start_accept();
}

void tcp_server::start_accept()
{
    //通过随机数取的任务ID,假设每次取的taskid都是唯一的
    int taskid=rand();
    //这里没有并发执行,依然是按照顺序执行的,可能是什么原因?
    cout<<taskid<<", start  "<<endl;

    tcp_connection::pointer new_connection_ptr =tcp_connection::create(_acceptor.get_io_service());

    _acceptor.async_accept(
        new_connection_ptr->socket(),
        boost::bind(&tcp_server::handle_accept, this, new_connection_ptr,boost::asio::placeholders::error)
    );

     cout<<taskid<<", complete "<<endl;
}

void tcp_server::handle_accept(tcp_connection::pointer new_connection_ptr, const boost::system::error_code& error)
{
    if (!error)
    {
        new_connection_ptr->start();
    }

    start_accept();
}

//客户端代码 
#include <iostream>

#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/asio/ip/tcp.hpp>

using namespace std;
using namespace boost::asio;
using namespace boost::asio::ip;
using namespace boost::system;
using boost::asio::ip::tcp;

void post_request_ip_query()
{
    try
    {
        io_service ios;
        string serverip="127.0.0.1";
        string portno="10000";

        tcp::resolver rsvr(ios);
        tcp::resolver::query qry(serverip,portno);
        tcp::resolver::iterator endpoint_iterator=rsvr.resolve(qry);

        tcp::socket skt(ios);
        boost::asio::connect(skt,endpoint_iterator);
        while(1)
        {
            boost::array<char,128>buf;
            boost::system::error_code error;
            size_t len=skt.read_some(buffer(buf) ,error);

            if(error==error::eof)
            {
                cout<<"finished "<<endl;
                break; //closed by peer;
            }else if(error)
            {
                throw system_error(error);
            }
            cout.write(buf.data(),len);

        }//while(1)

    }catch(std::exception& e)
    {
        cerr<<e.what()<<endl;
    }
}

int main()
{
    for (int i=0;i<1000;++i)
    {
        post_request_ip_query();
    }

    return 0;
}


5 个解决方案

#1


asio本身不涉及多线程,再说start_accept也没必要并行

 tcp_server server(io_service);
    io_service.run();
只会在当前线程中执行,根本没有多个线程run

#2


你把并发和异步的概念混淆了。

#3


是不是说在多核CPU的环境下,仅使用异步方式是不够的,还需要使用多线程才能同时相应多个请求?
为什么网上有些资料说异步方式比多线程更高效?

#4


可以参照Boost.Asio中的示例程序HTTP Server 3,使用单个io_service和多线程实现响应多个请求.

#5


参考这个:http://www.rosoo.net/a/201002/8563.html
关键点:
void xproxy_server::run()
{
    std::vector<boost::shared_ptr<boost::thread> > threads; 
    for(std::size_t i = 0; i < thread_pool_size_; ++i)
    {
        boost::shared_ptr<boost::thread> thread(new boost::thread(
            boost::bind(&boost::asio::io_service::run, &io_service_)));

        threads.push_back(thread);
    }

    for(std::size_t i = 0; i < threads.size(); ++i)
    {
        threads[i]->join();
    }
}

#1


asio本身不涉及多线程,再说start_accept也没必要并行

 tcp_server server(io_service);
    io_service.run();
只会在当前线程中执行,根本没有多个线程run

#2


你把并发和异步的概念混淆了。

#3


是不是说在多核CPU的环境下,仅使用异步方式是不够的,还需要使用多线程才能同时相应多个请求?
为什么网上有些资料说异步方式比多线程更高效?

#4


可以参照Boost.Asio中的示例程序HTTP Server 3,使用单个io_service和多线程实现响应多个请求.

#5


参考这个:http://www.rosoo.net/a/201002/8563.html
关键点:
void xproxy_server::run()
{
    std::vector<boost::shared_ptr<boost::thread> > threads; 
    for(std::size_t i = 0; i < thread_pool_size_; ++i)
    {
        boost::shared_ptr<boost::thread> thread(new boost::thread(
            boost::bind(&boost::asio::io_service::run, &io_service_)));

        threads.push_back(thread);
    }

    for(std::size_t i = 0; i < threads.size(); ++i)
    {
        threads[i]->join();
    }
}