【Linux】多线程互斥与同步


一、线程互斥

1. 线程互斥的引出

互斥 指的是一种机制,用于确保在同一时刻只有一个进程或线程能够访问共享资源或执行临界区代码。 互斥的目的是 防止多个并发执行的进程或线程访问共享资源时产生竞争条件,从而保证数据的一致性和正确性,下面我们来使用多线程来模拟实现一个抢票的场景,看看所产生的现象。

#include <iostream>
#include <cstring>
#include <cassert>
#include <pthread.h>
#include <unistd.h>
#include "lockGuard.hpp"
#include "Thread.hpp"
using namespace std;
int tickets = 1000; // 加锁保证共享资源的安全性

void* threadRoutine(void* args)
{
    string name = static_cast<const char*>(args);
    while(true)
    {
        if(tickets > 0)
        {
            usleep(2000); // 模拟抢票花费的时间
            cout << name << " get a ticket: " << tickets-- << endl;
        }
        else
        {
            break;
        }
        usleep(1000);
    }
    return nullptr;
}

int main()
{
    // 创建四个线程
    pthread_t tids[4];
    int n = sizeof(tids) / sizeof(tids[0]);
    for(int i = 0; i < n; i++)
    {
        char* data = new char[64];
        snprintf(data, 64, "thread-%d", i + 1);
        pthread_create(tids + i, nullptr, threadRoutine, data);
    }

    for(int i = 0; i < 4; i++)
    {
        pthread_join(tids[i], nullptr);
    }
    return 0;
}

在这里插入图片描述

这里我们可以看到,当全局变量tickets被几个执行流共享时,最后变成了-1,这是因为如果我们如果使用多线程对一个全局变量修改时,线程之间会相互影响,导致线程安全问题。

下面我们来看一下当多个线程对共享变量进行修改时,为什么会发生上述的线程安全问题?

假设有一个全局变量 g_val=100被两个线程,线程A 和 线程B共享,在多线程环境下分别对同一个全局变量g_val进行操作。

当对变量进行操作时会分为三个步骤:

  1. CPU把内存中的数据读到寄存器里
  2. 在寄存器中对数据进行计算
  3. 将修改后的数据从寄存器里写回内存

在这里插入图片描述

下面我们来看一下线程A和线程B对全局变量进行操作时的过程:

  1. 线程A执行g_val- -操作
    在这里插入图片描述
    当线程A执行完第二步时,正准备执行第三步时,时间片到了,线程A需要将自己的上下文和数据带走。
    此时的线程A认为自己已经将数据修改99了,当下一次执行时继续执行步骤三。

  2. 线程B在while中执行g_val- -操作
    在这里插入图片描述

线程B通过while循环了90次将g_val修改成了10,此时时间片到了。因此线程B也将自己的上下文保存了起来。

  1. 继续执行线程A
    在这里插入图片描述

由于上次执行线程A时第3步没有执行,所以线程A继续执行第3步。但是内存中的g_val为上次线程B修改后的值10,所以线程A又将内存中的值改成了99。

因此,一切的原因都是修改全局变量时线程调度切换、并发访问进而导致了数据不一致;想要解决这个问题,我们就需要进行加锁保护。


2. 互斥量

要解决以上问题,需要做到三点:

  • 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  • 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫 互斥量

在这里插入图片描述

? 初始化互斥量

  1. 静态分配
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
  1. 动态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
  • 参数:
    mutex:要初始化的互斥量
    attr:NULL

? 互斥量加锁和解锁

// 加锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
// 解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
  • 返回值:成功返回0,失败返回错误号

? 销毁互斥量

int pthread_mutex_destroy(pthread_mutex_t *mutex)

注意:

  • 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
  • 不要销毁一个已经加锁的互斥量
  • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁

调用 pthread_ lock 时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
  • 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁

? 下面我们来使用互斥锁来改进一下改进上面的售票系统:

在这里插入图片描述

int tickets = 1000; // 加锁保证共享资源的安全性
pthread_mutex_t mutex; // 定义一把锁

void* threadRoutine(void* args)
{
    string name = static_cast<const char*>(args);
    while(true)
    {
        pthread_mutex_lock(&mutex);
        if(tickets > 0)
        {
            usleep(2000); // 模拟抢票花费的时间
            cout << name << " get a ticket: " << tickets-- << endl;
            pthread_mutex_unlock(&mutex);
        }
        else
        {
            pthread_mutex_unlock(&mutex);
            break;
        }
        usleep(1000);
    }
    return nullptr;
}

int main()
{
    pthread_mutex_init(&mutex, nullptr); // 初始化锁
    // 创建四个线程
    pthread_t tids[4];
    int n = sizeof(tids) / sizeof(tids[0]);
    for(int i = 0; i < n; i++)
    {
        char* data = new char[64];
        snprintf(data, 64, "thread-%d", i + 1);
        pthread_create(tids + i, nullptr, threadRoutine, data);
    }
    for(int i = 0; i < 4; i++)
    {
        pthread_join(tids[i], nullptr);
    }
    pthread_mutex_destroy(&mutex);
    return 0;
}

在这里插入图片描述

因为加锁会导致临界区代码串行访问(互斥),从而导致代码的执行效率减低,因此我们在加锁之后会发现代码的运行速度比不加锁之前慢了许多。因此,进行加锁访问时,保证加锁的粒度越小越好,不要将不访问临界区资源的代码加锁。


3. 互斥锁的实现原理

互斥锁的进一步认识:

  • 加了锁之后,线程在临界区中也会被切换,但这样也不会有问题。因为线程是带着锁进行线程切换的,其余线程是无法申请到锁的,无法进入临界区访问临界资源。
  • 错误的编码方式:线程不申请锁直接访问临界区资源,这样的话,就算别的线程持有锁,该线程也可以进入到临界区。
  • 在没有持有锁的线程看来,对该线程最有意义的情况只用两种:
    1. 线程 1 没有持有锁(什么都没做)
    2. 线程 1 释放锁(做完),此时我可以申请锁。那么在线程 1 持有锁的期间,所做的所有操作在其他线程看来都是原子的!
  • 加锁后,执行临界区的代码一定是串行执行的!
  • 要访问临界资源,每一个线程都必须先申请锁,那么每一个线程都必须先看到同一把锁并访问它,所以锁本身也是一种共享资源。那么锁肯定也要保护起来,为了保护锁的安全,申请和释放锁的操作都必须是原子的!

互斥锁的细节:

  1. 凡是访问同一个临界资源的线程,都要进行加锁保护,而且必须加同一把锁,这个是一个游戏规则,不能有例外。
  2. 每一个线程访问临界区之前,得加锁,加锁本质是给 临界区 加锁,加锁的粒度尽量要细一些
  3. 线程访问临界区的时候,需要先加锁->所有线程都必须要先看到同一把锁->锁本身就是公共资源->锁如何保证自己的安全?-> 加锁和解锁本身就是原子的!
  4. 临界区可以是一行代码,可以是一批代码,
    a. 线程可能被切换吗?当然可能, 不要特殊化加锁和解锁,还有临界区代码。
    b. 此时线程进行切换会有影响吗?不会,因为在我不在期间,任何人都没有办法进入临界区,因为他无法成功的申请到锁!因为锁被我拿走了!
  5. 这也正是体现互斥带来的串行化的表现,站在其他线程的角度,对其他线程有意义的状态就是:锁被我申请(持有锁),锁被我释放了(不持有锁), 原子性就体现在这里
  6. 解锁的过程也被设计成为原子的!

互斥锁的原理:

为了实现互斥锁操作,大多数体系结构都提供了 swapexchange 指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。

下面我们来根据lockunlock的伪代码来分析一下加锁和解锁的过程:

在这里插入图片描述

线程A:

  1. movb $0,al 调用线程,向自己的上下文写入0
    在这里插入图片描述

  2. xchgb %al,mutex 将cpu的寄存器中的%al 与 内存中的mutex 进行交换,本质是将共享数据交换到 自己的私有的上下文中。交换只有 一条汇编指令 ,要么没交换,要不就交换完了,即加锁的原子性
    在这里插入图片描述

  3. 判断al寄存器中的内容是否大于0,如果大于0,证明加锁成功。
    在这里插入图片描述

线程B:

  1. 切换成线程B,继续执行前两条指令,先将 al寄存器数据置为0,再将寄存器中的数据 与 内存中的数据进行交换。

在这里插入图片描述

  1. 接着判断al寄存器中的内容是否大于0,发现并不大于0,说明b申请锁失败,紧接着b线程被挂起等待,同时b的上下文随着b的挂起被带走。

  2. 当A线程再次被切换回来时,继续执行上次还未执行的判断,发现al中的数据大于0,加锁成功
    在这里插入图片描述

  3. 线程A释放锁,movb $1,mutex 将内存中mutex的数据置为1,唤醒等待Mutex的线程,此时切换成线程B

  4. 线程B执行lock的前两条指令,此时就可以加锁成功了。在这里插入图片描述


二、可重入和线程安全

  • 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
  • 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数;否则,是不可重入函数。

? 常见的线程不安全的情况:

  • 不保护共享变量的函数
  • 函数状态随着被调用,状态发生变化的函数
  • 返回指向静态变量指针的函数
  • 调用线程不安全函数的函数

? 常见的线程安全的情况:

  • 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
  • 类或者接口对于线程来说都是原子操作
  • 多个线程之间的切换不会导致该接口的执行结果存在二义性

? 常见的可重入的情况:

  • 不使用全局变量或静态变量
  • 不使用用malloc或者new开辟出的空间
  • 不调用不可重入函数
  • 不返回静态或全局数据,所有数据都有函数的调用者提供
  • 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据

? 常见的不可重入的情况:

  • 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
  • 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
  • 可重入函数体内使用了静态的数据结构

? 可重入与线程安全的联系:

  • 函数是可重入的,那就是线程安全的。线程安全的函数,不一定是可重入函数
  • 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题(如:printf 函数是不可重入的,多线程向显示器上打印数据时,数据可能会黏在一起)
  • 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的

三、线程和互斥锁的封装

1. 线程封装

? Threa.hpp

#pragma once

#include <iostream>
#include <cstdlib>
#include <string>
#include <pthread.h>
using namespace std;

class Thread
{
public:
    typedef enum{
        NEW = 0,
        RUNNING,
        EXITED
    } ThreadStatus;
    typedef void (*func_t)(void*);

public:
    Thread(int num, func_t func, void* args) :_tid(0), _status(NEW),_func(func),_args(args)
    {
        char name[128];
        snprintf(name, 128, "thread-%d", num);
        _name = name;
    }

    int status(){ return _status; }
    string threadname(){ return _name; }

    pthread_t get_id()
    {
        if(_status == RUNNING)
            return _tid;
        else
            return 0;
    }

    static void* thread_run(void* args)
    {
        Thread* ti = static_cast<Thread*>(args);
        (*ti)();
        return nullptr;
    }

    void operator()()
    {
        if(_func != nullptr)
            _func(_args);
    }

    void run() // 封装线程运行
    {
        int n = pthread_create(&_tid, nullptr, thread_run, this);
        if(n != 0)
            exit(-1);
        _status = RUNNING; // 线程状态变为运行
    }

    void join() // 疯转线程等待
    {
        int n = pthread_join(_tid, nullptr);
        if(n != 0)
        {
            cout << "main thread join thread: " << _name << "error" << endl;
            return;
        }
        _status = EXITED;
    }

    ~Thread(){}
private:
    pthread_t _tid;
    string _name;
    func_t _func; // 线程未来要执行的回调
    void* _args;
    ThreadStatus _status;
};

在这里插入图片描述

1. 互斥锁封装

? lockGuard.hpp

class Mutex // 自己不维护锁,有外部传入
{
public:
    Mutex(pthread_mutex_t *mutex):_pmutex(mutex)
    {}
    void lock()
    {
        pthread_mutex_lock(_pmutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(_pmutex);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *_pmutex;
};

class LockGuard // 自己不维护锁,有外部传入
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        _mutex.lock();
    }
    ~LockGuard()
    {
        _mutex.unlock();
    }
private:
    Mutex _mutex;
};

在这里插入图片描述


四、死锁

1. 死锁的概念

死锁 是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。

下面我们通过一个小故事来让大家理解一下死锁:

有两个小朋友张三和李四,共同去了一家商店,想要购买一块1块钱的棒棒糖,但是他们两个各自都只有五毛钱。因此张三想要李四手里的五毛钱去买棒棒糖让自己吃,但这时候李四就不乐意了,他也想想要张三手里的五毛钱去买棒棒糖让自己吃。因此两个人陷入了僵局,因此买棒棒糖吃这件事情就一直无法推进下去。

  • 两个小朋友可以看作是两个线程,两个不同的小朋友可以看作两把不同的锁
  • 棒棒糖是临界资源,老板就是操作系统
  • 想要访问临界资源,必须同时拥有两把锁

在操作系统中我们可以通过两个线程的案例来理解死锁:

在这里插入图片描述

虽然一般来说产生死锁是因为两把及两把以上的锁导致的,但是一把锁也有可能会产生死锁。


2. 死锁的四个必要条件

  • 互斥条件:一个资源每次只能被一个执行流使用
  • 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
  • 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
  • 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

3. 避免死锁

  1. 不加锁
  2. 主动释放锁
    (假设要有两把锁才能获取临界资源,本身有一把锁,在多次申请另一把锁时申请不到,就把自身的锁释放掉)
  3. 按照顺序申请锁
    (假设有线程A和B,线程A申请锁时,必须保持先A再B,线程B申请锁时,也必须保持先A再B
    当线程A申请到A锁时,线程B也申请到A,就不会出现互相申请的情况了)
  4. 控制线程统一释放锁
    (将所有线程 申请的锁 使用一个线程 全部释放掉,就不会出现死锁了)

证明:一个线程申请的锁,可以由另一个线程来释放

#include <iostream>
#include <unistd.h>
#include <pthread.h>

using namespace std;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

//一个线程加锁, 另一个线程释放锁

void* threadRoutine(void* args)
{
    cout << "I am a new thread" << endl;

    pthread_mutex_lock(&mutex);
    cout << "I get a mutex!" << endl;

    pthread_mutex_lock(&mutex);
    cout << "I alive again" << endl;

    return nullptr;
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, threadRoutine, nullptr);

    sleep(3);

    cout << "main thread run begin" << endl;
    pthread_mutex_unlock(&mutex);
    cout << "main thread unlock..." << endl;
    sleep(3);

    return 0;
}

在这里插入图片描述

由运行结果我们就可以看出,说明一个线程申请一把锁,可以由另一个线程释放。


五、线程同步

1. 线程同步的理解

互斥锁存在的两种不合理的情况:

  • 一个线程频繁的申请到锁,别人无法申请到锁,导致别人饥饿的问题
  • 上述的抢票系统,修改一下,当票数为0时,并不会立即退出。而是等待票数的增加,在等待票数增加的过程中,线程会频繁的申请锁和释放锁。这样的情况会导致资源的浪费。

线程同步: 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做线程同步。

当我们访问临界资源前,需要先做临界资源是否存在的检测,检测的本质也是访问临界资源。那么对临界资源的检测也一定要在加锁和解锁之间。常规的方法检测临界资源是否就绪,就注定了我们必须频繁地申请锁和释放锁。


2. 条件变量

想要解决线程频繁申请和释放锁的问题,需要做到以下两点:

  • 不要让线程在频繁的检测资源是否就绪,而是让线程在资源未就绪时进行等待。
  • 当资源就绪的时候,通知等待该资源的线程,让这些线程来进行资源的申请和访问。

达到以上两点要求就是条件变量,条件变量可以通过允许线程阻塞和等待另一个线程发送信号来弥补互斥锁的不足,所以互斥锁和条件变量通常是一起使用的。

条件变量是一种线程同步机制,用于在多线程环境下实现线程间的协调与通信。他在处理竞态条件和线程间的互斥等问题上具有重要作用。

? 条件变量初始化

// 初始化方式一:
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
// 初始化方式二:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

参数

  • cond:要初始化的条件变量
  • attr:NULL

? 条件变量销毁

int pthread_cond_destroy(pthread_cond_t *cond)

? 等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

参数:

  • cond:要在这个条件变量上等待
  • mutex:互斥量

? 唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒全部的线程
int pthread_cond_signal(pthread_cond_t *cond); // 唤醒该条件变量下等待的线程
#include <iostream>
#include <cstdio>
#include <string>
#include <pthread.h>
#include <unistd.h>
using namespace std;

const int num = 5;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* active(void* args)
{
    string name = static_cast<const char*>(args);
    while(true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex);// pthread_cond_wait,调用的时候,会自动释放锁
        cout << name << "活动" << endl;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    pthread_t tids[num];
    for(int i = 0; i < num; i++)
    {
        char* name = new char[32];
        snprintf(name, 32, "pthread-%d", i + 1);
        pthread_create(tids + i, nullptr, active, name);
    }

    sleep(3);

    while(true)
    {
        cout << "main thread wakeup other thread..." << endl;
        pthread_cond_broadcast(&cond);

        sleep(1);
    }

    for(int i = 0; i < num; i++)
    {
        pthread_join(tids[i], nullptr);
    }
    return 0;
}