使用Boost.Asio写的多线程TCP转发代理服务器

时间:2021-01-30 18:26:23

应用场景是这样的:

客户端和服务器在不同的网段内,它们之间不能直接通过TCP连接,但是有一台机器(暂时称为转发器)有双网卡,两块网卡分别在客户端和服务器端的网段内,这样转发器就能分别和客户端及服务器建立连接,并来回传输数据。

设计思路是这样的:

当客户端连接到转发器后,转发器马上建立一条到服务器之间的连接,与服务器端的连接建立后,就同时异步地从客户端和服务器端接收数据到两个缓冲区中,一旦任何一方有数据接收,就通过另外一条连接将数据发送到另一方,发送完毕后又开始新一轮的数据接收。如果在接收数据的过程中有任何一方出现错误,就将取消另外一条连接的异步调用,这样整个连接就会关闭。

其中用到了Boost库的以下特性:

1.Asio的异步IO调用

2.多线程

3.shared_ptr的自动指针管理

遗留的一些问题:

由于采用了多线程,为了保险起见,我将每个回调函数都用strand包裹起来,在各回调函数没有使用共享资源的情况下并不必要,因此在这方面可以优化下

/*
 * =====================================================================================
 *
 * Filename: xproxy_main.cpp
 *
 * Description:
 *
 * Version: 1.0
 * Created: 2009年11月26日 15时10分29秒
 * Revision: none
 * Compiler: gcc
 *
 * Author: David Fang (A free programmer), qi_fd@163.com
 * Company: nocompany
 *
 * =====================================================================================
 */


#include <cerrno>
#include <climits>
#include <cstdlib>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include "xproxy_server.hpp"

int main(int argc, char* argv[])
{
    try
    {
        if(5 != argc)
        {
            std::cerr<<"Usage: xproxy <local port> <server ip> <server_port> <[thread size>/n";
            std::cerr<<"local port: local port used to accept login client/n";
            std::cerr<<"server ip: analysing server address, ip string in decimal dot format/n";
            std::cerr<<"server port: analysing server port, an unsigned short value/n";
            std::cerr<<"thread size: number of threads to running xproxy server/n";
            return 1;
        }
        xproxy_server srv(atoi(argv[1]), argv[2], atoi(argv[3]), atoi(argv[4]));

        srv.run();
    }
    catch(std::exception& e)
    {
        std::cerr<<"exception: "<<e.what()<<"/n";
    }

    return 0;
}

 

/*
 * =====================================================================================
 *
 * Filename: xproxy_server.hpp
 *
 * Description:
 *
 * Version: 1.0
 * Created: 2009年11月26日 15时12分01秒
 * Revision: none
 * Compiler: gcc
 *
 * Author: David Fang (A free programmer), qi_fd@163.com
 * Company: nocompany
 *
 * =====================================================================================
 */

#ifndef XPROXY_SERVER_HPP
#define XPROXY_SERVER_HPP

#include <string>

#include <boost/asio.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include "xproxy_connection.hpp"

using boost::asio::ip::tcp;

class xproxy_server:private boost::noncopyable
{
public:

    //construction of xproxy_server, which takes the destination machine(analysing server)'s

    //address and port(ipv4) as arguments

    explicit xproxy_server(unsigned short local_port, const std::string& ana_address, unsigned short ana_port,
            std::size_t thread_pool_size = 1);
    
    //Run the server's io_service loop

    void run();

    //Stop the server

    void stop();
private:
    //Handle the completion of an asynchronous accept from the login client

    void handle_accept(const boost::system::error_code& e);

    //The number of threads that will call io_service::run()

    std::size_t thread_pool_size_;

    //The io_service used to perform asynchronous operations.

    boost::asio::io_service io_service_;

    //Acceptor used to listen for incoming proxy connectins

    boost::asio::ip::tcp::acceptor acceptor_;

    //Local endpoint corresponding to the login client

    tcp::endpoint local_endpoint_;

    //The endpoint to analysing server

    tcp::endpoint analysing_server_endpoint_;

    //The next connectin to be accepted.

    xproxy_connection_ptr new_connection_;
};

#endif

 

/*
 * =====================================================================================
 *
 * Filename: xproxy_server.cpp
 *
 * Description:
 *
 * Version: 1.0
 * Created: 2009年11月26日 15时12分07秒
 * Revision: none
 * Compiler: gcc
 *
 * Author: David Fang (A free programmer), qi_fd@163.com
 * Company: nocompany
 *
 * =====================================================================================
 */


#include "xproxy_server.hpp"
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <vector>

// Prepares the proxy for service: resolves both endpoints from the arguments,
// opens/binds/listens the acceptor on the local IPv4 endpoint with address
// reuse enabled, and queues the first asynchronous accept into a freshly
// created connection object.
//
// from_string() and the acceptor calls use their throwing overloads, so a
// malformed ana_address or a failed open/bind/listen surfaces as an exception.
xproxy_server::xproxy_server(unsigned short local_port,
                             const std::string& ana_address,
                             unsigned short ana_port,
                             std::size_t thread_pool_size)
    : thread_pool_size_(thread_pool_size)
    , acceptor_(io_service_)
    , local_endpoint_(tcp::v4(), local_port)
    , analysing_server_endpoint_(
          boost::asio::ip::address::from_string(ana_address), ana_port)
    , new_connection_(
          new xproxy_connection(io_service_, analysing_server_endpoint_))
{
    const tcp::acceptor::reuse_address allow_reuse(true);

    acceptor_.open(local_endpoint_.protocol());
    acceptor_.set_option(allow_reuse);
    acceptor_.bind(local_endpoint_);
    acceptor_.listen();

    // The accepted client lands in the pending connection's client socket.
    acceptor_.async_accept(
        new_connection_->login_clt_sock(),
        boost::bind(&xproxy_server::handle_accept,
                    this,
                    boost::asio::placeholders::error));
}

void xproxy_server::run()
{
    std::vector<boost::shared_ptr<boost::thread> > threads;
    for(std::size_t i = 0; i < thread_pool_size_; ++i)
    {
        boost::shared_ptr<boost::thread> thread(new boost::thread(
            boost::bind(&boost::asio::io_service::run, &io_service_)));
        threads.push_back(thread);
    }

    for(std::size_t i = 0; i < threads.size(); ++i)
    {
        threads[i]->join();
    }
}

// Asks the io_service to stop. The worker threads started by run() are
// blocked in io_service::run() on this same io_service, so stopping it is
// what lets run() eventually return (per Boost.Asio's io_service::stop()).
void xproxy_server::stop()
{
    io_service_.stop();
}

// Completion handler for an asynchronous accept of a login client.
//
// On success the accepted connection is started and a fresh connection object
// is prepared for the next client. On failure the pending connection object is
// kept and reused. In both cases a new accept is queued, so a single failed
// accept no longer stops the server from listening.
//
// The accept loop ends only when the operation was aborted or the acceptor has
// been closed.
void xproxy_server::handle_accept(const boost::system::error_code& e)
{
    if(boost::asio::error::operation_aborted == e || !acceptor_.is_open())
    {
        return;
    }

    if(!e)
    {
        new_connection_->start();
        new_connection_.reset(new xproxy_connection(io_service_,
            analysing_server_endpoint_));
    }

    // NOTE(review): a persistent accept error (e.g. descriptor exhaustion)
    // will make this retry immediately; add back-off if that proves a problem.
    acceptor_.async_accept(new_connection_->login_clt_sock(),
        boost::bind(&xproxy_server::handle_accept, this,
            boost::asio::placeholders::error));
}


/*
 * =====================================================================================
 *
 * Filename: xproxy_connection.hpp
 *
 * Description:
 *
 * Version: 1.0
 * Created: 2009年11月26日 15时11分04秒
 * Revision: none
 * Compiler: gcc
 *
 * Author: David Fang (A free programmer), qi_fd@163.com
 * Company: nocompany
 *
 * =====================================================================================
 */

#ifndef XPROXY_CONNECTION_HPP
#define XPROXY_CONNECTION_HPP

#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>

using boost::asio::ip::tcp;

class xproxy_connection:public boost::enable_shared_from_this<xproxy_connection>,
    private boost::noncopyable
{
public:
    //Contruct a connection with the given io_service, the analysing server address and port

    explicit xproxy_connection(boost::asio::io_service& io_service,
            tcp::endpoint& ana_endpoint);

    ~xproxy_connection();

    //Start the asyncronous connection to analysing server

    void start();

    tcp::socket& login_clt_sock();

private:
    //Handle completion of connection to analysing server

    void handle_connect_to_ana_server(const boost::system::error_code& e);

    //Handle completion of login client socket read

    void handle_login_clt_sock_read(const boost::system::error_code& e,
            std::size_t bytes_transferred);

    //Handle completion of analysing server socket write

    void handle_ana_srv_sock_write(const boost::system::error_code& e);

    //Handle completion of analysing server socket read

    void handle_ana_srv_sock_read(const boost::system::error_code& e,
            std::size_t bytes_transferred);

    //Handle completion of login client socket write

    void handle_login_clt_sock_write(const boost::system::error_code& e);

    //Strand to ensure the connection's handles are not called concurrently

    boost::asio::io_service::strand strand_;

    //analysing server endpoint

    tcp::endpoint ana_endpoint_;

    //socket to the flex login client

    tcp::socket login_clt_sock_;

    //socket to analysing server

    tcp::socket ana_srv_sock_;

    //buffer used to recieve data from the login client

    boost::array<char, 1024> clt_buffer_;
    
    //buffer used to recieve data from the analysing server

    boost::array<char, 1024> srv_buffer_;
};

typedef boost::shared_ptr<xproxy_connection> xproxy_connection_ptr;

#endif

 

/*
 * =====================================================================================
 *
 * Filename: xproxy_connection.cpp
 *
 * Description:
 *
 * Version: 1.0
 * Created: 2009年11月26日 15时12分33秒
 * Revision: none
 * Compiler: gcc
 *
 * Author: David Fang (A free programmer), qi_fd@163.com
 * Company: nocompany
 *
 * =====================================================================================
 */


#include "xproxy_connection.hpp"
#include <vector>
#include <iostream>
#include <boost/bind.hpp>

// Creates an idle session: both sockets and the strand are bound to
// io_service, and ana_endpoint is copied for the later connect in start().
// Logs the construction to stdout.
xproxy_connection::xproxy_connection(boost::asio::io_service& io_service,
    tcp::endpoint& ana_endpoint)
    :strand_(io_service),
    ana_endpoint_(ana_endpoint),
    login_clt_sock_(io_service),
    ana_srv_sock_(io_service)
{
    // "\n" (not "/n") so the log line actually ends with a newline.
    std::cout<<"new connection construct\n";
}

// Logs the destruction of the session to stdout; the member sockets are
// released by their own destructors.
xproxy_connection::~xproxy_connection()
{
    // "\n" (not "/n") so the log line actually ends with a newline.
    std::cout<<"connection destruct\n";
}

// Returns the login client socket by reference so the server's acceptor can
// accept an incoming client directly into it.
tcp::socket& xproxy_connection::login_clt_sock()
{
    return login_clt_sock_;
}
        
void xproxy_connection::start()
{
    std::cout<<"connection start to connect to analysing server.../n";
    ana_srv_sock_.async_connect(ana_endpoint_,
        strand_.wrap(boost::bind(&xproxy_connection::handle_connect_to_ana_server,
            shared_from_this(), boost::asio::placeholders::error)));
}

void xproxy_connection::handle_connect_to_ana_server(const boost::system::error_code& e)
{
    if(!e)
    {
        std::cout<<"connect to analysing server succeed,"
            <<"now start to receive data from both sides.../n";

        login_clt_sock_.async_read_some(boost::asio::buffer(clt_buffer_),
                strand_.wrap(
                    boost::bind(&xproxy_connection::handle_login_clt_sock_read,
                        shared_from_this(), boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)));

        ana_srv_sock_.async_read_some(boost::asio::buffer(srv_buffer_),
                strand_.wrap(
                    boost::bind(&xproxy_connection::handle_ana_srv_sock_read,
                        shared_from_this(), boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)));

    }
}

void xproxy_connection::handle_login_clt_sock_read
    (const boost::system::error_code& e, std::size_t bytes_transferred)
{
    if(!e)
    {
        std::cout<<"data read from login client:/n";
        std::cout.write(clt_buffer_.data(), bytes_transferred);
        std::cout<<"/nnow send it to analysing server.../n";
        boost::asio::async_write(ana_srv_sock_,
                boost::asio::buffer(clt_buffer_.data(), bytes_transferred),
                strand_.wrap(boost::bind(
                        &xproxy_connection::handle_ana_srv_sock_write,
                        shared_from_this(), boost::asio::placeholders::error)));
    }
    else
    {
        std::cout<<"read data from login client error, "
            <<"now need to shutdown this connection/n";
        ana_srv_sock_.cancel();
    }
}

void xproxy_connection::handle_ana_srv_sock_write(const boost::system::error_code& e)
{
    if(!e)
    {
        std::cout<<"data send to analysing server complete, "
            <<"now start to receive data from login client again.../n";
        login_clt_sock_.async_read_some(boost::asio::buffer(clt_buffer_),
                strand_.wrap(
                    boost::bind(&xproxy_connection::handle_login_clt_sock_read,
                        shared_from_this(), boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)));
    }
}

void xproxy_connection::handle_ana_srv_sock_read(
        const boost::system::error_code& e,
        std::size_t bytes_transferred)
{
    if(!e)
    {
        std::cout<<"data read from analysing server:/n";
        std::cout.write(srv_buffer_.data(), bytes_transferred);
        std::cout<<"/nnow send it to login client.../n";
        boost::asio::async_write(login_clt_sock_,
                boost::asio::buffer(srv_buffer_.data(), bytes_transferred),
                strand_.wrap(
                    boost::bind(&xproxy_connection::handle_login_clt_sock_write,
                        shared_from_this(), boost::asio::placeholders::error)));
    }
    else
    {
        std::cout<<"read data from analysing server error, "
            <<"now need to shutdown this connection/n";
        login_clt_sock_.cancel();
    }
}

void xproxy_connection::handle_login_clt_sock_write(const boost::system::error_code& e)
{
    if(!e)
    {
        std::cout<<"data send to login client complete, "
            <<"now start to receive data from analysing server again.../n";
        ana_srv_sock_.async_read_some(boost::asio::buffer(srv_buffer_),
                strand_.wrap(
                    boost::bind(&xproxy_connection::handle_ana_srv_sock_read,
                        shared_from_this(), boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)));
    }
}

 

# Build rules for the xproxy TCP forwarding proxy.
# Adjust BOOST_INC / BOOST_LIB to the local Boost installation.
BOOST_INC=/home/done/dev_lib/boost_1_38_0/
BOOST_LIB=/home/done/dev_lib/boostlibs/
LIB_THREAD=$(BOOST_LIB)libboost_thread-gcc42-mt-s-1_38.a
LIB_SYSTEM=$(BOOST_LIB)libboost_system-gcc42-mt-s-1_38.a
SYS_INC=/usr/include/
CPP_INC=/usr/include/c++/4.2/
INCLUDES=-I$(SYS_INC) -I$(BOOST_INC) -I$(CPP_INC)
OBJS=xproxy_main.o xproxy_server.o xproxy_connection.o

# default and clean name actions, not files.
.PHONY: default clean

default: xproxy

# Link only when an object file is newer than the executable.
xproxy: $(OBJS)
	g++ -o xproxy $(OBJS) $(LIB_THREAD) $(LIB_SYSTEM) -lpthread

# Each object lists every project header it includes, directly or indirectly,
# so a header change triggers the right recompiles.
xproxy_main.o: xproxy_main.cpp xproxy_server.hpp xproxy_connection.hpp
	g++ -c $(INCLUDES) xproxy_main.cpp

xproxy_server.o: xproxy_server.cpp xproxy_server.hpp xproxy_connection.hpp
	g++ -c $(INCLUDES) xproxy_server.cpp

xproxy_connection.o: xproxy_connection.cpp xproxy_connection.hpp
	g++ -c $(INCLUDES) xproxy_connection.cpp

# -f keeps clean from failing when there is nothing to remove.
clean:
	rm -f *.o xproxy