c++并发与多线程
子线程结束,主线程不能结束,否则会出错,和java不一样。
可以用join
的方式让主线程等待子线程执行结束。
quickStart
线程相关头文件 #include <thread>
使用全局函数构造一个线程对象
#include <iostream>
#include <thread>
#include <fstream>
void myPrint() {
ofstream ofs;
ofs.open("data.txt", ios_base::out);
for (size_t i = 0; !flag; i++)
{
ofs << "new thread " <<i <<endl;
}
ofs.close();
}
int main()
{
thread thread1(myPrint);
thread1.join();//主线程等待thread1执行结束
for (size_t i = 0; i < 100; i++)
{
cout << "main thread " << i << endl;
}
}
这里用join让主线程等待子线程结束,再执行后面的代码。
创建thread对象后,线程即可启动。
线程的执行方式join和detach
join
thread thread1(myPrint);
thread1.join();//主线程等待thread1执行结束
调用子线程join
方法后,父线程会阻塞,等待子线程执行结束后再执行。
detach
thread1.detach();
调用子线程detach
方法后,子线程会和父线程分离,子线程父线程同时进行,主线程可以不用等待其他线程结束就可以正常结束。
ℹ️需要注意的问题:
- 当子线程中有主线程对象的引用时,如果主线程先于子线程结束,这些引用的对象就会被释放,导致子线程中的引用出错
- 主线程中用来构造子线程的对象(对象A),会被复制到子线程中,因此,主线程结束释放对象A后不会影响子线程的执行
线程的构造方式
全局函数
#include <iostream>
#include <thread>
#include <fstream>
void myPrint() {
ofstream ofs;
ofs.open("data.txt", ios_base::out);
for (size_t i = 0; !flag; i++)
{
ofs << "new thread " <<i <<endl;
}
ofs.close();
}
int main()
{
thread thread1(myPrint);
thread1.join();//主线程等待thread1执行结束
for (size_t i = 0; i < 100; i++)
{
cout << "main thread " << i << endl;
}
}
对象
class ThreadJob
{
int count;
public:
ThreadJob(int n):count(n) {
cout << "MyThread()" << endl;
}
ThreadJob(const ThreadJob& mt):count(mt.count) {
cout << "MyThread(const MyThread&)" << endl;
}
//重载括号运算符
void operator()()
{
cout << "MyThread work" << endl;
}
};
int main()
{
//构造方式2:使用可执行对象构造线程
//ThreadJob 对象会被复制到线程中去,因此主线程结束后myThread对象被释放不会影响线程thread2的执行
ThreadJob job(5);
thread thread2(job);
if (thread2.joinable())
{
thread2.join();
}
for (size_t i = 0; i < 100; i++)
{
cout << "main thread " << i << endl;
}
return 0;
}
lambda表达式
int main()
{
/*for (size_t i = 0; i < 100; i++)
{
cout << "main thread " << i << endl;
}*/
//构造方式3:lambda
auto myLamdaThreadJob=[]{
cout << "labmda" << endl;
};
thread thread3(myLamdaThreadJob);
thread3.join();
flag = true;
return 0;
}
c++中lambda表达式的内容可以参考
https://docs.microsoft.com/zh-cn/cpp/cpp/lambda-expressions-in-cpp?view=msvc-160
std::this_thread
指代当前线程对象
线程ID
使用下面方法可以获取线程id
std::this_thread::get_id()
线程休眠
std::chrono::milliseconds dura(200);
std::this_thread::sleep_for(dura);
线程传递参数
传递引用
示例
使用函数指针构造线程,函数传递两个参数
class ThreadJob
{
int count;
public:
ThreadJob(int n):count(n) {
cout << "ThreadJob() threadId="<<this_thread::get_id() << endl;
}
ThreadJob(const ThreadJob& mt):count(mt.count) {
cout << "ThreadJob(const ThreadJob&) threadId=" << this_thread::get_id() << endl;
}
~ThreadJob()
{
cout << "~ThreadJob() threadId=" << this_thread::get_id() << this<< endl;
}
};
void myPrint2( int i, const ThreadJob& job) {
cout << &job << endl;
}
int main()
{
cout << "main threadId=" << this_thread::get_id() << endl;
ThreadJob job(10);
thread thread1(myPrint2,10,ref(job));
thread1.join();
return 0;
}
注意:
void myPrint2( int i, const ThreadJob& job) {
cout << &job << endl;
}
这个函数中的引用参数必须得用const
修饰。
- 构造线程时,如果参数会发生隐式转换,则隐式转化发生在子线程中
thread thread1(myPrint2,10,20); //这里把20使用ThreadJob(int)隐式转换成ThreadJob对象
- 这会导致在主线程释放了局部变量后,子线程中的隐式转换还没有完成,从而出错。
- 可以使用匿名对象的方式显示构造对象
thread thread1(myPrint2,10,ThreadJob(20));//匿名对象的构建发生在主线程中
- 匿名对象构建完成之后,不管线程回调函数参数中是否是引用,都会将主线程中传入的参数对象复制一份给子线程。如果回调函数参数不是引用,则对象复制会发生两次
- 可以使用
std::ref(obj)
或者有些情况下也可以用&obj
方式传递主线程对象的真正引用,
thread thread1(myPrint2,10,std::ref(job));
传递智能指针
示例:传递独占式指针
void job1(unique_ptr<int> i) {
}
int main()
{
unique_ptr<int> i1(new int(100));
thread thread4(job1,std::move(i1));
thread4.join();
}
传递成员函数
示例
class ThreadJob
{
int count;
public:
ThreadJob(int n):count(n) {
//....
}
void work(int n) {
//.....
}
};
int main()
{
ThreadJob job(10);
thread thread2(&ThreadJob::work,job,10); //job对象会复制一份到子线程中
thread2.join();
}
加锁
互斥量(mutex)、lock、unlock
mutex.lock和mutex.unlock必须成对使用
mutex
需要包含头文件
#include <mutex>
示例代码:
#include <iostream>
#include <mutex>
#include<list>
using namespace std;
class MsgProcessor
{
public:
void MsgProcessor::procMsg()
{
for (size_t i = 0; i < 10000; i++)
{
m_mutex.lock();
if (!msgQueue.empty())
{
int msg = msgQueue.front();
cout << "process msg:" << i << endl;
msgQueue.pop_front();
}
else
{
cout << "start msgProc() but list is empty" << endl;
}
m_mutex.unlock();
}
}
void MsgProcessor::receiveMsg()
{
for (size_t i = 0; i < 1000; i++)
{
cout << "receive msg:" << i << endl;
m_mutex.lock(); //加锁
msgQueue.push_back(i);
m_mutex.unlock();//解锁
}
}
private:
std::list<int> msgQueue;
//互斥量
std::mutex m_mutex;
};
int main()
{
MsgProcessor msgProcessor;
thread threadRecMsg(&MsgProcessor::receiveMsg, &msgProcessor);
thread threadProcMsg1(&MsgProcessor::procMsg, &msgProcessor);
thread threadProcMsg2(&MsgProcessor::procMsg, &msgProcessor);
threadRecMsg.join();
threadProcMsg1.join();
threadProcMsg2.join();
return 0;
}
std::lock_guard类模板
使用lock_guard
模板类对象可以自动加锁和释放锁,
加锁范围为从对象声明构造开始到作用范围结束后对象析构
void MsgProcessor::procMsg()
{
for (size_t i = 0; i < 10000; i++)
{
//lock_guard 对象在构造时会调用mutex对象的lock方法,析构时会调用mutex的unlock方法
std::lock_guard<std::mutex> locker(m_mutex);
//.....
}
}
std::lock函数模板
- std::lock函数模板可以一次给两个或者两个以上的互斥量
- 不存在因为锁的顺序导致死锁的问题
- 原因:等待所有互斥量全都锁住才能完成上锁
- 加锁之后需要手动释放锁
示例:
std::lock(m_mutex, m_mutex2);
if (!msgQueue.empty())
{
int msg = msgQueue.front();
cout << "process msg:" << i << endl;
msgQueue.pop_front();
}
m_mutex.unlock();
m_mutex2.unlock();
使用std::lock之后需要手动释放锁,可以使用lock_guard的特性实现自动释放锁。
std::lock(m_mutex, m_mutex2);
std::lock_guard<mutex> guard1(m_mutex, std::adopt_lock);
//使用lock_guard(mutex)会默认调用mutex的lock方法加锁,而std::lock方法中已经给互斥量加过锁,
//因此,这里必须在构造时传入std::adopt_lock这个参数
std::lock_guard<mutex> guard2(m_mutex2, std::adopt_lock);
if (!msgQueue.empty())
{
int msg = msgQueue.front();
cout << "process msg:" << i << endl;
msgQueue.pop_front();
}
unique_lock类模板
-
lock_guard
简化了mutex
的lock
和unlock
管理 -
unique_lock
比lock_guard
更灵活,但是效率较低 -
unique_lock
可以完全取代lock_guard
std::unique_lock<std::mutex> lock1(m_mutex);
if (!msgQueue.empty())
{
int msg = msgQueue.front();
cout << "process msg:" << i << endl;
msgQueue.pop_front();
}
构造函数
- std::adopte_lock,表示互斥量已经被加锁
//std::adopte_lock
m_mutex.lock();
std::unique_lock<std::mutex> lock1(m_mutex,std::adopt_lock);
- std::try_to_lock
- 尝试去锁定mutex,如果没有锁定成功,则会立即返回,并不会产生阻塞
- 使用前互斥量不能加锁
std::unique_lock<mutex> lock1(m_mutex,std::try_to_lock);
if (lock1.owns_lock())
{
cout << "receive msg:" << endl;
}
else
{
cout << "接受消息线程没有拿到消息队列锁,跳过" << endl;
}
- std::defer_lock
- 使用时互斥量不能加锁,否则会报异常
- 这个参数表示初始化一个没有加锁的互斥量
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
lock1.lock();
重要成员函数
lock
- 使用
lock()
加锁之后可以自动释放锁
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
lock1.lock();
unlock()
- 使用
unlock()
释放锁
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
lock1.lock();
//...
lock1.unlock();
//...
lock1.lock();
//...
lock1.unlock();
try_lock()
- 返回值
- true: 拿到锁
- false: 没有拿到锁
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
if (lock1.try_lock())
{
//拿到锁
}
else
{
//没有拿到锁
}
release()
返回unique_lock
对象所管理mutex
对象的指针,并释放所有权
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
lock1.lock();
std::mutex *mutex_ptr = lock1.release();
mutex_ptr->unlock();
unique_lock所有权的传递
- 通常
unique_lock
需要管理一个mutex
对象 - 所有权可以传递,但是不能复制
std::unique_lock<std::mutex> lock1(m_mutex, std::defer_lock);
lock1.lock();
//所有权的传递
std::unique_lock<std::mutex> lock2(std::move(lock1));
std::call_once()
-
std::call_once
是c++11引入的函数,能够保证传入的函数只被调用一次,比使用mutex
消耗的资源更少 -
std::call_once
需要和std::once_flag
结合使用。
单例模式示例
实现一个日志管理器LogManager
LogManager.h
#pragma once
#include <mutex>
using namespace std;
class LogManager
{
private:
static LogManager* manager;
static mutex *lock1;
LogManager();
//用于释放单例对象
class GC
{
public:
~GC();
};
public:
static LogManager* getInstance();
void test();
};
LogManager.cpp
#include "LogManager.h"
#include <iostream>
LogManager::LogManager()
{
}
LogManager* LogManager::getInstance()
{
//这里使用两段锁的方式来进行单例对象的初始化
if (manager==NULL)
{
LogManager::lock1->lock();
if (manager==NULL)
{
manager = new LogManager();
static GC gc;
}
LogManager::lock1->unlock();
}
return manager;
}
void LogManager::test()
{
std::cout << "test" << std::endl;
}
LogManager::GC::~GC() {
if (manager)
{
delete manager;
manager = NULL;
}
}
//类的静态成员变量要在类声明的外部初始化
LogManager* LogManager::manager = NULL;
mutex* LogManager::lock1 = new mutex();
main.cpp
#include <iostream>
#include "LogManager.h"
int main()
{
std::cout << "Hello World!\n";
LogManager* manager1 = LogManager::getInstance();
LogManager* manager2 = LogManager::getInstance();
manager1->test();
}
call_once在单例模式中的应用
LogManager.h
#pragma once
#include <mutex>
using namespace std;
class LogManager
{
private:
static LogManager* manager;
//static mutex *lock1;
//std::call_once需要的标记
static std::once_flag* flag;
LogManager();
static void createInstance();
class GC
{
public:
~GC();
};
public:
static LogManager* getInstance();
void test();
};
LogManager.cpp
#include "LogManager.h"
#include <iostream>
LogManager::LogManager()
{
}
void LogManager::createInstance()
{
manager = new LogManager();
static GC gc;
}
LogManager* LogManager::getInstance()
{
std::call_once(*flag, createInstance);
return manager;
}
void LogManager::test()
{
std::cout << "test" << std::endl;
}
LogManager::GC::~GC() {
if (manager)
{
delete manager;
manager = NULL;
}
}
LogManager* LogManager::manager = NULL;
mutex* LogManager::lock1 = new mutex();
std::once_flag* LogManager::flag = new std::once_flag();
std::condition_variable、wait、notify
- condition_variable.wait()方法可以让一个线程等待
- condition_variable.notify_one()和notify_all()可以唤醒等待的线程
示例
//定义条件变量
std::condition_variable cv;
//使用wait
std::unique_lock<std::mutex> lock(m_mutex);
cv.wait(lock, [this] {
if (msgQueue.empty())
{
cout << "消息队列为空,处理线程等待" << endl;
return false;
}
return true;
});
//使用notify
std::unique_lock<mutex> lock1(m_mutex,std::try_to_lock);
if (lock1.owns_lock())
{
cout << "receive msg:" << i << endl;
msgQueue.push_back(i);
//唤醒正在wait的线程
cv.notify_all();
}
- std::conditioan_variable.wait()第二个参数lambda表达式返回bool
- true,wait 直接返回,程序继续执行
- false, wait 将解锁互斥量,并且让线程等待被唤醒
- condition.wait()没有第二个参数,和第二个参数直接返回false效果一样,wait将解锁互斥量,并让线程等待
- wait中的线程可以通过 notify_one()唤醒,唤醒后会首先尝试继续获取互斥量的的锁
- 获取到锁之后,如果wait时有第二个参数,则会继续判断第二个参数的返回值,
- false重新释放锁并等待,true继续执行后续语句
notify_one和notify_all
- notify_one 可以从等待中的线程中唤醒一个
- notify_all可以将所有等待中的线程唤醒
std::async,std::future
-
std::async
是个函数模板,用来启动一个异步任务,启动起来的异步任务会返回一个std::future
对象,- 异步任务可以通过
std::future
对象返回一个结果,使用future.get()获取结果 - 也可以使用future.wait等待线程结束,不取得结果
- 如果不显式调用 future.get或者 future.wait,程序结束时也会等待线程结束
- 异步任务可以通过
-
需要的头文件
future
示例
int sum(int* data, int n) {
cout << "thread id=" << this_thread::get_id() << " start work" << endl;
int result = 0;
for (size_t i = 0; i < n; i++)
{
result += data[i];
}
cout << "thread id=" << this_thread::get_id() << " finish work" << endl;
return result;
}
const int length = 10;
int main()
{
srand((unsigned)time(0));
int* data;
data = new int[length];
for (size_t i = 0; i < length; i++)
{
data[i] = rand() % 100;
}
future<int> Future1 = async(sum, data, length);//线程立即开始执行
future<int> Future2 = async(sum, data, length);
//get只能调用一次
int sum1 = Future1.get();
//wait等待线程执行结束,不拿到返回结果
Future2.wait();
}
线程控制参数
future<int> Future1 = async(std::launch::deferred, sum, data, length);
future<int> Future2 = async(std::launch::async, sum, data, length);
- 使用std::async()是可以传入 std::launch枚举参数
- std::launch::deferred
- 线程入口函数调用被延迟到 future的wait或者 get方法调用时才执行
- 两个方法都没有调用时,线程不执行
- 这种情况下,代码实际上是在调用线程中执行的
- std::launch::async
- 强制创建新的线程(async并不是所有情况下都创建新的线程)
- launch::deferred和launch::async可以同时使用
- 不传入参数时,默认为
async|deffered
,由系统决定是否创建新的线程
- std::launch::deferred
std::async(std::launch::async|std::launch::deferred, sum, data, length);
std::future
-
std::future的get函数会进行结果的转移,所以只能使用一次,如果需要多次获取future的结果,可以使用shared_future
-
future_status
future.wait_for()可以获取线程执行状态,返回值是 std::future_status
enum class future_status { // names for timed wait function returns
ready,
timeout,
deferred
};
示例:
future<int> future2 = async(std::launch::async, sum, data, length);
//future_status
//这里等待2000ms获取future的状态
std::future_status status = future2.wait_for(chrono::milliseconds(2000));
if (status==std::future_status::timeout)
{
cout << "timeout";
}
else if (status == std::future_status::ready)
{
future1.get();
cout << "ready";
}
else if (status == std::future_status::deferred)
{
cout << "deferred";
}
cout << endl;
- future.valid()
- 判断future是否有效,一个future被调用过get方法或者自身被构造为shared_future之后,会发生持有变量的转移,导致valid返回false(0)
future<int> future2 = async(std::launch::async, sum, data, length);
cout << "future2.valid()=" << future2.valid() << endl;
std::shared_future<int> shared_future1(future2.share());
cout <<"future2.valid()=" << future2.valid() << endl;
std::shared_future
- 共享future,get方法会把线程返回的结果复制并返回,因此可以多次调用get方法
future<int> future2 = async(std::launch::async, sum, data, length);
//构造shared_future
//方式1
std::shared_future<int> shared_future1(std::move(future2));
//方式2
std::shared_future<int> shared_future1(future2.share());
//多次调用不会报错
shared_future1.get();
shared_future1.get();
shared_future1.get();
- 可以直接使用future来构造shared_future
//1
std::packaged_task<int(int*, int)> pt(sum);
std::shared_future<int> sfuture1(pt.get_future());
//2
std::shared_future<int> sfuture2(std::async(sum,data,length));
async和thread的区别
- async创建异步任务时,有的时候并不创建新的线程
- thread一定会创建新的线程,创建失败程序会出错
- async在没有参数指定必须创建新的线程,无法创建新的线程时,就不会创建新的线程,而是在调用它的线程上运行
std::packaged_task
- std::packaged_task是一个类模板,模板参数是各种可调用对象
- 通过std::packaged_task可以把各种可调用对象包装起来,方便将来作为线程入口参数
- 也可以直接调用,但是这种情况下没有新的线程
- 头文件
<future>
//创建 packaged_task对象
std::packaged_task<int(int*,int)> m_task(sum);
//===使用packageed_task创建线程==
std::thread t1(std::ref(m_task), data,length);
t1.join();
std::future<int> res = m_task.get_future(); //前面的t1已经开始执行,主线程不会等待子线程结束后再结束,所以必须先join再获取结果
//==直接调用packaged_task对象==
m_task(data, length);
future<int> res2 = m_task.get_future();
res2.get();
std::promise
- 类模板,能够在某个线程中给它赋值,在其他线程中取值
#include <iostream>
#include <thread>
#include <future>
using namespace std;
void myThread(std::promise<int>& tmp, int param1)
{
chrono::milliseconds dura(1000);
this_thread::sleep_for(dura);
param1++;
tmp.set_value(param1); //给promise对象赋值
return;
}
int main()
{
std::promise<int> Promise;
thread t1(myThread, std::ref(Promise), 10);
t1.join();
future<int> Future = Promise.get_future(); //从promise对象获取future,只能操作一次
auto result = Future.get();
cout << result << endl;
}
原子操作
原子操作示例
一些操作,例如加法,在计算机内部执行时会被分解成很多步骤,如果在执行时发生线程切换,会导致一次加法没有做完就切换到其他线程
#include <iostream>
#include <thread>
using namespace std;
int m_count = 0;
void add_self()
{
for (size_t i = 0; i < 1000000; i++)
{
m_count++;
}
}
int main()
{
thread t1(add_self);
thread t2(add_self);
t1.join();
t2.join();
//m_count的结果有可能不是2000000
cout << "m_count=" << m_count << endl;
}
解决方法1,使用互斥量
int m_count = 0;
mutex m1;
void add_self()
{
for (size_t i = 0; i < 1000000; i++)
{
m1.lock();
m_count++;
m1.unlock();
}
}
解决方法1,使用原子操作
概述
- 原子操作是不使用互斥量加锁就可以实现 程序片段不会被打断的多线程并发技术
- 比互斥量效率更高一点
- 原子操作一般都是针对一个变量,而互斥量是作用在代码片段中
// 也可以使用 std::atomic_int来代替 std::atomic<int>
std::atomic<int> m_count = 0;
void add_self()
{
for (size_t i = 0; i < 1000000; i++)
{
//这是一个原子操作,不会被线程切换打断
m_count++;
}
}
常见原子操作
- ++,–,+=,-=,&=,等运算是原子操作
- v=v+1这类不是原子操作
注意事项
不能给原子变量进行拷贝构造,例如以下代码是错误的:
std::atomic<int> m_count = 0;
std::atomic<int> b = m_count;
- 原子方式读取 atomic.load()
可以使用原子变量的load方法以原子操作的方式读取变量值
std::atomic<int> m_count = 0;
std::atomic<int> b(m_count.load());
- 原子方式写入 atomic.store()
std::atomic<int> m_count = 0;
m_count.store(50);
recursive_mutex 递归的独占互斥量
-
mutex在加锁前锁必须处于没有加锁的状态下,即不能在同一个线程中多次加锁
-
recursive_mutex允许多次加锁
-
递归互斥量效率比互斥量低
recursive_mutex lock1;
int main()
{
lock1.lock();
lock1.lock();
std::cout << "Hello World!\n";
lock1.unlock();
lock1.unlock();
}
带超时功能的互斥量
获取超时互斥量的锁时,如果一段时间内没有获取到锁,程序会取消阻塞
mutex和recursive_mutex分别有对应的超时互斥量timed_mutex
和recursive_timed_mutex
std::timed_mutex timeLock;
std::recursive_timed_mutex reTimeLock;
- 重要方法
- try_lock_for
- try_lock
- try_lock_until
timeLock.try_lock_for(4000ms);
lock.try_lock_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(1000));
示例
int main()
{
std::timed_mutex timeLock;
//这里用lambda表达式构造了一个线程
thread t1([&timeLock]() {
if (timeLock.try_lock_for(4000ms))
{
cout << "thread " << this_thread::get_id() << " 获取到锁" << endl;
this_thread::sleep_for(2s);
timeLock.unlock();
cout << "thread " << this_thread::get_id() << "释放锁" << endl;
}
else
{
cout << "thread " << this_thread::get_id() << " 没有获取到锁" << endl;
}
});
timeLock.lock();
cout << "main thread " << this_thread::get_id() << " start work" << endl;
this_thread::sleep_for(3s);
cout<< "main thread " << this_thread::get_id() << " finish work" << endl;
timeLock.unlock();
t1.join();
}