使用boost::asio::io_service::post()

时间:2022-09-09 12:54:55

First i asked this Running a function on the main thread from a boost thread and passing parameters to that function

首先,我要求这个函数在主线程上运行一个函数,并将参数传递给这个函数

so now i am trying this:

所以现在我在尝试:

The following is a console c++ project where i perfectly simulated my big project

下面是一个控制台c++项目,我完美地模拟了我的大项目

TestServicePost.cpp

TestServicePost.cpp

#include "stdafx.h"
#include "SomeClass.h"


int _tmain(int argc, _TCHAR* argv[])
{
    SomeClass* s = new SomeClass();
    while(true)
    {
        s->update();
    }
    return 0;
}

SomeClass.h

SomeClass.h

#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <queue>

class ServiceNote
{
public:
    std::string getType()
    {
        std::stringstream typeSS;
        typeSS << "LamasaTech.MultiWall.PostNote." << (NoteType.compare("Normal") == 0 ? "Node" : "Header") << "." << Shape << "." << Colour;
        return typeSS.str();
    }
    int Action; 
    int CNoteId;    
    std::string Colour; 
    int NoteId; 
    std::string NoteType;   
    int SessionId;  
    std::string Shape;  
    std::string Style;  
    std::string Text;   
    int X;  
    int Y;  
};

class SomeClass
{
public:
    SomeClass();
    ~SomeClass();
    void update();

private:
    std::queue<ServiceNote> pendingNotes;
    void addToQueue(ServiceNote sn);
    void pollService(boost::asio::io_service* svc);
    int getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId);
    boost::thread servicePoller;
};

SomeClass.cpp

SomeClass.cpp

#include "stdafx.h"
#include "SomeClass.h"
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/asio/signal_set.hpp>

#define POLL_SERVICE = 0;
#define POLLING_WAIT_TIME 1000
#define SAVE_SESSION_EVERY 1800000

SomeClass::SomeClass()
{
    boost::asio::io_service io_servicePoller;
    io_servicePoller.run();
    servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, &io_servicePoller));
    /*boost::asio::io_service io_sessionSaver;
    boost::asio::signal_set signalsSaver(io_sessionSaver, SIGINT, SIGTERM);
    signalsSaver.async_wait( boost::bind(&boost::asio::io_service::stop, &io_sessionSaver));
    sessionSaver = boost::thread(&SomeClass::saveSessionEvery, io_sessionSaver);*/
}

SomeClass::~SomeClass()
{
}

void SomeClass::update()
{   
    while(!pendingNotes.empty())
    {
        ServiceNote sn = pendingNotes.front();

        pendingNotes.pop();
    }
}

void SomeClass::addToQueue(ServiceNote sn)
{
    pendingNotes.push(sn);
}

void SomeClass::pollService(boost::asio::io_service* svc)
{
    int messageId = 1;
    while(true)
    {
        if(boost::this_thread::interruption_enabled() && boost::this_thread::interruption_requested())
            return;
        int currentId = messageId;
        messageId = getMessage(svc, "49", messageId);
        if(currentId == messageId)
            boost::this_thread::sleep(boost::posix_time::milliseconds(POLLING_WAIT_TIME));
    }
}

int SomeClass::getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId)
{
    try
    {
        boost::asio::io_service io_service;

        // Get a list of endpoints corresponding to the server name.
        boost::asio::ip::tcp::resolver resolver(io_service);
        boost::asio::ip::tcp::resolver::query query("mw.rombus.com", "http");
        boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);

        // Try each endpoint until we successfully establish a connection.
        boost::asio::ip::tcp::socket socket(io_service);
        boost::asio::connect(socket, endpoint_iterator);

        // Form the request. We specify the "Connection: close" header so that the
        // server will close the socket after transmitting the response. This will
        // allow us to treat all data up until the EOF as the content.
        boost::asio::streambuf request;
        std::ostream request_stream(&request);
        request_stream << "GET " "/Service.svc/message/" << sessionId << "/" << messageId << " HTTP/1.0\r\n";
        request_stream << "Host: " << "mw.rombus.com" << "\r\n";
        request_stream << "Accept: */*\r\n";
        request_stream << "Connection: close\r\n\r\n";

        // Send the request.
        boost::asio::write(socket, request);

        // Read the response status line. The response streambuf will automatically
        // grow to accommodate the entire line. The growth may be limited by passing
        // a maximum size to the streambuf constructor.
        boost::asio::streambuf response;
        boost::asio::read_until(socket, response, "\r\n");

        // Check that response is OK.
        std::istream response_stream(&response);
        std::string http_version;
        response_stream >> http_version;
        unsigned int status_code;
        response_stream >> status_code;
        std::string status_message;
        std::getline(response_stream, status_message);
        if (!response_stream || http_version.substr(0, 5) != "HTTP/")
        {
            //std::cout << "Invalid response\n";
            return messageId;
        }
        if (status_code != 200)
        {
            //std::cout << "Response returned with status code " << status_code << "\n";
            return messageId;
        }

        // Read the response headers, which are terminated by a blank line.
        boost::asio::read_until(socket, response, "\r\n\r\n");

        // Process the response headers.
        std::string header;
        std::string fullHeader = "";
        while (std::getline(response_stream, header) && header != "\r")
            fullHeader.append(header).append("\n");

        // Write whatever content we already have to output.
        std::string fullResponse = "";
        if (response.size() > 0)
        {
            std::stringstream ss;
            ss << &response;
            fullResponse = ss.str();
            try
            {
                boost::property_tree::ptree pt;
                boost::property_tree::read_json(ss, pt);
                ServiceNote sn;
                sn.Action =  pt.get<int>("Action");
                sn.CNoteId =  pt.get<int>("CNoteId");
                sn.Colour =  pt.get<std::string>("Colour");
                sn.NoteId =  pt.get<int>("NoteId");
                sn.NoteType =  pt.get<std::string>("NoteType");
                sn.SessionId =  pt.get<int>("SessionId");
                sn.Shape =  pt.get<std::string>("Shape");
                sn.Style =  pt.get<std::string>("Style");
                sn.Text =  pt.get<std::string>("Text");
                sn.X =  pt.get<int>("X");
                sn.Y =  pt.get<int>("Y");
                svc->post(boost::bind(&SomeClass::addToQueue, this, sn));
                //pendingNotes.push(sn);
            }
            catch (std::exception const& e)
            {
                std::string test = e.what();
                //std::cerr << e.what() << std::endl;
            }
            messageId++;
        }

        // Read until EOF, writing data to output as we go.
        std::string fullSth = "";
        boost::system::error_code error;
        while (boost::asio::read(socket, response,
                boost::asio::transfer_at_least(1), error))
        {
            std::ostringstream ss;
            ss << &response;
            fullSth = ss.str();
        }
        if (error != boost::asio::error::eof)
            throw boost::system::system_error(error);
    }
    catch (std::exception& e)
    {
        std::string test = e.what();
        std::cout << "Exception: " << e.what() << "\n";
    }
    return messageId;
}

but i get Unhandled exception at 0x771215de in TestServicePost.exe: 0xC0000005: Access violation writing location 0xcccccce4., right after this line executes:

但是我在TestServicePost中得到了0x771215de的未处理异常。exe: 0xC0000005:访问违反写入位置0xcccce4。,在这条线执行之后:

svc->post(boost::bind(&SomeClass::addToQueue, this, sn));

I couldn't define io_service as a class member so i can use it in the destructor ~SomeClass(), would appreciate help on that too

我不能将io_service定义为一个类成员,因此我可以在析构函数~SomeClass()中使用它,希望您能在这方面给予帮助

If io_service.post is not the best solution for me please recommend something, as you can see i have a constructor, destructor and an update method who is called every tick, i tried using this and the queue alone but it wasn't thread safe, is there an easy thread safe FIFO to use ?

如果io_service。文章不是我的最佳解决方案,请推荐一下,你可以看到我有一个构造函数,析构函数和一个更新的方法叫做每滴答,我尝试使用这个队列孤独但它不是线程安全的,有一个容易使用的线程安全的FIFO吗?

2 个解决方案

#1


4  

In SomeClass constructor you actually do the following:

在一些构造函数中,你实际上做了以下事情:

  1. Define a local io_service instance.
  2. 定义一个本地io_service实例。
  3. Call its run() member-function, which returns immediately, because io_service has no work.
  4. 调用它的run() member-function,它会立即返回,因为io_service没有工作。
  5. Pass an address of the local object to another thread.
  6. 将本地对象的地址传递给另一个线程。

This certainly won't work.

这肯定不会工作。

Note that io_service::run() is a kind of "message loop", so it should block the calling thread. Don't call it in object constructor.

注意,io_service::run()是一种“消息循环”,因此它应该阻塞调用线程。不要在对象构造函数中调用它。

#2


2  

I figured out how to declare io_service as a class member:

我知道了如何将io_service声明为类成员:

boost::shared_ptr< boost::asio::io_service > io_servicePoller;

and in the constructor i did the following:

在构造函数中,我做了如下的事:

SomeClass::SomeClass()
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    io_servicePoller = io_service;
    servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, io_servicePoller));
}

Some cleanup

一些清理

SomeClass::~SomeClass()
{
    servicePoller.interrupt();
    io_servicePoller->stop();
    servicePoller.join();
}

and in update i called run which adds the stuff into the queue, then reads them in the while loop

在更新中,我调用run将这些内容添加到队列中,然后在while循环中读取它们

void SomeClass::update()
{   
    io_servicePoller->run();
    io_servicePoller->reset();
    while(!pendingNotes.empty())
    {
        ServiceNote sn = pendingNotes.front();

        pendingNotes.pop();
    }
}

and changed my members signature to void SomeClass::pollService(boost::shared_ptr< boost::asio::io_service > svc)

并将我的成员签名更改为void SomeClass: pollService(boost: shared_ptr< boost: asio::io_service > svc)

So what happens is:

所以会发生什么是:

  1. The app starts
  2. 应用程序启动
  3. inits my class
  4. 初始化我的课
  5. my class makes a service and starts the thread
  6. 我的类创建一个服务并启动线程
  7. the thread fetches items from the service
  8. 线程从服务中获取项
  9. the main thread checks the io service queue and exuted it
  10. 主线程检查io服务队列并发出它。
  11. then it uses the queue
  12. 然后它使用队列

Thanks to Igor R. i couldn't have done it without him

多亏了伊戈尔·r·我没有他就做不到

and also http://www.gamedev.net/blog/950/entry-2249317-a-guide-to-getting-started-with-boostasio?pg=4 where i got how to make the shared pointer

同时http://www.gamedev.net/blog/950/entry - 2249317 -指导- - -开始- - - - - - - boostasio?在这里我得到了如何创建共享指针。

#1


4  

In SomeClass constructor you actually do the following:

在一些构造函数中,你实际上做了以下事情:

  1. Define a local io_service instance.
  2. 定义一个本地io_service实例。
  3. Call its run() member-function, which returns immediately, because io_service has no work.
  4. 调用它的run() member-function,它会立即返回,因为io_service没有工作。
  5. Pass an address of the local object to another thread.
  6. 将本地对象的地址传递给另一个线程。

This certainly won't work.

这肯定不会工作。

Note that io_service::run() is a kind of "message loop", so it should block the calling thread. Don't call it in object constructor.

注意,io_service::run()是一种“消息循环”,因此它应该阻塞调用线程。不要在对象构造函数中调用它。

#2


2  

I figured out how to declare io_service as a class member:

我知道了如何将io_service声明为类成员:

boost::shared_ptr< boost::asio::io_service > io_servicePoller;

and in the constructor i did the following:

在构造函数中,我做了如下的事:

SomeClass::SomeClass()
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    io_servicePoller = io_service;
    servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, io_servicePoller));
}

Some cleanup

一些清理

SomeClass::~SomeClass()
{
    servicePoller.interrupt();
    io_servicePoller->stop();
    servicePoller.join();
}

and in update i called run which adds the stuff into the queue, then reads them in the while loop

在更新中,我调用run将这些内容添加到队列中,然后在while循环中读取它们

void SomeClass::update()
{   
    io_servicePoller->run();
    io_servicePoller->reset();
    while(!pendingNotes.empty())
    {
        ServiceNote sn = pendingNotes.front();

        pendingNotes.pop();
    }
}

and changed my members signature to void SomeClass::pollService(boost::shared_ptr< boost::asio::io_service > svc)

并将我的成员签名更改为void SomeClass: pollService(boost: shared_ptr< boost: asio::io_service > svc)

So what happens is:

所以会发生什么是:

  1. The app starts
  2. 应用程序启动
  3. inits my class
  4. 初始化我的课
  5. my class makes a service and starts the thread
  6. 我的类创建一个服务并启动线程
  7. the thread fetches items from the service
  8. 线程从服务中获取项
  9. the main thread checks the io service queue and exuted it
  10. 主线程检查io服务队列并发出它。
  11. then it uses the queue
  12. 然后它使用队列

Thanks to Igor R. i couldn't have done it without him

多亏了伊戈尔·r·我没有他就做不到

and also http://www.gamedev.net/blog/950/entry-2249317-a-guide-to-getting-started-with-boostasio?pg=4 where i got how to make the shared pointer

同时http://www.gamedev.net/blog/950/entry - 2249317 -指导- - -开始- - - - - - - boostasio?在这里我得到了如何创建共享指针。