c++并发与多线程

时间:2021-03-03 01:16:24

c++并发与多线程

子线程结束,主线程不能结束,否则会出错,和java不一样。

可以用join的方式让主线程等待子线程执行结束。

quickStart

线程相关头文件 #include <thread>

使用全局函数构造一个线程对象

#include <iostream>
#include <thread>
#include <fstream>

void myPrint() {
    ofstream ofs;
    ofs.open("data.txt", ios_base::out);

    for (size_t i = 0; !flag; i++)
    {
        ofs << "new thread " <<i <<endl;
    }

    ofs.close(); 
    
}

int main()
{

    thread thread1(myPrint);
    thread1.join();//主线程等待thread1执行结束
    for (size_t i = 0; i < 100; i++)
	{
		cout << "main thread " << i << endl;
	}
}

这里用join让主线程等待子线程结束,再执行后面的代码。

创建thread对象后,线程即可启动。

线程的执行方式join和detach

join

thread thread1(myPrint);
thread1.join();//主线程等待thread1执行结束

调用子线程join方法后,父线程会阻塞,等待子线程执行结束后再执行。

detach

thread1.detach();

调用子线程detach方法后,子线程会和父线程分离,子线程父线程同时进行,主线程可以不用等待其他线程结束就可以正常结束。

ℹ️需要注意的问题:

  1. 当子线程中有主线程对象的引用时,如果主线程先于子线程结束,这些引用的对象就会被释放,导致子线程中的引用出错
  2. 主线程中用来构造子线程的对象(对象A),会被复制到子线程中,因此,主线程结束释放对象A后不会影响子线程的执行

线程的构造方式

全局函数

#include <iostream>
#include <thread>
#include <fstream>

void myPrint() {
    ofstream ofs;
    ofs.open("data.txt", ios_base::out);

    for (size_t i = 0; !flag; i++)
    {
        ofs << "new thread " <<i <<endl;
    }

    ofs.close(); 
    
}

int main()
{

    thread thread1(myPrint);
    thread1.join();//主线程等待thread1执行结束
    for (size_t i = 0; i < 100; i++)
	{
		cout << "main thread " << i << endl;
	}
}

对象

class ThreadJob 
{
    int count;

public:
    
    ThreadJob(int n):count(n) {

        cout << "MyThread()" << endl;
    }

    ThreadJob(const ThreadJob& mt):count(mt.count) {
        cout << "MyThread(const MyThread&)" << endl;
    }

    //重载括号运算符
    void operator()() 
    {
        cout << "MyThread work" << endl;
    }
};

int main()
{


    //构造方式2:使用可执行对象构造线程
    //ThreadJob 对象会被复制到线程中去,因此主线程结束后myThread对象被释放不会影响线程thread2的执行
    ThreadJob job(5);
    thread thread2(job);
    if (thread2.joinable())
    {
        thread2.join();
    }

	for (size_t i = 0; i < 100; i++)
	{
		cout << "main thread " << i << endl;
	}

    return 0;
}

lambda表达式

int main()
{
	/*for (size_t i = 0; i < 100; i++)
	{
		cout << "main thread " << i << endl;
	}*/
  
    //构造方式3:lambda
    auto myLamdaThreadJob=[]{
        cout << "labmda" << endl;
    };

    thread thread3(myLamdaThreadJob);
    thread3.join();

    flag = true;

    return 0;
}

c++中lambda表达式的内容可以参考

https://docs.microsoft.com/zh-cn/cpp/cpp/lambda-expressions-in-cpp?view=msvc-160

std::this_thread

指代当前线程对象

线程ID

使用下面方法可以获取线程id

std::this_thread::get_id()

线程休眠

std::chrono::milliseconds dura(200);
std::this_thread::sleep_for(dura);

线程传递参数

传递引用

示例

使用函数指针构造线程,函数传递两个参数

class ThreadJob 
{
    int count;

public:
    
    ThreadJob(int n):count(n) {

        cout << "ThreadJob() threadId="<<this_thread::get_id() << endl;
    }

    ThreadJob(const ThreadJob& mt):count(mt.count) {
        cout << "ThreadJob(const ThreadJob&) threadId=" << this_thread::get_id() << endl;
    }

    ~ThreadJob()
    {
        cout << "~ThreadJob() threadId=" << this_thread::get_id() << this<< endl;
    }

};


void myPrint2( int i, const ThreadJob& job) {
    cout << &job << endl;
}
int main()
{

    cout << "main threadId=" << this_thread::get_id() << endl;

    ThreadJob job(10);

    thread thread1(myPrint2,10,ref(job));

    thread1.join();

    return 0;
}

注意:

void myPrint2( int i, const ThreadJob& job) {
    cout << &job << endl;
}

这个函数中的引用参数必须得用const修饰。

  • 构造线程时,如果参数会发生隐式转换,则隐式转化发生在子线程中
 thread thread1(myPrint2,10,20); //这里把20使用ThreadJob(int)隐式转换成ThreadJob对象
  • 这会导致在主线程释放了局部变量后,子线程中的隐式转换还没有完成,从而出错。
  • 可以使用匿名对象的方式显示构造对象
thread thread1(myPrint2,10,ThreadJob(20));//匿名对象的构建发生在主线程中
  • 匿名对象构建完成之后,不管线程回调函数参数中是否是引用,都会将主线程中传入的参数对象复制一份给子线程。如果回调函数参数不是引用,则对象复制会发生两次
  • 可以使用std::ref(obj)或者有些情况下也可以用&obj方式传递主线程对象的真正引用,
 thread thread1(myPrint2,10,std::ref(job));

传递智能指针

示例:传递独占式指针

void job1(unique_ptr<int> i) {

}
int main()
{
    unique_ptr<int> i1(new int(100));
    thread thread4(job1,std::move(i1));
    thread4.join();
}

传递成员函数

示例

class ThreadJob 
{
    int count;

public:
    
    ThreadJob(int n):count(n) {
		//....
    }

    void work(int n) {
        //.....
    }
};


int main()
{
    ThreadJob job(10);
    thread thread2(&ThreadJob::work,job,10); //job对象会复制一份到子线程中
    thread2.join();
}

加锁

互斥量(mutex)、lock、unlock

mutex.lock和mutex.unlock必须成对使用

mutex需要包含头文件

#include <mutex>

示例代码:

#include <iostream>
#include <mutex>
#include<list>

using namespace std;

class MsgProcessor
{

public:
void MsgProcessor::procMsg()
{
	for (size_t i = 0; i < 10000; i++)
	{
		m_mutex.lock();
		if (!msgQueue.empty())
		{
			int msg = msgQueue.front();
			cout << "process msg:" << i << endl;
			msgQueue.pop_front();
		}
		else
		{
			cout << "start msgProc() but list is empty" << endl;
		}
		m_mutex.unlock();
		
	}
}
void MsgProcessor::receiveMsg()
{
	for (size_t i = 0; i < 1000; i++)
	{
		cout << "receive msg:" << i << endl;
        
		m_mutex.lock(); //加锁
		msgQueue.push_back(i);
		m_mutex.unlock();//解锁

	}
}
private:
	std::list<int> msgQueue;
    
    //互斥量
	std::mutex m_mutex;
};

int main()
{

    MsgProcessor msgProcessor;

    thread threadRecMsg(&MsgProcessor::receiveMsg, &msgProcessor);
    thread threadProcMsg1(&MsgProcessor::procMsg, &msgProcessor);
    thread threadProcMsg2(&MsgProcessor::procMsg, &msgProcessor);
    
    threadRecMsg.join();
    threadProcMsg1.join();
    threadProcMsg2.join();
    return 0;
}

std::lock_guard类模板

使用lock_guard模板类对象可以自动加锁和释放锁,

加锁范围为从对象声明构造开始到作用范围结束后对象析构

void MsgProcessor::procMsg()
{
	for (size_t i = 0; i < 10000; i++)
	{
		//lock_guard 对象在构造时会调用mutex对象的lock方法,析构时会调用mutex的unlock方法
		std::lock_guard<std::mutex> locker(m_mutex);

		//.....
	
	}
}

std::lock函数模板

  • std::lock函数模板可以一次给两个或者两个以上的互斥量
  • 不存在因为锁的顺序导致死锁的问题
    • 原因:等待所有互斥量全都锁住才能完成上锁
  • 加锁之后需要手动释放锁

示例:

std::lock(m_mutex, m_mutex2);
if (!msgQueue.empty())
{
    int msg = msgQueue.front();
    cout << "process msg:" << i << endl;
    msgQueue.pop_front();
}
m_mutex.unlock();
m_mutex2.unlock();

使用std::lock之后需要手动释放锁,可以使用lock_guard的特性实现自动释放锁。

std::lock(m_mutex, m_mutex2);

std::lock_guard<mutex> guard1(m_mutex, std::adopt_lock);
//使用lock_guard(mutex)会默认调用mutex的lock方法加锁,而std::lock方法中已经给互斥量加过锁,
//因此,这里必须在构造时传入std::adopt_lock这个参数
std::lock_guard<mutex> guard2(m_mutex2, std::adopt_lock);

if (!msgQueue.empty())
{
    int msg = msgQueue.front();
    cout << "process msg:" << i << endl;
    msgQueue.pop_front();
}

unique_lock类模板

  • lock_guard简化了mutexlockunlock管理

  • unique_locklock_guard更灵活,但是效率较低

  • unique_lock可以完全取代lock_guard

std::unique_lock<std::mutex> lock1(m_mutex);

if (!msgQueue.empty())
{
    int msg = msgQueue.front();
    cout << "process msg:" << i << endl;
    msgQueue.pop_front();
}

构造函数

  • std::adopte_lock,表示互斥量已经被加锁
//std::adopte_lock
m_mutex.lock();
std::unique_lock<std::mutex> lock1(m_mutex,std::adopt_lock);


  • std::try_to_lock
    • 尝试去锁定mutex,如果没有锁定成功,则会立即返回,并不会产生阻塞
    • 使用前互斥量不能加锁
std::unique_lock<mutex> lock1(m_mutex,std::try_to_lock);
if (lock1.owns_lock())
{
    cout << "receive msg:" << endl;
}
else
{
    cout << "接受消息线程没有拿到消息队列锁,跳过" << endl;
}
  • std::defer_lock
    • 使用时互斥量不能加锁,否则会报异常
    • 这个参数表示初始化一个没有加锁的互斥量
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
lock1.lock();

重要成员函数

lock

  • 使用lock()加锁之后可以自动释放锁
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
lock1.lock();

unlock()

  • 使用unlock()释放锁
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
lock1.lock();
//...
lock1.unlock();
//...
lock1.lock();
//...
lock1.unlock();

try_lock()

  • 返回值
    • true: 拿到锁
    • false: 没有拿到锁
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);

if (lock1.try_lock()) 
{
    //拿到锁
}
else
{
    //没有拿到锁
}

release()

返回unique_lock对象所管理mutex对象的指针,并释放所有权

std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
lock1.lock();
std::mutex *mutex_ptr = lock1.release();
mutex_ptr->unlock();

unique_lock所有权的传递

  • 通常unique_lock需要管理一个 mutex对象
  • 所有权可以传递,但是不能复制
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
lock1.lock();
//所有权的传递
std::unique_lock<std::mutex> lock2(std::move(lock1));

std::call_once()

  • std::call_once是c++11引入的函数,能够保证传入的函数只被调用一次,比使用mutex消耗的资源更少
  • std::call_once需要和 std::once_flag结合使用。

单例模式示例

实现一个日志管理器LogManager

LogManager.h

#pragma once
#include <mutex>
using namespace std;
class LogManager
{
private:
	static LogManager* manager;
	static mutex *lock1;

	LogManager();

    //用于释放单例对象
	class GC 
	{
	public:
		~GC();
	};
public:
	static LogManager* getInstance();
	void test();
	
};

LogManager.cpp

#include "LogManager.h"
#include <iostream>

LogManager::LogManager()
{
}

LogManager* LogManager::getInstance()
{

    //这里使用两段锁的方式来进行单例对象的初始化
    if (manager==NULL)
    {
        LogManager::lock1->lock();
        if (manager==NULL)
		{
			manager = new LogManager();
			static GC gc;

        }
        LogManager::lock1->unlock();
    }
    return manager;
}

void LogManager::test()
{
    std::cout << "test" << std::endl;
}

LogManager::GC::~GC() {
    if (manager)
    {
        delete manager;
        manager = NULL;
    }
}

//类的静态成员变量要在类声明的外部初始化
LogManager* LogManager::manager = NULL;
mutex* LogManager::lock1 = new mutex();

main.cpp

#include <iostream>
#include "LogManager.h"

int main()
{
    std::cout << "Hello World!\n";
    LogManager* manager1 = LogManager::getInstance();
    LogManager* manager2 = LogManager::getInstance();
    manager1->test();
}

call_once在单例模式中的应用

LogManager.h

#pragma once
#include <mutex>
using namespace std;
class LogManager
{
private:
	static LogManager* manager;
	//static mutex *lock1;
    
    //std::call_once需要的标记
	static std::once_flag* flag;

	LogManager();
	static void createInstance();

	class GC 
	{
	public:
		~GC();
	};
public:
	static LogManager* getInstance();
	void test();
	
};

LogManager.cpp

#include "LogManager.h"
#include <iostream>


LogManager::LogManager()
{
}

void LogManager::createInstance()
{
	manager = new LogManager();
	static GC gc;
}

LogManager* LogManager::getInstance()
{

	std::call_once(*flag, createInstance);
    return manager;
}

void LogManager::test()
{
    std::cout << "test" << std::endl;
}

LogManager::GC::~GC() {
    if (manager)
    {
        delete manager;
        manager = NULL;
    }
}
LogManager* LogManager::manager = NULL;
mutex* LogManager::lock1 = new mutex();
std::once_flag* LogManager::flag = new std::once_flag();

std::condition_variable、wait、notify

  • condition_variable.wait()方法可以让一个线程等待
  • condition_variable.notify_one()和notify_all()可以唤醒等待的线程

示例

//定义条件变量
std::condition_variable cv;

//使用wait
std::unique_lock<std::mutex> lock(m_mutex);

cv.wait(lock, [this] {
    if (msgQueue.empty())
    {
        cout << "消息队列为空,处理线程等待" << endl;
        return false;
    }

    return true;
});


//使用notify
std::unique_lock<mutex> lock1(m_mutex,std::try_to_lock);
if (lock1.owns_lock())
{
    cout << "receive msg:" << i << endl;
    msgQueue.push_back(i);
    //唤醒正在wait的线程
    cv.notify_all();
}
  • std::conditioan_variable.wait()第二个参数lambda表达式返回bool
    • true,wait 直接返回,程序继续执行
    • false, wait 将解锁互斥量,并且让线程等待被唤醒
  • condition.wait()没有第二个参数,和第二个参数直接返回false效果一样,wait将解锁互斥量,并让线程等待
  • wait中的线程可以通过 notify_one()唤醒,唤醒后会首先尝试继续获取互斥量的的锁
  • 获取到锁之后,如果wait时有第二个参数,则会继续判断第二个参数的返回值,
    • false重新释放锁并等待,true继续执行后续语句

notify_one和notify_all

  • notify_one 可以从等待中的线程中唤醒一个
  • notify_all可以将所有等待中的线程唤醒

std::async,std::future

  • std::async 是个函数模板,用来启动一个异步任务,启动起来的异步任务会返回一个 std::future对象,

    • 异步任务可以通过std::future对象返回一个结果,使用future.get()获取结果
    • 也可以使用future.wait等待线程结束,不取得结果
    • 如果不显式调用 future.get或者 future.wait,程序结束时也会等待线程结束
  • 需要的头文件 future

    示例

int sum(int* data, int n) {
    cout << "thread id=" << this_thread::get_id() << " start work" << endl;
    int result = 0;
    for (size_t i = 0; i < n; i++)
    {
        result += data[i];
    }
    cout << "thread id=" << this_thread::get_id() << " finish work" << endl;
    return result;
}


const int length = 10;

int main()
{
    srand((unsigned)time(0));

    int* data;
    data = new int[length];

    for (size_t i = 0; i < length; i++)
    {
        data[i] = rand() % 100;
    }

    future<int> Future1 = async(sum, data, length);//线程立即开始执行
    future<int> Future2 = async(sum, data, length);


    //get只能调用一次
    int sum1 = Future1.get();

    //wait等待线程执行结束,不拿到返回结果
    Future2.wait();


}

线程控制参数

future<int> Future1 = async(std::launch::deferred, sum, data, length);
future<int> Future2 = async(std::launch::async, sum, data, length);
  • 使用std::async()是可以传入 std::launch枚举参数
    • std::launch::deferred
      • 线程入口函数调用被延迟到 future的wait或者 get方法调用时才执行
      • 两个方法都没有调用时,线程不执行
      • 这种情况下,代码实际上是在调用线程中执行的
    • std::launch::async
      • 强制创建新的线程(async并不是所有情况下都创建新的线程)
    • launch::deferred和launch::async可以同时使用
    • 不传入参数时,默认为 async|deffered,由系统决定是否创建新的线程
std::async(std::launch::async|std::launch::deferred, sum, data, length);

std::future

  • std::future的get函数会进行结果的转移,所以只能使用一次,如果需要多次获取future的结果,可以使用shared_future

  • future_status

future.wait_for()可以获取线程执行状态,返回值是 std::future_status

enum class future_status { // names for timed wait function returns
    ready,
    timeout,
    deferred
};

示例:

future<int> future2 = async(std::launch::async, sum, data, length);

//future_status

//这里等待2000ms获取future的状态
std::future_status status = future2.wait_for(chrono::milliseconds(2000));
if (status==std::future_status::timeout)
{
    cout << "timeout";
} 
else if (status == std::future_status::ready)
{
    future1.get();
    cout << "ready";
}
else if (status == std::future_status::deferred)
{
    cout << "deferred";
}

cout << endl;
  • future.valid()
    • 判断future是否有效,一个future被调用过get方法或者自身被构造为shared_future之后,会发生持有变量的转移,导致valid返回false(0)
future<int> future2 = async(std::launch::async, sum, data, length);
cout << "future2.valid()=" << future2.valid() << endl;
std::shared_future<int> shared_future1(future2.share());
cout <<"future2.valid()=" << future2.valid() << endl;

std::shared_future

  • 共享future,get方法会把线程返回的结果复制并返回,因此可以多次调用get方法
future<int> future2 = async(std::launch::async, sum, data, length);

//构造shared_future
//方式1
std::shared_future<int> shared_future1(std::move(future2));
//方式2
std::shared_future<int> shared_future1(future2.share());

//多次调用不会报错
shared_future1.get();
shared_future1.get();
shared_future1.get();
  • 可以直接使用future来构造shared_future
//1
std::packaged_task<int(int*, int)> pt(sum);
std::shared_future<int> sfuture1(pt.get_future());

//2
std::shared_future<int> sfuture2(std::async(sum,data,length));

async和thread的区别

  • async创建异步任务时,有的时候并不创建新的线程
  • thread一定会创建新的线程,创建失败程序会出错
  • async在没有参数指定必须创建新的线程,无法创建新的线程时,就不会创建新的线程,而是在调用它的线程上运行

std::packaged_task

  • std::packaged_task是一个类模板,模板参数是各种可调用对象
  • 通过std::packaged_task可以把各种可调用对象包装起来,方便将来作为线程入口参数
  • 也可以直接调用,但是这种情况下没有新的线程
  • 头文件 <future>
//创建 packaged_task对象
std::packaged_task<int(int*,int)> m_task(sum);

//===使用packageed_task创建线程==
std::thread t1(std::ref(m_task), data,length);
t1.join(); 
std::future<int> res = m_task.get_future(); //前面的t1已经开始执行,主线程不会等待子线程结束后再结束,所以必须先join再获取结果

//==直接调用packaged_task对象==
m_task(data, length);
future<int> res2 = m_task.get_future();
res2.get();

std::promise

  • 类模板,能够在某个线程中给它赋值,在其他线程中取值
#include <iostream>
#include <thread>
#include <future>

using namespace std;

void myThread(std::promise<int>& tmp, int param1) 
{
    chrono::milliseconds dura(1000);
    this_thread::sleep_for(dura);
    param1++;
    tmp.set_value(param1); //给promise对象赋值
    return;

}

int main()
{
    std::promise<int> Promise;
    thread t1(myThread, std::ref(Promise), 10);
    t1.join();
    future<int> Future = Promise.get_future(); //从promise对象获取future,只能操作一次
    auto result = Future.get();
    cout << result << endl;
}

原子操作

原子操作示例

一些操作,例如加法,在计算机内部执行时会被分解成很多步骤,如果在执行时发生线程切换,会导致一次加法没有做完就切换到其他线程

#include <iostream>
#include <thread>

using namespace std;
int m_count = 0;

void add_self()
{
	for (size_t i = 0; i < 1000000; i++)
	{
		m_count++;
	}

}


int main()
{
    
	thread t1(add_self);
	thread t2(add_self);


	t1.join();
	t2.join();
    //m_count的结果有可能不是2000000
	cout << "m_count=" << m_count << endl;
}

解决方法1,使用互斥量

int m_count = 0;
mutex m1;

void add_self()
{
    for (size_t i = 0; i < 1000000; i++)
    {
        m1.lock();
        m_count++;
        m1.unlock();
    }
}

解决方法1,使用原子操作

概述

  • 原子操作是不使用互斥量加锁就可以实现 程序片段不会被打断的多线程并发技术
  • 比互斥量效率更高一点
  • 原子操作一般都是针对一个变量,而互斥量是作用在代码片段中
// 也可以使用 std::atomic_int来代替 std::atomic<int>
std::atomic<int> m_count = 0;

void add_self()
{
	for (size_t i = 0; i < 1000000; i++)
	{ 
		//这是一个原子操作,不会被线程切换打断
		m_count++;

	}
}

常见原子操作

  • ++,–,+=,-=,&=,等运算是原子操作
  • v=v+1这类不是原子操作

注意事项

不能给原子变量进行拷贝构造,例如以下代码是错误的:

std::atomic<int> m_count = 0;
std::atomic<int> b = m_count;
  • 原子方式读取 atomic.load()

可以使用原子变量的load方法以原子操作的方式读取变量值

std::atomic<int> m_count = 0;
std::atomic<int> b(m_count.load());
  • 原子方式写入 atomic.store()
std::atomic<int> m_count = 0;
m_count.store(50);

recursive_mutex 递归的独占互斥量

  • mutex在加锁前锁必须处于没有加锁的状态下,即不能在同一个线程中多次加锁

  • recursive_mutex允许多次加锁

  • 递归互斥量效率比互斥量低

recursive_mutex lock1;
int main()
{
    lock1.lock();
    lock1.lock();
    std::cout << "Hello World!\n";
    lock1.unlock();
    lock1.unlock();   
}

带超时功能的互斥量

获取超时互斥量的锁时,如果一段时间内没有获取到锁,程序会取消阻塞

mutex和recursive_mutex分别有对应的超时互斥量timed_mutexrecursive_timed_mutex

std::timed_mutex timeLock;
std::recursive_timed_mutex reTimeLock;
  • 重要方法
    • try_lock_for
    • try_lock
    • try_lock_until
timeLock.try_lock_for(4000ms);
    
lock.try_lock_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(1000));

示例

int main()
{
 
    std::timed_mutex timeLock;

    //这里用lambda表达式构造了一个线程
    thread t1([&timeLock]() {
        if (timeLock.try_lock_for(4000ms))
        {
			cout << "thread " << this_thread::get_id() << " 获取到锁" << endl;
			this_thread::sleep_for(2s);
            timeLock.unlock();
            cout << "thread " << this_thread::get_id() << "释放锁" << endl;
        }
        else
        {
            cout << "thread " << this_thread::get_id() << " 没有获取到锁" << endl;
        }

        });

    timeLock.lock();
    cout << "main thread " << this_thread::get_id() << " start work" << endl;
    this_thread::sleep_for(3s);
    cout<< "main thread " << this_thread::get_id() << " finish work" << endl;
    timeLock.unlock();
    t1.join();
}