线程同步 信号量

发布时间 2023-08-31 01:08:19作者: LiviaYu

信号量

比条件变量更加简单

如图,当前老王的信号量为2,老赵的信号量为4,老赵开进去一辆车,那么空闲的车位即老赵的信号量-1,老王同理
直到信号量为0时,开车被阻塞

信号量函数

定义和初始化

定义变量

#include<semaphore.h>
sem_t sem;
#include<semaphore.h>
//初始化信号量
int sem_init(sem_t* sem,int pshared,unsigned int value);
//资源释放
int sem_destroy(sem_t *sem);

参数:

  • sem 信号量变量地址
  • pshared:
    • 0:线程同步
    • 非0:进程同步
  • value:初始化当前信号量拥有的资源数(>=0),如果资源数为0,线程就会被阻塞

对于消费者生产者模型,两个信号量,一个用于生产者一个用于消费者

信号量的加减

// 参数 sem 就是 sem_init() 的第一个参数  
// 函数被调用sem中的资源就会被消耗1个, 资源数-1
int sem_wait(sem_t *sem);

当线程调用这个函数,并且sem中的资源数>0,线程不会阻塞,线程会占用sem中的一个资源,因此资源数-1,直到sem中的资源数减为0时,资源被耗尽,因此线程也就被阻塞了。

那么如何加回去呢

// 调用该函数给sem中的资源数+1
int sem_post(sem_t *sem);

对应参数会+1

// 参数 sem 就是 sem_init() 的第一个参数  
// 函数被调用sem中的资源就会被消耗1个, 资源数-1
int sem_trywait(sem_t *sem);

当线程调用这个函数,并且sem中的资源数>0,线程不会阻塞,线程会占用sem中的一个资源,因此资源数-1,直到sem中的资源数减为0时,资源被耗尽,但是线程不会被阻塞,直接返回错误号,因此可以在程序中添加判断分支,用于处理获取资源失败之后的情况。

// 表示的时间是从1971.1.1到某个时间点的时间, 总长度使用秒/纳秒表示
struct timespec {
	time_t tv_sec;      /* Seconds */
	long   tv_nsec;     /* Nanoseconds [0 .. 999999999] */
};
// 调用该函数线程获取sem中的一个资源,当资源数为0时,线程阻塞,在阻塞abs_timeout对应的时长之后,解除阻塞。
// abs_timeout: 阻塞的时间长度, 单位是s, 是从1970.1.1开始计算的
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

查看信号量个数

// 查看信号量 sem 中的整形数的当前值, 这个值会被写入到sval指针对应的内存中
// sval是一个传出参数
int sem_getvalue(sem_t *sem, int *sval);

通过这个函数可以查看sem中现在拥有的资源个数,通过第二个参数sval将数据传出,也就是说第二个参数的作用和返回值是一样的。

示例

场景描述:使用信号量实现生产者和消费者模型,生产者有5个,往链表头部添加节点,消费者也有5个,删除链表头部的节点。

情况1:只有一个资源

如果生产者和消费者线程使用的信号量对应的总资源数为1,那么不管线程有多少个,可以工作的线程只有一个,其余线程由于拿不到资源,都被迫阻塞了。

#include<pthread.h>
#include<string.h>
#include<iostream>
#include<stdlib.h>
#include<bits/stdc++.h>
#include<semaphore.h>

using namespace std;

sem_t p_sem;
sem_t c_sem;


//链表定义
struct Node
{
    int number;
    Node* next;
};
Node* head=NULL;

//生产者
void* produce(void* arg)
{
    while(1)
    {
        //检查生产者信号量
        sem_wait(&p_sem);
        //创建一个新的节点,临界区,需要锁
        Node* newnode=new Node();
        newnode->number=rand()%1000;
        newnode->next=head;
        head=newnode;
        cout<<"生产者:"<<pthread_self()<<" number: "<<newnode->number<<endl;
        //向消费者的信号量+1
        sem_post(&c_sem);
        sleep(rand()%3);
    }
    return NULL;
}
//消费者

void* consume(void* arg)
{
    while(1)
    {
        //检查消费者的信号量
        sem_wait(&c_sem);
        Node* node=head;
        cout<<"消费者:"<<pthread_self()<<" number: "<<node->number<<endl;
        head=head->next;
        delete node;
        //向生产者的信号量+1
        sem_post(&p_sem);
        sleep(rand()%3);
    }
    return NULL;
}

int main()
{
    //信号量
    //生产者,需要多少的大小就写多少
    sem_init(&p_sem,0,1);
    //消费者,显然一开始为0
    sem_init(&c_sem,0,0);
    //线程的创建
    pthread_t p[5],c[5];

    for(int i=0;i<5;i++)
    {
        pthread_create(&p[i],NULL,produce,NULL);
    }
    for(int i=0;i<5;i++)
    {
        pthread_create(&c[i],NULL,consume,NULL);
    }

    //
    for(int i=0;i<5;i++)
    {
        pthread_join(p[i],NULL);
        pthread_join(c[i],NULL);
    }

    //销毁
    sem_destroy(&p_sem);
    sem_destroy(&c_sem);
}

情况2:总资源数>1

此时,最多五个线程可以向链表之中同时添加结点,同时访问共享内存,会出bug
所以需要锁来保证一个线性执行
可以加互斥或者读写锁

加锁方式:外部

void* produce(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&m);
        //检查生产者信号量
        sem_wait(&p_sem);
        //创建一个新的节点,临界区,需要锁
        Node* newnode=new Node();
        newnode->number=rand()%1000;
        newnode->next=head;
        head=newnode;
        cout<<"生产者:"<<pthread_self()<<" number: "<<newnode->number<<endl;
        //向消费者的信号量+1
        sem_post(&c_sem);
        pthread_mutex_unlock(&m);
        sleep(rand()%3);
    }
    return NULL;
}
void* consume(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&m);
        //检查消费者的信号量
        sem_wait(&c_sem);
        Node* node=head;
        cout<<"消费者:"<<pthread_self()<<" number: "<<node->number<<endl;
        head=head->next;
        delete node;
        //向生产者的信号量+1
        sem_post(&p_sem);
        pthread_mutex_unlock(&m);
        sleep(rand()%3);
    }
    return NULL;
}

加在信号量外部

对于消费者线程,如果一开始抢到了锁,那么会给共享内存上锁,然后检查信号量的过程中,发现c_sem为0,也发生了阻塞
此时,mutex和semaphore同时阻塞,同时生产者不能生产,发生死锁

加锁方式:内部

完成代码

#include<pthread.h>
#include<string.h>
#include<iostream>
#include<stdlib.h>
#include<bits/stdc++.h>
#include<semaphore.h>

using namespace std;

sem_t p_sem;
sem_t c_sem;

//加入互斥锁,保证线程串行访问
pthread_mutex_t m;


//链表定义
struct Node
{
    int number;
    Node* next;
};
Node* head=NULL;

//生产者
void* produce(void* arg)
{
    while(1)
    {
        
        //检查生产者信号量
        sem_wait(&p_sem);
        //加入互斥锁,需要放入sem_wait内部
        pthread_mutex_lock(&m);
        //创建一个新的节点,临界区,需要锁
        Node* newnode=new Node();
        newnode->number=rand()%1000;
        newnode->next=head;
        head=newnode;
        cout<<"生产者:"<<pthread_self()<<" number: "<<newnode->number<<endl;
        //解锁
        pthread_mutex_unlock(&m);
        //向消费者的信号量+1
        sem_post(&c_sem);
        
        sleep(rand()%3);
    }
    return NULL;
}
//消费者

void* consume(void* arg)
{
    while(1)
    {
       
        //检查消费者的信号量
        sem_wait(&c_sem); 
        //
        pthread_mutex_lock(&m);
        Node* node=head;
        cout<<"消费者:"<<pthread_self()<<" number: "<<node->number<<endl;
        head=head->next;
        delete node;
        //
        pthread_mutex_unlock(&m);
        //向生产者的信号量+1
        sem_post(&p_sem);
        
        sleep(rand()%3);
    }
    return NULL;
}

int main()
{
    //锁
    pthread_mutex_init(&m,NULL);
    //信号量
    //生产者,需要多少的大小就写多少
    sem_init(&p_sem,0,5);
    //消费者,显然一开始为0
    sem_init(&c_sem,0,0);
    //线程的创建
    pthread_t p[5],c[5];

    for(int i=0;i<5;i++)
    {
        pthread_create(&p[i],NULL,produce,NULL);
    }
    for(int i=0;i<5;i++)
    {
        pthread_create(&c[i],NULL,consume,NULL);
    }

    //
    for(int i=0;i<5;i++)
    {
        pthread_join(p[i],NULL);
        pthread_join(c[i],NULL);
    }
    //销毁
    sem_destroy(&p_sem);
    sem_destroy(&c_sem);
    pthread_mutex_destroy(&m);
    
}