Best way to handle multi-threaded cleanup

Time: 2021-01-14 20:42:08

I have a server-type application, and I am having trouble making sure threads aren't deleted before they complete. The code below pretty much represents my server; the cleanup is required to prevent a build-up of dead threads in the list.

using namespace std;

class A {
public:
    void doSomethingThreaded(function<void()> cleanupFunction, function<bool()> getStopFlag) {
       somethingThread = thread([cleanupFunction, getStopFlag, this]() {
          doSomething(getStopFlag);
          cleanupFunction();
       });

    }
private:
    void doSomething(function<bool()> getStopFlag);
    thread somethingThread;
    ...
};

class B {
public:
    void runServer();

    void stop() {
        stopFlag = true;
        waitForListToBeEmpty();
    }
private:
    void waitForListToBeEmpty() { ... };
    void handleAccept(...) {
        shared_ptr<A> newClient(new A());
        { 
            unique_lock<mutex> lock(listMutex);
            clientData.push_back(newClient);
        }
        newClient->doSomethingThreaded(bind(&B::cleanup, this, newClient), [this]() {
            return stopFlag.load();  // load() so the lambda returns bool, not a copy of the atomic
        });
    }

    void cleanup(shared_ptr<A> data) {
        unique_lock<mutex> lock(listMutex);
        clientData.remove(data);
    }

    list<shared_ptr<A>> clientData;
    mutex listMutex;
    atomic<bool> stopFlag;
};

The issue seems to be that the destructors run in the wrong order: the shared_ptr is destroyed when the thread's function completes, meaning the A object is deleted before the thread completes, causing havoc when the thread's destructor is called.

i.e.

1. Call cleanup function.
2. All references to this (i.e. an A object) are removed, so its destructor is called (including this thread's destructor).
3. Call this thread's destructor again -- OH NOES!

I've looked at alternatives, such as maintaining a 'to be removed' list that another thread periodically uses to clean the primary list, or using a time-delayed deleter function for the shared pointers, but both of these seem a bit clunky and could have race conditions.

Anyone know of a good way to do this? I can't see an easy way of refactoring it to work ok.

3 solutions

#1


3  

Are the threads joinable or detached? I don't see any detach, which means that destructing the thread object without having joined it is a fatal error. You might try simply detaching it, although this can make a clean shutdown somewhat complex. (Of course, for a lot of servers, there should never be a shutdown anyway.) Otherwise: what I've done in the past is to create a reaper thread; a thread which does nothing but join any outstanding threads, to clean up after them.
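
To make the join requirement concrete: destroying a std::thread that is still joinable calls std::terminate, so each started thread has to be joined or detached first. The following is a minimal sketch; runAndJoin and runTasks are hypothetical helper names, not part of the question's code.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Runs `work` on a new thread and joins it before returning, so the
// std::thread object is never destroyed while it is still joinable.
template <typename Fn>
void runAndJoin(Fn work) {
    std::thread t(work);
    assert(t.joinable());   // a started thread is joinable until joined or detached
    t.join();               // wait for the thread function to return
    assert(!t.joinable());  // now the destructor of `t` is harmless
}

// Runs `count` tasks one after another and returns how many completed.
int runTasks(int count) {
    std::atomic<int> done{0};
    for (int i = 0; i < count; ++i) {
        runAndJoin([&done] { ++done; });
    }
    return done.load();
}
```

Calling t.detach() instead of t.join() would also make the destructor safe, but then nothing waits for the work to finish, which is what makes a clean shutdown harder.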

I might add that this is a good example of a case where shared_ptr is not appropriate. You want full control over when the delete occurs; if you detach, you can do it in the clean up function (but quite frankly, just using delete this; at the end of the lambda in A::doSomethingThreaded seems more readable); otherwise, you do it after you've joined, in the reaper thread.

EDIT:

For the reaper thread, something like the following should work:

class ReaperQueue
{
    std::deque<A*> myQueue;
    std::mutex myMutex;
    std::condition_variable myCond;
    A* getOne()
    {
        // condition_variable::wait needs a unique_lock, not a lock_guard
        std::unique_lock<std::mutex> lock( myMutex );
        myCond.wait( lock, [&]{ return !myQueue.empty(); } );
        A* results = myQueue.front();
        myQueue.pop_front();
        return results;
    }
public:
    void readyToReap( A* finished_thread )
    {
        std::unique_lock<std::mutex> lock( myMutex );
        myQueue.push_back( finished_thread );
        myCond.notify_all();
    }

    void reaperThread()
    {
        for ( ; ; )
        {
            A* mine = getOne();
            // assumes ReaperQueue can access A::somethingThread (e.g. as a friend of A)
            mine->somethingThread.join();
            delete mine;
        }
    }
};

(Warning: I've not tested this, and I've tried to use the C++11 functionality. I've only actually implemented it, in the past, using pthreads, so there could be some errors. The basic principles should hold, however.)

To use, create an instance, then start a thread calling reaperThread on it. In the cleanup of each thread, call readyToReap.
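
Here is one way that usage might look end to end. It is an untested-in-production sketch under several assumptions: Worker is a hypothetical stand-in for the question's class A, the reaper loop is bounded by an expected count (the answer's loop runs forever) so that the example terminates, and a shared_future is used as a start gate so that no worker can announce itself before its std::thread member has been assigned.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <future>
#include <mutex>
#include <thread>

// Hypothetical stand-in for class A: an object that owns its worker thread.
struct Worker {
    std::thread thread;
};

class ReaperQueue {
    std::deque<Worker*> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;

    // Blocks until a finished worker is available, then removes and returns it.
    Worker* getOne() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        Worker* w = queue_.front();
        queue_.pop_front();
        return w;
    }

public:
    // Called by a worker as the last step of its thread function.
    void readyToReap(Worker* finished) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(finished);
        cond_.notify_one();
    }

    // Joins and deletes `expected` workers, then returns how many were reaped.
    // (Assumption: bounded here; the original loops forever.)
    int reaperThread(int expected) {
        int reaped = 0;
        for (int i = 0; i < expected; ++i) {
            Worker* w = getOne();
            assert(w != nullptr);
            w->thread.join();   // the thread has queued itself, so this returns soon
            delete w;           // safe: the thread is no longer joinable
            ++reaped;
        }
        return reaped;
    }
};

// Starts `count` workers plus one reaper and returns the number reaped.
int reapDemo(int count) {
    ReaperQueue reaper;
    int reaped = 0;
    std::thread reaperRunner([&reaper, &reaped, count] {
        reaped = reaper.reaperThread(count);
    });

    std::promise<void> go;
    std::shared_future<void> started = go.get_future().share();
    for (int i = 0; i < count; ++i) {
        Worker* w = new Worker;
        w->thread = std::thread([&reaper, w, started] {
            started.wait();          // do not run until w->thread is assigned
            reaper.readyToReap(w);   // cleanup step: hand ourselves to the reaper
        });
    }
    go.set_value();                  // release all workers

    reaperRunner.join();
    return reaped;
}
```

The start gate matters: without it, a fast worker could reach readyToReap while the main thread is still assigning w->thread, and the reaper would then race on that member.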

To support a clean shutdown, you may want to use two queues: you insert each thread into the first, as it is created, and then move it from the first to the second (which would correspond to myQueue, above) in readyToReap. To shut down, you then wait until both queues are empty (not starting any new threads in this interval, of course).
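
The two-queue idea could be sketched roughly as follows. Everything here is illustrative: ThreadRegistry, start, and shutdown are hypothetical names, the first "queue" is a map keyed by an id so entries can be moved out in any order, and the thread that calls shutdown also plays the reaper role for brevity.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <map>
#include <mutex>
#include <thread>
#include <utility>

class ThreadRegistry {
    std::mutex mutex_;
    std::condition_variable changed_;
    std::map<int, std::thread> running_;  // first queue: started, not yet finished
    std::deque<std::thread> finished_;    // second queue: waiting to be joined
    int nextId_ = 0;

    // Moves the calling thread's handle from running_ to finished_.
    void readyToReap(int id) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = running_.find(id);
        assert(it != running_.end());
        finished_.push_back(std::move(it->second));
        running_.erase(it);
        changed_.notify_all();
    }

public:
    // Starts `work` on a new thread and records it as running.
    template <typename Fn>
    void start(Fn work) {
        std::lock_guard<std::mutex> lock(mutex_);
        int id = nextId_++;
        // The new thread blocks on mutex_ inside readyToReap until this
        // function returns, so its entry in running_ always exists by then.
        running_.emplace(id, std::thread([this, id, work] {
            work();
            readyToReap(id);
        }));
    }

    // Joins finished threads until both containers are empty.
    // Returns the number of threads joined. No new threads may be
    // started while this runs.
    int shutdown() {
        int reaped = 0;
        std::unique_lock<std::mutex> lock(mutex_);
        while (!running_.empty() || !finished_.empty()) {
            changed_.wait(lock, [this] { return !finished_.empty(); });
            std::thread t = std::move(finished_.front());
            finished_.pop_front();
            lock.unlock();   // do not hold the lock while joining
            t.join();
            lock.lock();
            ++reaped;
        }
        return reaped;
    }
};

// Starts `count` trivial tasks, shuts down, and returns the number reaped
// (or -1 if that differs from the number of tasks that actually ran).
int shutdownDemo(int count) {
    ThreadRegistry registry;
    std::atomic<int> ran{0};
    for (int i = 0; i < count; ++i) {
        registry.start([&ran] { ++ran; });
    }
    int reaped = registry.shutdown();
    return reaped == ran.load() ? reaped : -1;
}
```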

#2


1  

The issue is that, since you manage A via shared pointers, the this pointer captured by the thread lambda really needs to be a shared pointer rather than a raw pointer to prevent it from becoming dangling. The problem is that there's no easy way to create a shared_ptr from a raw pointer when you don't have an actual shared_ptr as well.

One way to get around this is to use shared_from_this:

class A : public enable_shared_from_this<A> {
public:
    void doSomethingThreaded(function<void()> cleanupFunction, function<bool()> getStopFlag) {
       somethingThread = thread([cleanupFunction, getStopFlag, this]() {
          shared_ptr<A> temp = shared_from_this();
          doSomething(getStopFlag);
          cleanupFunction();
       });
    }
    ...
};

This creates an extra shared_ptr to the A object that keeps it alive until the thread finishes.

Note that you still have the join/detach problem that James Kanze identified: every thread must have either join or detach called on it exactly once before it is destroyed. If you never care about the thread's exit value, you can fulfill that requirement by adding a detach call to the thread lambda.
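
As a rough illustration of combining the two points, the sketch below takes the shared_ptr before the thread starts and captures it in the lambda (a small variation on the snippet above, which calls shared_from_this inside the thread), then detaches. Client, value_, and detachedDemo are hypothetical names, and the promise/future pair exists only so the example can observe the result.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <thread>

class Client : public std::enable_shared_from_this<Client> {
    int value_;

public:
    explicit Client(int v) : value_(v) {}

    // Starts a detached thread that keeps this object alive until it is done.
    // Must be called on an object that is already owned by a shared_ptr.
    std::future<int> doSomethingThreaded() {
        std::shared_ptr<Client> self = shared_from_this();  // taken before the thread starts
        auto result = std::make_shared<std::promise<int>>();
        std::future<int> fut = result->get_future();
        std::thread([self, result] {
            // `self` guarantees the Client outlives this lambda.
            result->set_value(self->value_ * 2);
        }).detach();  // detached: no std::thread object is left to destroy
        return fut;
    }
};

// Drops the caller's reference while the thread may still be running,
// then returns the value the thread computed.
int detachedDemo() {
    std::future<int> fut;
    {
        auto client = std::make_shared<Client>(21);
        fut = client->doSomethingThreaded();
    }  // caller's shared_ptr is gone; the lambda's copy keeps the object alive
    assert(fut.valid());
    return fut.get();
}
```

Because the thread is detached and holds its own reference, the A-like object is destroyed on whichever thread releases the last shared_ptr, and no thread object is destroyed while joinable.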

You also have potential for problems if doSomethingThreaded is called multiple times on a single A object...

#3


0  

For those who are interested, I took a bit of both answers (i.e. James' detach suggestion and Chris' suggestion about shared_ptrs).

My resultant code looks like this and seems neater and doesn't cause a crash on shutdown or client disconnect:

using namespace std;

class A {
public:
    void doSomething(function<bool()> getStopFlag) {
        ...
    }
private:
    ...
};

class B {
public:
    void runServer();

    void stop() {
        stopFlag = true;
        waitForListToBeEmpty();
    }
private:
    void waitForListToBeEmpty() { ... };
    void handleAccept(...) {
        shared_ptr<A> newClient(new A());
        { 
            unique_lock<mutex> lock(listMutex);
            clientData.push_back(newClient);
        }
        thread clientThread([this, newClient]() { 
            // Capture the shared_ptr until thread over and done with.

            newClient->doSomething([this]() {
                return stopFlag.load();  // load() so the lambda returns bool, not a copy of the atomic
            });
            cleanup(newClient);
        });
        // Detach to remove the need to store these threads until their completion.
        clientThread.detach();
    }

    void cleanup(shared_ptr<A> data) {
        unique_lock<mutex> lock(listMutex);
        clientData.remove(data);
    }

    list<shared_ptr<A>> clientData; // Can remove this if you don't 
                                    // need to connect with your clients.
                                    // However, you'd need to make sure this 
                                    // didn't get deallocated before all clients 
                                    // finished as they reference the boolean stopFlag
                                    // OR make it a shared_ptr to an atomic boolean
    mutex listMutex;
    atomic<bool> stopFlag;
};
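
The code above leaves waitForListToBeEmpty elided. One possible shape for it, purely as an assumption and not the author's implementation, is a condition variable that cleanup signals whenever it removes a client. In the sketch below, Client and Server are hypothetical stand-ins for A and B, listChanged is an added member, and doSomething simply polls the stop flag.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical stand-in for A: works until asked to stop.
struct Client {
    void doSomething(std::function<bool()> getStopFlag) {
        while (!getStopFlag()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
};

// Hypothetical stand-in for B.
class Server {
    std::list<std::shared_ptr<Client>> clientData;
    std::mutex listMutex;
    std::condition_variable listChanged;  // assumption: not in the original code
    std::atomic<bool> stopFlag{false};

    void cleanup(const std::shared_ptr<Client>& data) {
        std::lock_guard<std::mutex> lock(listMutex);
        clientData.remove(data);
        listChanged.notify_all();  // wake stop() so it can re-check the list
    }

public:
    void handleAccept() {
        auto newClient = std::make_shared<Client>();
        {
            std::lock_guard<std::mutex> lock(listMutex);
            clientData.push_back(newClient);
        }
        std::thread([this, newClient] {
            newClient->doSomething([this] { return stopFlag.load(); });
            cleanup(newClient);  // last access to the Server from this thread
        }).detach();
    }

    // Signals all clients to stop and blocks until the list is empty.
    // Returns the number of clients still registered (0 on success).
    std::size_t stop() {
        stopFlag = true;
        std::unique_lock<std::mutex> lock(listMutex);
        listChanged.wait(lock, [this] { return clientData.empty(); });
        assert(clientData.empty());
        return clientData.size();
    }
};

// Accepts `clients` connections, stops the server, and returns how many
// clients were left in the list afterwards.
std::size_t serverDemo(int clients) {
    Server server;
    for (int i = 0; i < clients; ++i) {
        server.handleAccept();
    }
    return server.stop();
}
```

This relies on cleanup being the last thing each detached thread does with the Server, which is the same lifetime concern the comment on clientData raises: the server must not be destroyed while a client thread can still touch stopFlag or the list.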
