Thread源码分析

发布时间 2023-05-26 00:31:22作者: Stitches

一些新知识点

__builtin_expect

https://blog.csdn.net/sinat_31608641/article/details/120692775

Linux 内核、Glib等会经常看到 likely()、unlikely() 两个宏,它们都使用了GCC内建函数 __builtin_expect(x,y)

#define likely(x)      __builtin_expect(!!(x), 1)
#define unlikely(x)    __builtin_expect(!!(x), 0)

该函数原型为 long __builtin_expect (long exp, long c),解释为你期望 exp 表达式的值等于常量c,看c的值,如果c的值为0,那么执行if分支的可能性很小,否则执行if分支的可能性很大。

看几个示例:

  • 由于期望 x == 0, 所以执行func()的可能性小,编译器优化直接执行else
if (__builtin_expect(x, 0))
{
    func();
}else{ 
 //do someting
}
  • 期望 ptr !=NULL这个条件成立(1), 所以执行func()的可能性小
if (__builtin_expect(ptr != NULL, 1))
{   
    //do something
}
else{ 
   func();
}

通常将 __builtin_expect 指令封装使用,!!(x)表示把(x)转化为布尔值原值(0/1),!(x)得到 true 或者 false。

#define likely(x)      __builtin_expect(!!(x), 1)
#define unlikely(x)    __builtin_expect(!!(x), 0)

函数作用:

现在处理器都是流水线的,系统可以提前取多条指令进行并行处理,但遇到跳转时,则需要重新取指令,跳转指令打乱了CPU流水线。因此,跳转次数少的程序拥有更高的执行效率。

在C语言编程时,会不可避免地使用if-else分支语句,if else 句型编译后, 一个分支的汇编代码紧随前面的代码,而另一个分支的汇编代码需要使用JMP指令才能访问到。很明显通过JMP访问需要更多的时间, 在复杂的程序中,有很多的if else句型,又或者是一个有if else句型的库函数,每秒钟被调用几万次,通常程序员在分支预测方面做得很糟糕, 编译器又不能精准的预测每一个分支,这时JMP产生的时间浪费就会很大。

因此,引入__builtin_expect函数来增加条件分支预测的准确性,cpu 会提前装载后面的指令,遇到条件转移指令时会提前预测并装载某个分支的指令。编译器会产生相应的代码来优化 cpu 执行效率。GCC在编译过程中,会将可能性更大的代码紧跟着前面的代码,从而减少指令跳转带来的性能上的下降, 达到优化程序的目的。


__thread 关键字:

__thread是GCC内置的线程局部存储设施,存取效率可以和全局变量相比。__thread变量每一个线程有一份独立实体,各个线程的值互不干扰。可以用来修饰那些带有全局性且值可能变,但是又不值得用全局变量保护的变量。

__thread使用规则:只能修饰POD类型(类似整型指针的标量,不带自定义的构造、拷贝、赋值、析构的类型,二进制内容可以任意复制memset,memcpy,且内容可以复原),不能修饰class类型,因为无法自动调用构造函数和析构函数,可以用于修饰全局变量,函数内的静态变量不能修饰函数的局部变量或者class的普通成员变量,且__thread变量值只能初始化为编译器常量(值在编译器就可以确定const int i=5,运行期常量是运行初始化后不再改变const int i=rand()).

测试实例:

#include<iostream>
#include<pthread.h>
#include<unistd.h>
using namespace std;
const int i=5;
__thread int var=i;//两种方式效果一样
//__thread int var=5;//
void* worker1(void* arg);
void* worker2(void* arg);
int main(){
    pthread_t pid1,pid2;
    //__thread int temp=5;
    static __thread  int temp=10;//修饰函数内的static变量
    pthread_create(&pid1,NULL,worker1,NULL);
    pthread_create(&pid2,NULL,worker2,NULL);
    pthread_join(pid1,NULL);
    pthread_join(pid2,NULL);
    cout<<temp<<endl;//输出10
    return 0;
}
void* worker1(void* arg){
    cout<<++var<<endl;//输出 6
}
void* worker2(void* arg){
    sleep(1);//等待线程1改变var值,验证是否影响线程2
    cout<<++var<<endl;//输出6
}


//output
6
6
10

多线程私有数据 pthread_key_create

关于并发编程中 static 变量的问题:https://developer.aliyun.com/article/321877

介绍:

在多线程的环境下,进程内的所有线程共享进程的数据空间。因此全局变量为所有线程共享。在程序设计中有时需要保存线程自己的全局变量,这种特殊的变量仅在线程内部有效。

如常见的errno,它返回标准的错误码。errno不应该是一个局部变量。几乎每个函数都应该可以访问他,但他又不能作为是一个全局变量。否则在一个线程里输出的很可能是另一个线程的出错信息,这个问题可以通过创建线程的私有数据(TSD thread specific data)来解决。在线程内部,私有数据可以被各个函数访问。但他对其他线程是屏蔽的。

线程私有数据采用了一键多值的技术,即一个键对应多个值。访问数据时都是通过键值来访问,好像是对一个变量进行访问,其实是在访问不同的数据。


pthread_key_create

函数原型为:int pthread_key_create(pthread_key_t *key, void (*destructor)(void*)),第一个参数为指向一个键值的指针;第二个参数指明了一个destructor的函数,如果这个参数不为空,那么当每个线程结束时,系统将会调用这个函数来释放绑定在这个键上的内存块。

key一旦被创建,所有线程都可以访问它,但是各线程根据自己的需要往key中填入不同的值,这就相当于提供了一个同名而不同值的全局变量,一键多值。
一键多值靠的是一个关键数据结构数组即TSD池,创建一个TSD就相当于将结构数组中的某一项设置为“in_use”,并将其索引返回给*key,然后设置清理函数。具体流程如下:

  1. 创建一个键
  2. 为一个键设置线程私有数据
  3. 从一个键读取线程私有数据void *pthread_getspecific(pthread_key_t key);
  4. 线程退出(退出时,会调用destructor释放分配的缓存,参数是key所关联的数据)
  5. 删除一个键

pthread_setspecific

函数原型为:int pthread_setspecific(pthread_key_t key,const void *pointer))

另外一个函数常常结合使用:void *pthread_getspecific(pthread_key_t key)

set是把一个变量的地址告诉key,一般放在变量定义之后,get会把这个地址读出来,然后你自己转义成相应的类型再去操作,注意变量的有效期。
只不过,在不同的线程里可以操作同一个key,他们不会冲突,比如线程a,b,c set同样的key,分别get得到的地址会是之前各自传进去的值。
这样做的意义在于,可以写一份线程代码,通过key的方式多线程操作不同的数据。


pthread_setspecific

函数原型:int pthread_setspecific(pthread_key_t key, const void *value),该函数将value的值(不是内容)与key相关联。用pthread_setspecific为一个键指定新的线程数据时,线程必须先释放原有的线程数据用以回收空间。


pthread_key_delete

函数原型:int pthread_key_delete(pthread_key_t key),用来删除一个键,删除后,键所占用的内存将被释放。注销一个TSD,这个函数并不检查当前是否有线程正使用该TSD,也不会调用清理函数(destr_function),
而只是将TSD释放以供下一次调用pthread_key_create()使用。需要注意的是,键占用的内存被释放。与该键关联的线程数据所占用的内存并不被释放。因此,线程数据的释放,必须在释放键之前完成。


测试实例:

#include <pthread.h>
#include <stdio.h>

using namespace std;

pthread_key_t key;
pthread_t thid1;
pthread_t thid2;

void* thread2(void* arg)
{
    printf("thread:%lu is running\n", pthread_self());
    
    int key_va = 3 ;

    pthread_setspecific(key, (void*)key_va);
    
    printf("thread:%lu return %d\n", pthread_self(), (int)pthread_getspecific(key));
}


void* thread1(void* arg)
{
    printf("thread:%lu is running\n", pthread_self());
    
    int key_va = 5;
    
    pthread_setspecific(key, (void*)key_va);

    pthread_create(&thid2, NULL, thread2, NULL);

    printf("thread:%lu return %d\n", pthread_self(), (int)pthread_getspecific(key));
}


int main()
{
    printf("main thread:%lu is running\n", pthread_self());

    pthread_key_create(&key, NULL);

    pthread_create(&thid1, NULL, thread1, NULL);

    pthread_join(thid1, NULL);
    pthread_join(thid2, NULL);

    int key_va = 1;
    pthread_setspecific(key, (void*)key_va);
    
    printf("thread:%lu return %d\n", pthread_self(), (int)pthread_getspecific(key));

    pthread_key_delete(key);
        
    printf("main thread exit\n");
    return 0;
}
/*三个线程:主线程,th1,th2各自有自己的私有数据区域
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>

using namespace std;

static pthread_key_t str_key;
//define a static variable that only be allocated once
static pthread_once_t str_alloc_key_once=PTHREAD_ONCE_INIT;
static void str_alloc_key();
static void str_alloc_destroy_accu(void* accu);

char* str_accumulate(const char* s)
{    char* accu;
    
    pthread_once(&str_alloc_key_once,str_alloc_key);//str_alloc_key()这个函数只调用一次
    accu=(char*)pthread_getspecific(str_key);//取得该线程对应的关键字所关联的私有数据空间首址
    if(accu==NULL)//每个新刚创建的线程这个值一定是NULL(没有指向任何已分配的数据空间)
    {    accu=malloc(1024);//用上面取得的值指向新分配的空间
        if(accu==NULL)    return NULL;
        accu[0]=0;//为后面strcat()作准备
      
        pthread_setspecific(str_key,(void*)accu);//设置该线程对应的关键字关联的私有数据空间
        printf("Thread %lx: allocating buffer at %p\n",pthread_self(),accu);
     }
     strcat(accu,s);
     return accu;
}
//设置私有数据空间的释放内存函数
static void str_alloc_key()
{    pthread_key_create(&str_key,str_alloc_destroy_accu);/*创建关键字及其对应的内存释放函数,当进程创建关键字后,这个关键字是NULL。之后每创建一个线程os都会分给一个对应的关键字,关键字关联线程私有数据空间首址,初始化时是NULL*/
    printf("Thread %lx: allocated key %d\n",pthread_self(),str_key);
}
/*线程退出时释放私有数据空间,注意主线程必须调用pthread_exit()(调用exit()不行)才能执行该函数释放accu指向的空间*/
static void str_alloc_destroy_accu(void* accu)
{    printf("Thread %lx: freeing buffer at %p\n",pthread_self(),accu);
    free(accu);
}
//线程入口函数
void* process(void *arg)
{    char* res;
    res=str_accumulate("Resule of ");
    if(strcmp((char*)arg,"first")==0)
        sleep(3);
    res=str_accumulate((char*)arg);
    res=str_accumulate(" thread");
    printf("Thread %lx: \"%s\"\n",pthread_self(),res);
    return NULL;
}
//主线程函数
int main(int argc,char* argv[])
{    char* res;
    pthread_t th1,th2;
    res=str_accumulate("Result of ");
    pthread_create(&th1,NULL,process,(void*)"first");
    pthread_create(&th2,NULL,process,(void*)"second");
    res=str_accumulate("initial thread");
    printf("Thread %lx: \"%s\"\n",pthread_self(),res);
    pthread_join(th1,NULL);
    pthread_join(th2,NULL);
    pthread_exit(0);
}

线程函数 pthread_atfork

我们知道Linux在执行 fork() 产生一个子进程时,如果父进程创建了pthread的互斥锁(pthread_mutex_t)对象,那么子进程将自动继承父进程中互斥锁对象,并且互斥锁的状态也会被子进程继承下来:如果父进程中已经加锁的互斥锁在子进程中也是被锁住的,如果在父进程中未加锁的互斥锁在子进程中也是未加锁的。在父进程调用fork之前所创建的 pthread_mutex_t 对象会在子进程中继续有效,而 pthread_mutex_t 对象通常是全局对象,会在父进程的任意线程中被操作(加锁或者解锁),这样就无法通过简单的方法让子进程明确知道被继承的 pthread_mutex_t 对象到底有没有处于加锁状态。

参考APUE:子进程获得父进程数据空间、堆、栈的副本,这些副本父子进程并不共享存储空间,父子进程只共享正文段。但是由于在 fork 后往往跟随 exec,所以现在很多实现并不执行一个父进程数据段、堆栈的完全副本。作为替代,使用了写时复制(COPY-ON-WRITE)技术,这些区域由父子进程共享,而且内核将它们的访问权限改变为只读。 也就是说,子进程复制的是父进程的虚拟地址空间,如果父子进程都对某个变量只读,变量不变时表现为共享,此时对应的物理空间只有一份;如果父子进程需要修改某个变量,那么进程会对物理内存进行复制,这个时候变量是独立的,也就是说物理内存中存在两份空间。

pthread_atfork函数的原型为:int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));

该函数通过3个不同阶段的回调函数来处理互斥锁状态。参数如下:

  1. prepare:将在fork调用创建出子进程之前被执行,它可以给父进程中的互斥锁对象明明确确上锁。这个函数是在父进程的上下文中执行的,正常使用时,我们应该在此回调函数调用 pthread_mutex_lock 来给互斥锁明明确确加锁,这个时候如果父进程中的某个线程已经调用 pthread_mutex_lock 给互斥锁加上了锁,则在此回调中调用 pthread_mutex_lock 将迫使父进程中调用fork的线程处于阻塞状态,直到prepare能给互斥锁对象加锁为止。

  2. parent:是在fork调用创建出子进程之后,而fork返回之前执行,在父进程上下文中被执行。它的作用是释放所有在prepare函数中被明明确确锁住的互斥锁。

  3. child:是在fork返回之前,在子进程上下文中被执行。和parent处理函数一样,child函数也是用于释放所有在prepare函数中被明明确确锁住的互斥锁。

所以在 fork 前使用这个函数能够保证子进程继承 pthread_mutex_t 对象处在未加锁状态。

举个例子:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
 
#define ERROR(err, msg) do { errno = err; perror(msg); exit(-1); } while(0)
 
int count = 0;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 
void prepare() {
        int err;
        printf("prepare: pthread_mutex_lock ...\n");
        err = pthread_mutex_lock(&lock);
        if (err != 0) ERROR(err, "prepare: pthread_mutex_lock failed");
        printf("prepare: lock start...\n");
}
 
void parent() {
        int err;
        printf("parent: pthread_mutex_unlock ...\n");
        err = pthread_mutex_unlock(&lock);
        if (err != 0) ERROR(err, "parent: pthread_mutex_unlock");
}
 
void child() {
        int err;
        printf("child: pthread_mutex_unlock ...\n");
        err = pthread_mutex_unlock(&lock);
        if (err != 0) ERROR(err, "child: pthread_mutex_unlock");
}
 
void* thread_proc(void* arg) {
        while(1) {
                pthread_mutex_lock(&lock);
                count++;
                printf("parent thread:  count:%d\n",count);
                sleep(10);
                pthread_mutex_unlock(&lock);
                sleep(1);
        }
        return NULL;
}
 
int main(int argc,char * argv[])
{
        int err;
        pid_t pid;
        pthread_t tid;
        pthread_create(&tid, NULL, thread_proc, NULL);
        err = pthread_atfork(prepare, parent, child);
        if (err != 0) ERROR(err, "atfork");
 
        sleep(1);
        printf("parent is about to fork ...\n");
        pid = fork();
        if (pid < 0) ERROR(errno, "fork");
        else if (pid == 0) {
                // child process
 
                int status;
                printf("child running\n");
                while(1) {
                        pthread_mutex_lock(&lock);
                        count ++;
                        printf("child: count:%d\n",count);
                        sleep(2);
                        pthread_mutex_unlock(&lock);
                        sleep(1);
                }
                exit(0);
        }
 
        pthread_join(tid, NULL);
 
        return 0;
}

//output
parent thread:  count:1
parent is about to fork ...
prepare: pthread_mutex_lock ...
prepare: lock start...
parent: pthread_mutex_unlock ...
child: pthread_mutex_unlock ...
child running
child: count:2
parent thread:  count:2
child: count:3
child: count:4
child: count:5
parent thread:  count:3
child: count:6

Thread源码

//Thread.h

class Thread : nocopytable
{
    public:
        typedef std::function<void()> ThreadFunc;
        explicit Thread(ThreadFunc, const string& name = string());
        ~Thread();

        void start();
        int join();
        bool started() const {return started_;}
        pid_t tid() const {return tid_;}
        const string& name() const { return name_; }

        static int numCreated() { return numCreated_.get(); }
    
    private:
        void setDefaultName();
        bool started_;              //线程是否开始
        bool joined_;               //状态是否 join
        pthread_t pthreadId_;
        pid_t tid_;
        ThreadFunc func_;
        string name_;
        CountDownLatch latch_;

        static AtomicInt32 numCreated_;
};
``


```c++
//Thread.c