Linux学习记录——이십오 多线程(2)
文章目录
1、理解原生线程库
线程库在物理内存中存在,也映射到了地址空间的共享区,那么每个线程就可以很方便地去实现自己的代码,库里也包括了线程切换,管理等代码。库对于线程的管理也是先描述再组织,它会创建类似管理进程的TCB,TCB管理LWP,也就是轻量级进程,用户把代码给TCB,LWP也会提供一些可用的东西给到TCB。
线程库中有动态库的一部分,以及各个线程的TCB,TCB里有线程局部存储,线程栈以及其他属性。要想找到一个TCB,只需要找到它的起始地址即可,而这个地址就是线程ID,是一个地址数据,所以这个数据就比较大。
这个线程ID是一个用户级线程ID,是一个虚拟地址。
每个线程都要有自己独立的栈结构,线程栈在TCB里。主线程用的是进程系统栈,新线程用的是库中提供的栈。
当用户用pthread_create创建进程时,会在库中创建描述线程的相关结构体,创建轻量级进程,创建TCB,里面包含线程在用户空间定义的各种东西,轻量级进程、地址以及用户让轻量级进程执行的方法传给系统,系统就会调度这个轻量级进程,然后运行。
C++也有多线程,C++里的线程实现接口也封装了系统的线程库,所以执行C++文件的makefile需要这样写
#include <iostream>
#include <thread>
#include <unistd.h>
using namespace std;
void run1()
{
while(true)
{
cout << "thread 1" << endl;
sleep(1);
}
}
void run2()
{
while(true)
{
cout << "thread 2" << endl;
sleep(1);
}
}
int main()
{
thread th1(run1);
thread th2(run2);
th1.join();
th2.join();
return 0;
}
线程局部存储
几个线程执行同一个方法,那么里面的局部变量是只有一个,几个线程轮番对它进行操作,还是每次调用方法每次的变量都是独一份的?实际上是独立的,每个线程都有一份局部变量,说明这些局部变量在线程栈上,这些变量的地址也都不一样。那么变量名和地址应当怎样看待?
函数在调用之前会被编译,声明变量的代码会被转换为汇编代码,在计算机内部,对于一个进程的栈会存储它的栈顶(esp)和栈底(ebp),开辟空间的时候,会让ebp减去一个变量类型占用的字节,减去之后所在的那个地址就是这个变量的地址。这也就是运行时开辟空间,换成特定的代码去创建变量。ebp和esp是CPU内部的寄存器,只要更改ebp和esp就会切换线程的栈。所以不同线程的开辟的变量的地址也就不同,ebp和esp一换就到了另一个栈,然后再开辟空间。创建变量也可以看作通过偏移量来确定怎么创建。
取地址的时候取的是最低的地址,因为ebp是减一个数字来开辟空间,所以低地址加上偏移量就是栈底地址。
那么全局变量呢?
全局变量多个线程共享,它开辟在地址空间的已初始化数据段。如果不想被所有线程共享,那就在前面加上__thread,这时候变量就存在线程局部存储,从已初始化数据段拷贝到局部存储。打印出来的时候会发现地址变得更大了,是因为线程的库在堆栈之间,已初始化数据段的地址要比堆的地址低。
2、互斥
多线程中大部分资源会被共享,所以一定存在并发访问的情况,也就是多个线程访问同一个资源。那么为了解决这个问题,一个线程访问一个资源,其他线程就不允许再访问,这也就是串行式访问,也就是互斥,这些资源就是临界性资源,访问临界资源的代码就是临界区。
看一下并发访问的场景。AB两个线程访问同一个变量gval,也就是共享资源。数据在内存,但是计算在CPU,所以执行流要把数据加载到寄存器中,然后进行计算操作,再把修改后的数据写会到内存中,也就是3步。假设gval == 100,线程A把变量放到寄存器,–,gval变成99后,这时候A的时间片到了,那么系统就会把它停止,A就会带着自己的上下文以及运算好的变量挂起了。
接着B再运行,B也做一样的动作,但是B认为gval还是100,因为A的返回动作并没有做,然后B计算好,再写回到内存中,假设AB都是while循环里去–,那么B回去后再次重复刚才的动作,把值为99的gval放到寄存器,再去计算;一直循环,假设gval被减到了10,然后又减到了9,返回之前像A一样被还没写回去就被迫停止。
B结束后又轮到了A,A会继续进行写回去的动作,所以会把原本为10的数据又改成了99。对全局变量做访问,没有保护的话,会存在并发访问的问题,进而导致数据不一致问题。
共享资源如果被保护起来,就叫临界资源。任何一个线程都有代码访问临界资源,这部分代码就叫做临界区,当然线程也有不访问临界资源的,这些部分就是非临界区。想让多个线程安全地访问临界资源,就有互斥访问,加锁等方式。
1、并发代码(抢票)
int tickets = 10000;
void* threadRoutine(void* name)
{
string tname = static_cast<const char*>(name);
while(true)
{
if(tickets > 0)
{
usleep(2000);//模拟抢票花费的时间, usleep的参数是微秒级别
cout << tname << "get a ticket: " << tickets-- << endl;
}
else break;
}
return nullptr;
}
int main()
{
pthread_t t[4];
int n = sizeof(t) / sizeof(t[0]);
for(int i = 0; i < n; ++i)
{
char* data = new char[64];
snprintf(data, 64, "thread-%d", i + 1);
pthread_create(t + i, nullptr, threadRoutine, data);
}
for(int i = 0; i < n; ++i)
{
pthread_join(t[i], nullptr);
}
return 0;
}
这样会最终变成负数。当票数还有1个时,有可能几个线程都可以进去,如果进去的线程还没开始操作那就会持续进入线程,进入的线程也有可能时间片到了,在tickets–这里,存在多个线程的话,那么最终就会出现负数,所以tickets > 0,和tickets–两个地方都是临界区。
2、锁
为了加锁,需要引入pthread.h这个头文件
初始化有两种方式,只要锁是全局的或者静态的,那么就可以按照最后一行那样,pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER,之后也不需要销毁;另一种就是局部锁,这就必须初始化+销毁。pthread_mutex_t就是互斥锁类型,先初始化锁,才能让锁可用,最后用完要销毁。
加解锁接口。如果没加上,那就阻塞等待,直到加上。
int tickets = 10000;
pthread_mutex_t mutex;//必须先申请一把锁<F7>
void* threadRoutine(void* name)
{
string tname = static_cast<const char*>(name);
while(true)
{
pthread_mutex_lock(&mutex);//所有线程都得遵守这个规则
if(tickets > 0)
{
usleep(2000);//模拟抢票花费的时间, usleep的参数是微秒级别
cout << tname << "get a ticket: " << tickets-- << endl;
pthread_mutex_unlock(&mutex);
}
else
{
pthread_mutex_unlock(&mutex);
break;
}
}
return nullptr;
}
int main()
{
pthread_mutex_init(&mutex, nullptr);
pthread_t t[4];
int n = sizeof(t) / sizeof(t[0]);
for(int i = 0; i < n; ++i)
{
char* data = new char[64];
snprintf(data, 64, "thread-%d", i + 1);
pthread_create(t + i, nullptr, threadRoutine, data);
}
for(int i = 0; i < n; ++i)
{
pthread_join(t[i], nullptr);
}
pthread_mutex_destroy(&mutex);
return 0;
}
这样改后,速度会变慢,因为有加锁的过程。但会发现全部都是一个线程在抢票。因为一个线程在加锁,另外几个线程都在等待,抢锁的这个线程时间片没到,其他线程就一直等待,所以会出现只有一个线程在抢票。
改成带有类的代码
#include <iostream>
#include <unistd.h>
#include <string>
#include <cstdio>
#include <cstring>
#include <phread.h>
#include <thread>
#include <ctime>
using namespace std;
int tickets = 10000;
//pthread_mutex_t mutex;//必须先申请一把锁
class TData
{
public:
TData(const string& name, pthread_mutex_t* mutex):_name(name), _pmutex(mutex)
{}
~TData()
{}
public:
string _name;
pthread_mutex_t* _pmutex;
}
void* threadRoutine(void* args)
{
TData* td = static_cast<TData*>(args);
//string tname = static_cast<const char*>(name);
while(true)
{
pthread_mutex_lock(td->_pmutex);//所有线程都得遵守这个规则
if(tickets > 0)
{
usleep(2000);//模拟抢票花费的时间, usleep的参数是微秒级别
cout << td->_name << "get a ticket: " << tickets-- << endl;
pthread_mutex_unlock(td->_pmutex);
}
else
{
pthread_mutex_unlock(td->_pmutex);
break;
}
}
return nullptr;
}
int main()
{
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, nullptr);
pthread_t tids[4];
int n = sizeof(t) / sizeof(t[0]);
for(int i = 0; i < n; ++i)
{
char name[64];
snprintf(name, 64, "thread-%d", i + 1);
TData* td = new TData(name, &mutex);
pthread_create(tids + i, nullptr, threadRoutine, td);
}
for(int i = 0; i < n; ++i)
{
pthread_join(tids[i], nullptr);
}
pthread_mutex_destroy(&mutex);
return 0;
}
通过以上可以发现
凡是访问同一个临界资源的线程,都要进行加锁保护,而且必须加同一把锁,这是一个不能违反的规则
每一个线程访问临界区之前,得加锁,加锁本质是给临界区加锁 ,加锁后代码进行串行访问,加锁粒度尽量要细一些,不要让太多代码一块加锁,所以临界区附近不要有多余的代码,要不就会执行时间更长
加锁前所有线程要先看到同一把锁,所以锁本身就是一个公共资源,加锁和解锁本身是原子的,所以锁才能保障自身安全
临界区可以是一行代码,也可以是一批代码,那么在解锁前线程大概率被切换出去,以及在加锁的时候也可能会被切出去。但是没有影响,如果在临界区内加锁后被切换出去,其他进程也不能再次申请锁,进入临界区,只能等这个线程执行完后再申请锁,这也就是互斥带来的串行化的表现,也是为什么加锁后会变慢。站在其它线程角度,对它们有意义的状态就是锁被申请,锁被释放,所以锁的原子性就体现在这里,解锁也是原子的。
Linux把这个锁叫做互斥锁或者互斥量。
3、互斥锁的实现原理
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 看一下加/解锁的伪代码。
调用的线程会去执行lock和unlock。内存中给mutex分配空间,默认变量值是1。一个线程开始lock的时候,会先把0传给%al这个寄存器,其实就是初始化它为0。寄存器的硬件只有一套,但是寄存器内部的数据,也就是执行流的上下文是线程自己的。相当于私人物品放在公共地区。所以lock的第一句的意思就是调用线程,向自己的上下文写入0,之后切走时还要带走这个0。
第二句就是exchange指令,寄存器和锁的空间交换数据,本质就是将共享数据交换到自己的私有上下文当中,也就是1和寄存器中的0交换一下位置,这个操作就是加锁,这一条汇编语句对应着加锁的原子性。如果这时候还没进行下一步,线程被切走了,这时候这个线程就会把寄存器中的内容拿走,并且记录下这个线程执行到的代码,切回之后还会继续这行代码。但是这时候锁是0,下一个线程还是执行lock的代码,寄存器中初始化为0,然后和锁交换,进入判断,发现寄存器中为0,所以挂起等待,所以申请锁失败,这个线程就会带着上下文等去了,等到申请锁成功的线程回来,把1放进寄存器中,然后判断,return 0,加锁成功。所以1只会流转。
那么解锁也就能看出来了。
3、线程封装
1、线程本体
这里就直接传代码
#include <iostream>
#include <pthread.h>
#include <string>
#include <cstdlib>
using namespace std;
typedef void (*func_t)();
class Thread
{
public:
typdef enum
{
New = 0,
RUNNING,
EIXTED
}ThreadStatus;
typedef void (*func_t)(void*);
public:
Thread(int num, func_t func, void* args):_tid(0), _status(NEW), _func(func), _args(args)//num是线程编号
{
char name[128];
snprintf(name, sizeof(name), "thread-%d", num);
_name = name;
}
int status() {return _status;}
string threadname() {return _name;}
pthread_t threadid()
{
if(_status == RUNNING) return _tid;
else
{
return 0;
}
}
//runHelper是成员函数,类的成员函数具有默认参数this,也就是括号有一个Thread* this,pthread_create的参数应当是void*类型,就不符合,所以加上static,放在静态区,就没有this,但是又有新问题了
//static成员函数无法直接访问类内属性和其他成员函数,所以create那里要传this指针,this就是当前线程对象,传进来才能访问类内的东西
static void* runHelper(void* args)//args就是执行方法的参数
{
Thread* ts = (Thread*)args;//就拿到了当前对象
//函数里可以直接调用func这个传过来的方法,参数就是_args
(*ts)();//调用了func函数
return nullptr;
}
void operator()()//仿函数
{
if(_func != nullptr) _func(_args);
}
void run()//run函数这里传方法
{
int n = pthread_create(&_tid, nullptr, runHelper, this);//为什么传this?
if(n != 0) exit(1);
_status = RUNNING;
}
void join()
{
int n = pthread_join(_tid, nullptr);
if(n != 0)
{
cerr << "main thread join thread" << _name << "error" << endl;
return ;
}
_status = EXITED;
}
~Thread()
{}
private:
pthread_t _tid;
string _name;
func_t func;//线程未来要执行的函数方法
void* _args;//也可以不写这个,那么typedef的函数指针就没有参数。当然,参数也可用模板来写
ThreadStatus _status;
}
那么原来的使用线程代码就可以这样写了
void threadRun(void* args)
{
string message = static_cast<const char*>(args);
while(true)
{
cout << "我是一个线程, " << message << endl;
sleep(1);
}
}
int main()
{
//弄两个线程
Thread t1(1, threadRun, (void*)"hellobit1");
Thread t2(2, threadRun, (void*)"hellobit2");
cout << "thread name: " << t1.threadname() << "thread id: " << t1.threadid() << ",thread status: " <<t1.status() << endl;
cout << "thread name: " << t1.threadname() << "thread id: " << t1.threadid() << ",thread status: " <<t1.status() << endl;
t1.run();
t2.run();
cout << "thread name: " << t1.threadname() << "thread id: " << t1.threadid() << ",thread status: " <<t1.status() << endl;
cout << "thread name: " << t1.threadname() << "thread id: " << t1.threadid() << ",thread status: " <<t1.status() << endl;
t1.join();
t2.join();
cout << "thread name: " << t1.threadname() << "thread id: " << t1.threadid() << ",thread status: " <<t1.status() << endl;
cout << "thread name: " << t1.threadname() << "thread id: " << t1.threadid() << ",thread status: " <<t1.status() << endl;
return 0;
}
2、封装锁
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex//自己不维护锁,由外部传入
{
public:
Mutex(pthread_mutex_t *mutex):_pmutex(mutex)
{}
void lock()
{
pthread_mutex_lock(_pmutex);
}
void unlock()
{
pthread_mutex_unlock(_pmutex);
}
~Mutex()
{}
private:
pthread_mutex_t *_pmutex;
}
class LockGuard//自己不维护锁,由外部传入
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
_mutex.lock();
}
~LockGuard()
{
_mutex.unlock();
}
private:
Mutex _mutex;
}
代码加锁
int tickets = 1000;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//全局初始化
void threadRoutine(void* args)
{
string *message = static_cast<const char*>(args);
while(true)
{
LockGuard lockguard(&mutex);//交给类去自动创建,析构锁//RAII风格加锁
if(tickets > 0)
{
usleep(2000);
cout << message << "get a ticket: " << tickets-- << endl;
}
else
{
break;
}
}
}
int main()
{
Thread t1(1, threadRoutine, (void*)"hellobit1");
Thread t2(2, threadRoutine, (void*)"hellobit2");
Thread t3(3, threadRoutine, (void*)"hellobit3");
Thread t4(4, threadRoutine, (void*)"hellobit4");
t1.run();
t2.run();
t3.run();
t4.run();
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}
4、线程安全
常见不安全情况:
不保护共享变量的函数
函数状态随着被调用,状态发生变化的函数
返回指向静态变量指针的函数
调用线程不安全函数的函数
常见线程安全情况:
每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
部分类或者接口对于线程来说都是原子操作
多个线程之间的切换不会导致该接口的执行结果存在二义性
常见不可重入的情况:
调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
不可重入函数体内使用了静态的数据结构
常见的可重入情况:
不使用全局变量或静态变量
不使用用malloc或者new开辟出的空间
不调用不可重入函数
不返回静态或全局数据,所有数据都有函数的调用者提供
使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全联系:
函数是可重入的,那就是线程安全的
函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题,可以通过加锁等方式来控制
如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全区别:
可重入函数是描述函数状态的一种
线程安全不一定是可重入的,而可重入函数则一定是线程安全的(仅仅是在调用过程中安全,如果在这之前之后调用全局变量等那就不一定了)
如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生
死锁,因此是不可重入的。
5、死锁
死锁,一种临界资源,被访问需要同时拥有两把锁,两个线程各自持有一把锁,并且互相申请另一把锁,导致两个线程都被挂起,这也就造成了死锁。
死锁的四个必要条件:
1、互斥条件:一个资源每次只能被一个执行流使用
2、请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
3、不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
4、环路条件(循环与等待):若干执行流之间形成一种头尾相接的循环等待资源的关系
也就是只要产生死锁,这四个条件必然都有了。
避免死锁的方法就是破坏死锁,破坏这四个条件的任何一个。
不加锁
主动释放锁(trylock函数是非阻塞式申请锁,申请失败就不会加锁,一般用在第一把锁已经用lock接口申请了,第二把锁就trylock一下,不行的话那就再自己操作,比如释放第一把锁)
多个线程申请锁的顺序保持一致
A线程申请锁,B线程可以去释放A的锁吗?加解锁可以在不同线程。可以控制线程统一释放锁。这一点可以看一下上面的加锁过程的伪代码,最后是mov,而是exchange这样的代码,说明什么,线程不必归还锁,系统自然有办法回收这个锁。
6、线程同步
线程在临界区内是可以自由切换,系统保护它不受影响。一个线程出了临界区,释放锁后,其它线程就可以来申请锁,但是如果还没等其它线程来申请,刚离开临界区的这个线程又一次申请锁,又占据了临界区,其它线程有不能申请了,只能等待,如果循环往复,这个线程不断地申请,释放锁,那么这就是毫无效率的一个线程,为了解决这个问题,系统就制定规则,刚释放锁的线程不能立刻再次申请锁,等待的线程要按照顺序排列好,出来的这个线程要排到队列尾部。像这样,在安全的规则下,多线程访问资源具有一定的顺序性,这就是线程同步。这是多线程协同工作的一种方式。
1、条件变量
用来实现多线程同步,一个线程可以通过条件变量来通知另一个线程要做的操作。在之前的抢票中,先加锁,然后去判断票数是否小于0,这个判断是临界资源,所以是在临界区内判断的,如果票数小于0,就会break退出,如果改一下,变成票数小于0就释放锁,然后再重复刚才的动作,申请锁,判断,同时系统会自动放票,所以需要这个程序不停地判断,这样就造成了低效程序,在这个线程不停地做这些动作时,其他线程无法执行其他操作。所以就需要用到条件变量来控制加锁解锁,访问临界资源的整个顺序性的执行
1、接口
条件变量的类型其实就是一个结构体。wait那个函数,mutex就是互斥锁,条件判断一定是在加锁之后的,如果不满足条件就调用这个接口,它会释放线程拥有的锁,然后让这个线程去等待,这也就是为什么wait接口有锁这个参数。signal可以唤醒一个等待的线程,broadcast可以唤醒所有等待的线程,broadcast也就是广播,这个在Python中比较熟悉。
2、demo代码
条件变量和锁的创建等都相似
const int num = 5;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_COND_INITIALIZER;
void* active(coid* args)
{
string name = static_cast<const char*>(args);
while(true)
{
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);//先默认一进入循环就已经是不符合判断了
cout << name << "活动" << endl;
pthread_mutex_unlock(&mutex);
}
}
int main()
{
pthread_t tids[num];
for(int i = 0; i < num; i++)
{
char* name = new char[32];
snprintf(name, 32, "thread-%d", i + 1);
pthread_create(tids + i, nullptr, active, name);
}
sleep(3);//经过上面的代码,所有线程都处于等待状态了,接下来全部唤醒
while(true)
{
//唤醒会从队列的第一个线程开始,唤醒后这个线程就会从被wait处开始,继续执行之后的代码,所以会打印活动
cout << "main thread wake up..." << endl;
pthread_cond_signal(&cond);
//pthread_cond_broadcast(&cond);会一次性打印5个线程,打印的顺序就是队列中的顺序,并且之后都会以这个顺序打印
sleep(1);//每隔一秒唤醒一个
}
for(int i = 0; i < num ; i++)
{
pthread_join(tids[i], nullptr);
}
}
开另一个窗口,用while :; do ps -aL | head -1&&ps -aL | grep threadtest; sleep 1; done来查看情况。threadtest是程序员自己命名的可执行程序的名字。
条件变量允许多线程在cond中队列式等待。
结束。