【linux】线程的互斥与同步

时间:2022-01-18 01:16:28

一、线程安全

先看一段代码
这里用了上一章对原生线程库的封装多线程控制详述

// mythread.hpp
#pragma once

#include <iostream>
#include <pthread.h>
#include <cstring>
#include <string>
#include <cassert>
#include <functional>
#include <unistd.h>

class Thread;

class Context
{
public:
    Thread *_this;
    void *_args;

    Context()
        : _this(nullptr)
        , _args(nullptr)
    {}
};

class Thread
{
public:
    typedef std::function<void*(void*)> func_t;

    Thread(func_t fun, void* args, int number)
        : _func(fun)
        , _args(args)
    {
        char buf[64];
        snprintf(buf, sizeof buf, "thread-%d", number);
        _name = buf;
    }

    // 不加static就会有this指针
    static void* start_routine(void* args)
    {
        //return _func(args);
        // 无this指针,无法调用
        Context* pct = static_cast<Context*>(args);
        pct->_this->_func(pct->_args);
        delete pct;
        return nullptr;
    }

    void start()
    {
        // int n = pthread_create(&_tid, nullptr, _func, _args);
        // _func是C++函数,pthread_create是C接口,不能混编
        Context* pct = new Context();
        pct->_this = this;
        pct->_args = _args;
        int n = pthread_create(&_tid, nullptr, start_routine, pct);
        assert(n == 0);
        (void)n;
    }

    void join()
    {
        int n = pthread_join(_tid, nullptr);
        assert(n == 0);
        (void)n;
    }
private:
    std::string _name;// 线程名
    pthread_t _tid;// 线程id
    func_t _func;// 调用方法
    void *_args;// 参数
};

我们写了一个模拟抢票的程序,假设有1000张票,有三个人去抢:

// mythread.cc
#include "mythread.hpp"

using std::cout;
using std::endl;

int tickets = 1000;

void* get_ticket(void* args)
{
    std::string name = static_cast<const char *>(args);
    while(true)
    {
        if(tickets > 0)
        {
            usleep(11111);
            cout << name << "正在抢票: " << tickets << endl;
            tickets--;
        }
        else
        {
            break;
        }
    }
    return nullptr;
}

int main()
{
    Thread* thread1 = new Thread(get_ticket, (void*)"usr1", 1);
    Thread* thread2 = new Thread(get_ticket, (void*)"usr2", 2);
    Thread* thread3 = new Thread(get_ticket, (void*)"usr3", 3);
    thread1->start();
    thread2->start();
    thread3->start();
    thread1->join();
    thread2->join();
    thread3->join();
    return 0;
}

【linux】线程的互斥与同步

可以看到这里居然抢到了负数,这肯定是有问题的。

这是什么原因导致的呢?

首先要知道什么时候会发生线程切换:当时间片到了或者来了更高优先级的线程或者线程等待的时候。
什么时候检测上面的问题呢:从内核态返回用户态的时候,线程要对调度状态进行检测,如果可以就直接发生线程切换。
出现抢到负数的票就是因为多个线程交叉执行,多个线程交叉执行的本质:让调度器尽可能频繁的发生线程的调度与切换
再来看我们写的代码,当tickets == 1的时候进去直接usleep,线程就会被切走,而它还没来得及tickets--,所以其它线程看到了tickets也是1,也会进入执行tickets--

对一个全局变量进行多线程更改是安全的吗?

我们知道原子性(要么不做,要么做完)的操作是安全的,而++--不是原子性的吗?我们看到的是一句,但是汇编之后却不是一句:1.从内存读取数据到CPU的寄存器中 2.在CPU中进行算数或逻辑运算 3.写回新的结果到内存中。用图来表示:
【linux】线程的互斥与同步
【linux】线程的互斥与同步
【linux】线程的互斥与同步

由此可知我们定义的全局变量在没有保护的时候,往往是不安全的,多个线程交替执行时造成数据安全问题,发生了数据不一致问题。

而解决这种问题的办法就是加锁!

二、线程互斥

2.1 基础概念

  • 临界资源: 多线程执行流共享的资源就叫做临界资源
  • 临界区: 每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥: 任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
  • 原子性: 不会被任何调度机制打断的操作,该操作只有两态,要么不做,要么做完

怎么判断是不是原子性呢?

一个对资源进行的操作如果只用一条汇编语句就能完成,那么就是原子性的,反之就不是。

2.2 互斥量(锁)mutex

要解决上面的线程不安全的情况,我们要做到以下几点:

  • 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区
  • 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区
  • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量一个线程持有锁,其他的线程就无法进来访问

2.2.1 初始化和销毁锁

#include <pthread.h>
// 释放
int pthread_mutex_destroy(pthread_mutex_t *mutex);
// 初始化
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
       const pthread_mutexattr_t *restrict attr);
// 静态分配(全局)
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_t就是锁的类型。我们要先定义出一个锁对象,然后初始化,使用完成后销毁。

当我们定义出来的锁如果是全局的,就不需要调用pthread_mutex_initpthread_mutex_destroy来初始化和销毁了。

销毁的时候要注意:

使用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁
不要销毁一个已经加锁的互斥量
已经销毁的互斥量,要确保后面不会有线程再尝试加锁

2.2.2 加锁与解锁

#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// 成功返回0,失败返回错误号

2.3 锁的使用

我们现在就可以为上面的抢票系统加个锁
为了把锁传递给每个线程,我们可以定义一个结构体:

int tickets = 1000;

class ThreadData
{
public:
    ThreadData(const std::string& name)
        : _name(name)
        , _plock(nullptr)
    {}
    ~ThreadData()
    {}
public:
    std::string _name;
    // 锁不允许被拷贝,所以传递指针
    pthread_mutex_t* _plock;
};

void* get_ticket(void* args)
{
    ThreadData *ptd = static_cast<ThreadData*>(args);
    while(true)
    {
        // 加锁
        pthread_mutex_lock(ptd->_plock);
        if(tickets > 0)
        {
            usleep(11111);
            cout << ptd->_name << "正在抢票: " << tickets << endl;
            tickets--;
            // 解锁
            pthread_mutex_unlock(ptd->_plock);
        }
        else
        {
            // 解锁
            pthread_mutex_unlock(ptd->_plock);
            break;
        }
    }
    return nullptr;
}

int main()
{
#define NUM 3
    // 定义+初始化锁
    pthread_mutex_t lock;
    pthread_mutex_init(&lock, nullptr);
    std::vector<pthread_t> tids(NUM);
    for(int i = 0; i < NUM; i++)
    {
        char buf[64];
        snprintf(buf, sizeof buf, "thread %d", i + 1);
        ThreadData* td = new ThreadData(buf);
        td->_plock = &lock;
        pthread_create(&tids[i], nullptr, get_ticket, td);
    }
    for(auto& e : tids)
    {
        pthread_join(e, nullptr);
    }
    // 销毁锁
    pthread_mutex_destroy(&lock);
    return 0;
}

【linux】线程的互斥与同步
现象:
运行几次发现每次都能刚好减到1,而且速度比之前慢了不少。
原因是加锁了之后多个线程串行执行。
这里还可以看到每次都是一个线程在抢票,因为锁只规定互斥访问,并没有规定谁来持有锁。所以谁的竞争能力强谁来持有锁。

【linux】线程的互斥与同步
这里是因为一个线程刚解锁完立马就又申请锁,而我们知道正常情况我们抢完票以后还会做一些事情,比如说发送订单,在这个过程的时候锁已经被释放,其他的线程就可以申请锁了。
【linux】线程的互斥与同步
【linux】线程的互斥与同步

这样就是正常的抢票过程

2.4 锁的理解

进过我们前面的学习,每个线程都需要访问锁,所以锁是一个共享资源。锁是用来保护共享资源,那么锁的安全谁来保护呢?
所以pthread_mutex_lock的过程必须是安全的(加锁的过程是原子性的)。
如果锁申请成功了没什么好说的,继续向后执行,如果暂时没有申请成功而其他线程也来申请锁,那么执行流就会阻塞,直到锁被释放。
【linux】线程的互斥与同步
【linux】线程的互斥与同步

上面我们讲加锁的时候还有一个库函数:

int pthread_mutex_trylock(pthread_mutex_t *mutex);

这里就是尝试加锁,如果成功了就获得锁,没有成功就通过返回值返回错误。

如果有五个线程,我们对四个线程都加锁,一个线程不加锁,那么不是也不是线程安全的吗?

加锁是个程序员行为,如果有这种情况,只能说写出来BUG。

2.4.1 锁的原子性

当一个线程持有锁时,也可能被切换走,但它是带着锁离开的,其他的锁进来无法申请锁成功,就无法向后执行。除非持有锁的进程释放锁。

所以对于其他线程,有意义的锁只有两种情况:
1️⃣ 申请锁前
2️⃣ 释放锁后

所以站在其他线程的角度上的,看待持有锁的线程就是原子的

先来看一下加锁的汇编:
【linux】线程的互斥与同步
这里的xchgb指令可以把cpu中的数据和内存中的数据直接交换
【linux】线程的互斥与同步
只要线程1不把1写回去后进行交换,所有的其他进程都访问不到临界资源。
我们可以把mutex看成一张入场券,如果一个线程拿到了其他线程就进不去。

而解锁的过程就是把寄存器中的1拷贝到内存中,直接返回即可。

2.5 锁的封装

我们想要实现一个传入一个锁自动帮我们加锁和解锁

// mymutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>

class Mutex 
{
public:
    Mutex(pthread_mutex_t* plock = nullptr)
        : _plock(plock)
    {}

    void lock()
    {
        // 被设置过
        if(_plock)
        {
            pthread_mutex_lock(_plock);
        }
    }

    void unlock()
    {
        if(_plock)
        {
            pthread_mutex_unlock(_plock);
        }
    }
private:
    pthread_mutex_t *_plock;
};

// 自动加锁解锁
class LockAuto
{
public:
    LockAuto(pthread_mutex_t *plock)
        : _mutex(plock)
    {
        _mutex.lock();
    }

    ~LockAuto()
    {
        _mutex.unlock();
    }
private:
    Mutex _mutex;
};

【linux】线程的互斥与同步
【linux】线程的互斥与同步

2.6 可重入函数与线程安全

线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。

重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

两个概念本质没有什么联系。但是有一些线程安全的问题就是因为不可重入导致的。
线程安全不一定是可重入的,而可重入函数则一定是线程安全的

2.7 死锁

2.7.1 死锁的基本概念

举个例子,两个线程都各自有一个锁,而现在它们都想要对方的锁,但我们直到锁必须要自己释放才能被其他线程获取,所以这样就导致了两个线程互相等待对方的资源,代码无法推进。这样的情况就叫做死锁

一把锁能不能造成死锁?

可以!我们前面写过,加了一把锁然后再次加锁就导致了死锁。

2.7.2 死锁的必要条件

1️⃣ 互斥条件:一个资源每次只能被一个执行流使用
2️⃣ 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
3️⃣ 不剥夺条件: 一个执行流已获得的资源,在末使用完之前,不能强行剥夺
4️⃣ 循环等待条件: 若干执行流之间形成一种头尾相接的循环等待资源的关系

2.7.3 破坏死锁

只要破坏了上面死锁的四个必要条件其中一条,就可以避免死锁了。
第一个条件无法破坏。
第二个条件我们可以避免不释放的场景。
第三个条件我们可以让线程主动释放锁。
第四个条件我们可以让加锁顺序一致,或者可以一次性把锁全部分配好。

三、线程同步

在上面的抢票系统我们可以看到一个线程连续抢票,造成了其他线程的饥饿,而为了解决这个问题,我们在数据安全的情况下让这些线程按照一定的顺序进行访问,这叫做线程同步
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。线程同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。为了完成线程同步就需要条件变量。
当我们申请临界资源前,先要做临界资源是否存在的检测,检测的本质也是访问临界资源。那么对临界资源的检测也一定要在加锁和解锁之间的。常规方法检测临界资源是否就绪,就注定了我们必须频繁地申请锁和释放锁。

3.1 条件变量

当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量

为了解决频繁申请和释放锁的问题,需要做到以下两点:

  • 不要让线程在频繁地检测资源是否就绪,而让线程在资源未就绪时进行阻塞等待
  • 当资源就绪的时候,通知等待该资源的线程,让这些线程来进行资源的申请和访问。

3.2 条件变量接口

条件变量初始化和销毁

#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
       const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

跟之前的互斥锁一样,要使用条件变量得先定义一个条件变量。他的接口都可以类比互斥锁。

线程在资源未就绪时进行阻塞等待
进行等待

#include <pthread.h>

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
       pthread_mutex_t *restrict mutex,
       const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
       pthread_mutex_t *restrict mutex);

这里pthread_cond_wait第二个参数会在下一章节详细讲述基于阻塞队列的生产者消费者模型(条件变量)

当资源就绪的时候,通知等待该资源的线程
唤醒等待

#include <pthread.h>
// 唤醒一批线程
int pthread_cond_broadcast(pthread_cond_t *cond);
// 唤醒一个线程
int pthread_cond_signal(pthread_cond_t *cond);

3.3 理解条件变量

举个例子,现在我们去医院看病,如果没有什么规则,大家全部都挤在门口,等人出来了大家一窝蜂的往门口挤,这样竞争力弱的有可能一早上都没看到医生。
所以医院设置了一个等待区,所有人在这里按号进行排队,等里面的人出来了排在第一的就进去。
我们把这个等待区就叫做条件变量,所以唤醒一个线程是通过条件变量实现的。当条件不满足时,线程必须去某些定义好的条件变量进行等待。

我们定义好的条件变量(结构体)有一个队列,不满足条件的线程链接在这个队列上进行等待。

【linux】线程的互斥与同步

3.4 条件变量的使用

我们想要使用条件变量来控制线程的执行。
首先要知道条件变量必须配合互斥锁使用,因为条件变量本身不具备互斥功能。

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <string>
#include <vector>
#include "mymutex.hpp"

using std::cout;
using std::endl;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

int ticket = 1000;

void* get_ticket(void* args)
{
    std::string name = static_cast<const char*>(args);
    while(true)
    {
        LockAuto lock(&mutex);
        // 先等待
        pthread_cond_wait(&cond, &mutex);
        cout << name << "正在抢票: " << ticket << endl;
        ticket--;
    }
}

int main()
{
    std::vector<pthread_t> tids(5);
    for(int i = 0; i < 5; i++)
    {
        char* buf = new char[64];
        snprintf(buf, 64, "thread %d ", i + 1);
        pthread_create(&tids[i], nullptr, get_ticket, buf);
    }
    // 每个1s唤醒一个在条件变量下等待的线程
    while(true)
    {
        sleep(1);
        pthread_cond_signal(&cond);
        //pthread_cond_broadcast(&cond);
    }
    for(const auto& tid : tids)
    {
        pthread_join(tid, nullptr);
    }
    return 0;
}

这里我们创建了五个新线程,使用主线程用条件变量来控制这五个线程,每隔1s就唤醒一个线程
【linux】线程的互斥与同步
这里我们可以看到每隔1s打印一行,并且都是按照一定的顺序唤醒线程
当然我们也可以使用pthread_cond_broadcast一次唤醒一批线程。
【linux】线程的互斥与同步
【linux】线程的互斥与同步
这里的现象就是每隔1s打印五行,并且也是按照一定的顺序