c++11中的线程、锁和条件变量

时间:2021-03-13 16:37:49
void func(int i, double d, const string& s)
{
    cout << i << ", " << d << ", " << s << endl;
}

int main()
{
    thread t(func, 1, 12.50, "sample");
    t.join();

    system("pause");
    return 0;
}

上例中,t 是一个线程对象,函数func()运行于该线程中。对join()函数的调用将使调用线程(本例是指主线程)一直处于阻塞状态,直到正在执行的线程t执行结束。如果线程函数返回某个值,该值也将被忽略。该函数可以接收任意数量的参数。

尽管可以向线程函数传递任意数量的参数(指的是func形参的个数可以任意个数),但是所有的参数应当按值传递。如果需要将参数按引用传递,必须将参数用std::ref 或者std::cref进行封装。

void func(int& a)
{
    a++;
}

int main()
{
    int a = 42;
    
    thread t(func, ref(a));
    t.join();

    cout << a << endl;

    system("pause");
    return 0;
}
//输出43,如果不用ref,运行出错,why?
//在thread的构造函数中,线程函数的参数被拷贝(浅拷贝)到线程独立内存中,这样可以被线程对象访问,即使函数形参是引用
//线程构造函数拷贝data,传递给函数的参数是data拷贝的引用,而非数据本身的引用,若用ref封装data,则update函数就会
//接收到data变量的引用,而非data拷贝的引用
struct widget
{
    int a;
    string s;
    widget(int b,string ss):a(b),s(ss){}
};

void update(widget &data)
{
    data.a += 1;
    data.s += "233";
}

int main()
{
    widget data(2, "ljy");
    thread t(update, data);
    t.join();
    cout << data.a << " " << data.s << endl;//2 ljy

    system("pause");
    return 0;
}

 

thread的一些常用函数:位于std::this_thread命名空间中

  • get_id: 返回当前线程的id.
  • yield:在处于等待状态时,可以让调度器先运行其他可用的线程。
  • sleep_for:阻塞当前线程,时间不少于其参数指定的时间
  • sleep_util:在参数指定的时间到达之前,使当前线程一直处于阻塞状态

锁:

mutex: 提供了核心函数 lock() 和 unlock(),以及非阻塞方法的try_lock()方法,一旦互斥量不可用,该方法会立即返回。

recursive_mutex:允许在同一个线程中对一个互斥量的多次请求

mutex g_lock;

void func()
{
    //对互斥量加锁,如果互斥量不可用,便处于阻塞状态
    g_lock.lock();
    cout << "entered thread" << this_thread::get_id() << endl;
    this_thread::sleep_for(chrono::seconds(rand() % 10));
    cout << "leaving thread" << this_thread::get_id() << endl;
    //对互斥量解锁
    g_lock.unlock();
}

int main()
{
    //与rand配合使用,实现真正的随机
    srand((unsigned int)time(0));
    
    thread t1(func);
    thread t2(func);
    thread t3(func);

    t1.join();
    t2.join();
    t3.join();


    system("pause");
    return 0;
}

recursive_mutex允许同一个线程多次获取同一个互斥量。

//实现一个线程安全容器
template<class T>
class container
{
private:
    recursive_mutex _lock;
    vector<T> _elements;
public:
    void add(T element)
    {
        _lock.lock();
        _elements.push_back(element);
        _lock.unlock();
    }

    void addrange(int index, vector<T>vec)
    {
        for (int i = 0; i < index; ++i)
        {
            _lock.lock();
            add(vec[i]);
            _lock.unlock();
        }
    }
    void dump()
    {
        _lock.lock();
        for (auto e : _elements)
            cout << e << endl;
        _lock.unlock();
    }
};

void func(container<int>& cont)
{
    vector<int>vec = { 1,2,3,4,5,6,7 };
    cont.addrange(3, vec);
}

int main()
{
    container<int> cont;

    thread t1(func,ref(cont));
    thread t2(func,ref(cont));
    thread t3(func,ref(cont));

    t1.join();
    t2.join();
    t3.join();

    cont.dump();

    system("pause");
    return 0;
}

显式的加锁和解锁会导致一些问题,比如忘记解锁或者请求加锁的顺序不正确,进而产生死锁。该标准提供了一些类和函数帮助解决此类问题。这些封装类保证了在RAII风格上互斥量使用的一致性,可以在给定的代码范围内自动加锁和解锁。封装类包括:
lock_guard:在构造对象时,它试图去获取互斥量的所有权(通过调用lock()),在析构对象时,自动释放互斥量(通过调用unlock()).这是一个不可复制的类

unique_lock:这个一通用的互斥量封装类,不同于lock_guard,它还支持延迟加锁,时间加锁和递归加锁以及锁所有权的转移和条件变量的使用。这也是一个不可复制的类,但它是可移动类

(unique_lock及lock_guard的具体区别、实现要弄明白)


1 采用RAII手法(对象管理资源)管理mutex的std::lock_guard其功能是在对象构造时将mutex加锁,析构时对mutex解锁,这样一个栈对象保证了在异常情形下mutex可以在lock_guard对象析构被解锁,lock_guard拥有mutex的所有权(mutex已被lock)

explicit lock_guard (mutex_type& m);//必须要传递一个mutex作为构造参数,在构造函数中对mutex上锁  
lock_guard (mutex_type& m, adopt_lock_t tag);//tag=adopt_lock表示mutex已经在之前被上锁,这里lock_guard将拥有mutex的所有权  
lock_guard (const lock_guard&) = delete;//不允许copy constructor  

 2 再来看一个与std::lock_guard功能相似但功能更加灵活的管理mutex的对象 std::unique_lock,unique_lock内部持有mutex的状态:locked,unlocked。unique_lock比lock_guard占用空间和速度慢一些,因为其要维护mutex的状态

 

1 unique_lock() noexcept;    //可以构造一个空的unique_lock对象,此时并不拥有任何mutex

2 explicit unique_lock (mutex_type& m);//拥有mutex,并调用mutex.lock()对其上锁    

3 unique_lock (mutex_type& m, try_to_lock_t tag);//tag=try_lock表示调用mutex.try_lock()尝试加锁

4 unique_lock (mutex_type& m, defer_lock_t tag) noexcept;//tag=defer_lock表示不对mutex加锁,只管理mutex,此时mutex应该是没有加锁的

5 unique_lock (mutex_type& m, adopt_lock_t tag);//tag=adopt_lock表示mutex在此之前已经被上锁,此时unique_locl管理mutex

6 template <class Rep, class Period>
   unique_lock (mutex_type& m, const chrono::duration<Rep,Period>& rel_time);//在一段时间rel_time内尝试对mutex加锁,mutex.try_lock_for(rel_time)

7 template <class Clock, class Duration>
   unique_lock (mutex_type& m, const chrono::time_point<Clock,Duration>& abs_time);//mutex.try_lock_until(abs_time)直到abs_time尝试加锁

8 unique_lock (const unique_lock&) = delete;//禁止拷贝构造

9 unique_lock (unique_lock&& x);//获得x管理的mutex,此后x不再和mutex相关,x此后相当于一个默认构造的unique_lock,移动构造函数,具备移动语义,movable but not copyable

说明:其中2和5拥有mutex的所有权(mutex被lock),而1和4不拥有mutex的所有权,3和6及7若尝试加锁成功则拥有mutex的所有权

unique_lock 在使用上比lock_guard更具有弹性,和 lock_guard 相比,unique_lock 主要的特色在于:

  •          unique_lock 不一定要拥有 mutex,所以可以通过 default constructor 建立出一个空的 unique_lock。
  •          unique_lock 虽然一样不可复制(non-copyable),但是它是可以转移的(movable)。所以,unique_lock 不但可以被函数回传,也可以放到 STL 的 container 里。
  •          另外,unique_lock 也有提供 lock()、unlock() 等函数,可以用来加锁解锁mutex,也算是功能比较完整的地方。
  •          unique_lock本身还可以用于std::lock参数,因为其具备lock、unlock、try_lock成员函数,这些函数不仅完成针对mutex的操作还要更新mutex的状态。

 3  std::unique_lock其它成员函数

~unique_lock();//若unique_lock对象拥有管理的mutex的所有权,mutex没有被销毁或者unlock,那么将执行mutex::unlock()解锁,并不销毁mutex对象。
mutex_type* mutex() const noexcept;//返回unique_lock管理的mutex指针,但是unique_lock不会放弃对mutex的管理,若unique_lock对mutex上锁了,其有义务对mutex解锁
bool owns_lock() const noexcept;//当mutex被unique_lock上锁,且mutex没有解锁或析构,返回真,否则返回false
explicit bool operator () const noexcept;//同上

 4  std::unique_lock增加了灵活性,比如可以对mutex的管理从一个scope通过move语义转到另一个scope,不像lock_guard只能在一个scope中生存。同时也增加了管理的难度,因此如无必要还是用lock_guard。

 5 网上看见一个unique_lock的应用于银行转账的实例,贴在这里:

struct bank_account//银行账户  
{
    explicit bank_account(string name, int money)
    {
        sName = name;
        iMoney = money;
    }

    string sName;
    int iMoney;
    mutex mMutex;//账户都有一个锁mutex  
};
void transfer(bank_account &from, bank_account &to, int amount)//这里缺少一个from==to的条件判断个人觉得  
{
    unique_lock<mutex> lock1(from.mMutex, defer_lock);//defer_lock表示延迟加锁,此处只管理mutex  
    unique_lock<mutex> lock2(to.mMutex, defer_lock);
    lock(lock1, lock2);//lock一次性锁住多个mutex防止deadlock,这个是关键  
    from.iMoney -= amount;
    to.iMoney += amount;
    cout << "Transfer " << amount << " from " << from.sName << " to " << to.sName << endl;
}

void main()
{
    bank_account Account1("User1", 100);
    bank_account Account2("User2", 50);
    thread t1([&]() { transfer(Account1, Account2, 10); });//lambda表达式,注意此处Account1,Account2都是传入引用,值会发生改变  
    thread t2([&]() { transfer(Account2, Account1, 5); });
    t1.join();
    t2.join();

    system("pause");
}

采用lock_guard也可以如下:

lock( from.mMutex, to.mMutex );  
lock_guard<mutex> lock1( from.mMutex, adopt_lock );//adopt_lock表示mutex已经上锁,lock1将拥有from.mMutex  
lock_guard<mutex> lock2( to.mMutex, adopt_lock );  

 

 


 

条件变量:它能使一个或多个线程进入阻塞状态(线程调用wait方法),直到接到另一个线程的通知,或者发生超时或虚假唤醒时,才退出阻塞

condition_variable:要求任何在等待该条件变量的线程必须先获取std::unique_lock锁

条件变量的工作原理:

至少有一个线程在等待某个条件(该条件与条件变量无关)变为true,等待的线程必须先获取unique_lock 锁。该锁被传递给wait()方法,wait()方法会释放互斥量,并将线程挂起,直到条件变量接收到信号。收到信号后,线程会被唤醒,同时该锁也会被重新获取

mutex m;
condition_variable cond;
int flag = 0;
void producer() {
    this_thread::sleep_for(chrono::seconds(1));
    lock_guard<mutex> guard(m);
    flag = 100;
    cond.notify_one();
    cout << "notify..." << endl;
}
void customer() {
    unique_lock<mutex> lk(m);
    if (m.try_lock())
        cout << "mutex unlocked after unique_lock" << endl;
    else
        cout << "mutex locked after unique_lock" << endl;//输出  
    while (flag == 0) {
        cout << "wait..." << endl;
        cond.wait(lk);
    }
    if (m.try_lock())
        cout << "mutex unlocked after wait" << endl;
    else
        cout << "mutex locked after wait" << endl;//输出  
    cout << "flag==100? " << flag << endl;
}
/*
mutex locked after unique_lock
wait...
notify...
mutex locked after wait
flag==100? 100
*/
int main() {
    thread one(producer);
    thread two(customer);
    one.join();
    two.join();

    system("pause");
    return 0;
}

可以使用notify_one()来发送信号,唤醒一个正在等待该条件收到信号的处于阻塞状态的线程,或者用notify_all()来唤醒在等待该条件的所有线程。

在多处理器系统中,因为一些复杂情况,要想完全预测到条件被唤醒并不容易,还会出现虚假唤醒的情况。就是说,在没人给条件变量发送信号的情况下,线程也可能会被唤醒。所以线程被唤醒后,还需要检测条件是否为true(在while循环中调用wait)。因为可能会多次发生虚假唤醒,所以需要进行循环检测。

wait方法带有锁unique_lock,这个方法可以释放锁,阻塞线程,并把线程添加到正在等待这一条件变量的线程队列里面。当该条件变量收到信号或者发生虚假唤醒时,线程就会被唤醒。它们其中任何一个发生时,锁都会被重新获取

条件变量是非常底层的同步原语,很少直接使用,一般都是用它来实现高层的同步措施,如BlockingQueue或CountDownLatch

阻塞队列实现:当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

template<typename T>
class BlockingQueue
{
public:
    BlockingQueue(){}
//lock_guard保证对队列的互斥访问,condition_variable唤醒阻塞线程,实现线程同步,Put操作应该也要有一个condition_variable
//我的理解:put也用unique_lock,在while中判断队列是否满了,若满了,调用condition_variable.wait阻塞自己
void Put(const T& task) { //临界区 { std::lock_guard<std::mutex> lock(_mutex); _queue.push_back(task); } _condvar.notify_all(); } T Take() { std::unique_lock<std::mutex> lock(_mutex);
     //此处应该在while循环中调用,防止虚假唤醒 _condvar.wait(
lock, [this] {return !_queue.empty(); }); assert(!_queue.empty()); T front(_queue.front()); _queue.pop_front(); return front; } size_t Size() const { std::lock_guard<std::mutex> lock(_mutex); return _queue.size(); } private: BlockingQueue(const BlockingQueue& rhs); BlockingQueue& operator = (const BlockingQueue& rhs); private: mutable std::mutex _mutex; std::condition_variable _condvar; std::list<T> _queue;//双向链表 }; int main() { BlockingQueue<int> q; auto t1 = std::async(std::launch::async, [&q]() { for (int i = 0; i < 10; ++i) { q.Put(i); } }); auto t2 = std::async(std::launch::async, [&q]() { while (q.Size()) { std::cout <<"t2 "<< q.Take() << std::endl; } }); auto t3 = std::async(std::launch::async, [&q]() { while (q.Size()) { std::cout <<"t3 "<< q.Take() << std::endl; } }); t1.wait(); t2.wait(); t3.wait(); system("pause"); return 0; }

转自:http://blog.csdn.net/cywosp/article/details/9157379


CountDownLatch:

 


 

用C++11的std::async代替线程的创建:

线程是属于比较低层次的东西,有时候使用有些不便,比如我希望获取线程函数的返回结果的时候,我就不能直接通过thread.join()得到结果,这时就必须定义一个变量,在线程函数中去给这个变量赋值,然后join,最后得到结果,这个过程是比较繁琐的。

c++11还提供了异步接口std::async,通过这个异步接口可以很方便的获取线程函数的执行结果。std::async会自动创建一个线程去调用线程函数,它返回一个std::future,这个future中存储了线程函数返回的结果,当我们需要线程函数的结果时,直接从future中获取,非常方便。但是我想说的是,其实std::async给我们提供的便利可不仅仅是这一点,它首先解耦了线程的创建和执行,使得我们可以在需要的时候获取异步操作的结果;其次它还提供了多个线程创建策略(比如可以通过延迟加载的方式去创建线程),使得我们可以以多种方式去创建线程。在介绍async具体用法以及为什么要用std::async代替线程的创建之前,我想先说一说std::future、std::promise和std::packaged_task。

std::future是一个非常有用也很有意思的东西,简单说std::future提供了一种访问异步操作结果的机制。从字面意思来理解,它表示未来,我觉得这个名字非常贴切,因为一个异步操作我们是不可能马上就获取操作结果的,只能在未来某个时候获取,但是我们可以以同步等待的方式来获取结果,可以通过查询future的状态(future_status)来获取异步操作的结果。future_status有三种状态

  • deferred:异步操作还没开始
  • ready:异步操作已经完成
  • timeout:异步操作超时

获取future结果有三种方式:get、wait、wait_for,其中get等待异步操作结束并返回结果,wait只是等待异步操作完成,没有返回值,wait_for是超时等待返回结果。

std::promise为获取线程函数中的某个值提供便利在线程函数中给外面传进来的promise赋值,当线程函数执行完成之后就可以通过promis获取该值了,值得注意的是取值是间接通过promise内部提供的future来获取的。它的基本用法:

    std::promise<int> pr;
    std::thread t([](std::promise<int>& p) { p.set_value_at_thread_exit(9); }, std::ref(pr));
    std::future<int> f = pr.get_future();
    auto r = f.get();
    cout << r << endl;
    t.join();

std::packaged_task它封装了一个可调用的目标(如function, lambda expression, bind expression, or another function object),以便异步调用,它和promise在某种程度上有点像,promise保存了一个共享状态的值,而packaged_task保存的是一个函数。它的基本用法:

    std::packaged_task<int()> task([](){ return 7; });
    std::thread t1(std::ref(task)); 
    std::future<int> f1 = task.get_future(); 
    auto r1 = f1.get();

至此, 我们介绍了std::async相关的几个对象std::future、std::promise和std::packaged_task,其中std::promise和std::packaged_task的结果最终都是通过其内部的future返回出来的,不知道读者有没有搞糊涂,为什么有这么多东西出来,他们之间的关系到底是怎样的?且听我慢慢道来,std::future提供了一个访问异步操作结果的机制,它和线程是一个级别的属于低层次的对象,在它之上高一层的是std::packaged_task和std::promise,他们内部都有future以便访问异步操作结果,std::packaged_task包装的是一个异步操作,而std::promise包装的是一个值,都是为了方便异步操作的,因为有时我需要获取线程函数中的某个值,这时就用std::promise,而有时我需要获取一个异步操作的返回值,这时就用std::packaged_task(我的理解:packaged_task就是这个异步操作)。那std::promise和std::packaged_task之间又是什么关系呢?说他们没关系也关系,说他们有关系也有关系,都取决于你了,因为我可以将一个异步操作的结果保存到std::promise中。

std::async先将异步操作用std::packaged_task包装起来(我的理解是将线程函数包装在packaged_task中),然后将异步操作的结果放到std::promise中,这个过程就是创造未来的过程。外面再通过future.get/wait来获取这个未来的结果,怎么样,std::async真的是来帮忙的吧,你不用再想到底该怎么用std::future、std::promise和std::packaged_task了,std::async已经帮你搞定一切了!

  现在来看看std::async的原型async(std::launch::async | std::launch::deferred, f, args...),第一个参数是线程的创建策略,有两种策略,默认的策略是立即创建线程

  • std::launch::async:在调用async就开始创建线程。
  • std::launch::deferred:延迟加载方式创建线程。调用async时不创建线程,直到调用了future的get或者wait时才创建线程

第二个参数是线程函数,第三个参数是线程函数的参数。

int main()
{
    std::future<int> f1 = std::async(std::launch::async, []() {
        return 8;
    });

    cout << f1.get() << endl; //output: 8

    std::future<void> f2 = std::async(std::launch::async, []() {
        cout << 8 << endl;
    });

    f2.wait(); //output: 8

    std::future<int> future = std::async(std::launch::async, []() {
        std::this_thread::sleep_for(std::chrono::seconds(3));
        return 8;
    });

    std::cout << "waiting...\n";
    std::future_status status;
    do {
        status = future.wait_for(std::chrono::seconds(1));
        if (status == std::future_status::deferred) {
            std::cout << "deferred\n";
        }
        else if (status == std::future_status::timeout) {
            std::cout << "timeout\n";
        }
        else if (status == std::future_status::ready) {
            std::cout << "ready!\n";
        }
    } while (status != std::future_status::ready);

    std::cout << "result is " << future.get() << '\n';

    system("pause");
    return 0;
}

总结:

  std::async是更高层次上的异步操作,使我们不用关注线程创建内部细节,就能方便的获取异步执行状态和结果,还可以指定线程创建策略,应该用std::async替代线程的创建,让它成为我们做异步操作的首选

转自:http://www.cnblogs.com/qicosmos/p/3534211.html


 

lambda基本语法:

简单来说,Lambda函数也就是一个函数,它的语法定义如下:

[capture](parameters) mutable ->return-type{statement}
  1. [capture]:捕捉列表。捕捉列表总是出现在Lambda函数的开始处。实际上,[]是Lambda引出符。编译器根据该引出符判断接下来的代码是否是Lambda函数。捕捉列表能够捕捉上下文中的变量以供Lambda函数使用;
  2. (parameters):参数列表。与普通函数的参数列表一致。如果不需要参数传递,则可以连同括号“()”一起省略;
  3. mutable:mutable修饰符。默认情况下,Lambda函数总是一个const函数,mutable可以取消其常量性在使用该修饰符时,参数列表不可省略(即使参数为空);
  4. ->return-type:返回类型。用追踪返回类型形式声明函数的返回类型。我们可以在不需要返回值的时候也可以连同符号”->”一起省略。此外,在返回类型明确的情况下,也可以省略该部分,让编译器对返回类型进行推导;
  5. {statement}:函数体。内容与普通函数一样,不过除了可以使用参数之外,还可以使用所有捕获的变量。

与普通函数最大的区别是,除了可以使用参数以外,Lambda函数还可以通过捕获列表访问一些上下文中的数据。具体地,捕捉列表描述了上下文中哪些数据可以被Lambda使用,以及使用方式(以值传递的方式或引用传递的方式)。语法上,在“[]”包括起来的是捕捉列表,捕捉列表由多个捕捉项组成,并以逗号分隔。捕捉列表有以下几种形式:

  1. [var]表示值传递方式捕捉变量var;
  2. [=]表示值传递方式捕捉所有父作用域的变量(包括this);
  3. [&var]表示引用传递捕捉变量var;
  4. [&]表示引用传递方式捕捉所有父作用域的变量(包括this);
  5. [this]表示值传递方式捕捉当前的this指针。

上面提到了一个父作用域,也就是包含Lambda函数的语句块,说通俗点就是包含Lambda的“{}”代码块。上面的捕捉列表还可以进行组合,例如:

  1. [=,&a,&b]表示以引用传递的方式捕捉变量a和b,以值传递方式捕捉其它所有变量;
  2. [&,a,this]表示以值传递的方式捕捉变量a和this,引用传递方式捕捉其它所有变量。

不过值得注意的是,捕捉列表不允许变量重复传递。下面一些例子就是典型的重复,会导致编译时期的错误。例如:

  1. [=,a]这里已经以值传递方式捕捉了所有变量,但是重复捕捉a了,会报错的;
  2. [&,&this]这里&已经以引用传递方式捕捉了所有变量,再捕捉this也是一种重复。
int a = 1;
int b = 2;

auto func = [=, &b](int c)->int {return b += a + c; };
cout<<func(a);

 

 


 

读写锁:http://blog.csdn.net/inszva/article/details/51571315

虚假唤醒:

即使没有线程调用condition_signal, 原先调用condition_wait的函数也可能会返回。此时线程被唤醒了,但是条件并不满足,这个时候如果不对条件进行检查而往下执行,就可能会导致后续的处理出现错误。 

解决措施:把判断bool条件和wait()放到while循环中

 

http://blog.jobbole.com/44409/

http://blog.csdn.net/column/details/ccia.html

http://www.cnblogs.com/haippy/p/3346477.html