生产者与消费者模型

发布时间 2023-09-17 14:46:39作者: ⭐⭐-fighting⭐⭐
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>

#define BUFF_MAX 10
#define SC_NUM 2
#define XF_MAX 3

int buff[BUFF_MAX];
int in = 0;
int out = 0;

sem_t sem_empty;
sem_t sem_full;
pthread_mutex_t mutex;

void *sc_thread(void *arg)
{
    int index = (int)arg;
    for (int i = 0; i < 30; i++)
    {
        sem_wait(&sem_empty); // 是否有空闲位置
        pthread_mutex_lock(&mutex);
        buff[in] = rand() % 100;
        printf("第%d个线程,产生数据:%d,在%d位置\n", index, buff[in], in);
        in = (in + 1) % BUFF_MAX;
        pthread_mutex_unlock(&mutex); // 解锁
        sem_post(&sem_full);          // 数据的个数

        int n = rand() % 10;
        sleep(n);
    }
}

void *xf_thread(void *arg)
{
    int index = (int)arg;
    for (int i = 0; i < 20; i++)
    {
        sem_wait(&sem_full);
        pthread_mutex_lock(&mutex);
        printf("-----第%d个线程,消费数据:%d,在%d位置\n", index, buff[out], out);
        out = (out + 1) % BUFF_MAX;
        pthread_mutex_unlock(&mutex);
        sem_post(&sem_empty);

        int n = rand() % 10;
        sleep(n);
    }
}

int main()
{
    sem_init(&sem_empty, 0, BUFF_MAX);
    sem_init(&sem_full, 0, 0);
    pthread_mutex_init(&mutex, NULL);

    srand((int)time(NULL));

    pthread_t sc_id[SC_NUM];
    pthread_t xf_id[XF_NUM];

    for (int i = 0; i < SC_NUM; i++)
    {
        pthread_create(&sc_id[i], NULL, sc_thread, (void *)i); // 创建生产者
    }
    for (int i = 0; i < XF_NUM; i++)
    {
        pthread_create(&xf_id[i], NULL, xf_thread, NULL); // 创建消费者
    }

    for (int i = 0; i < SC_NUM; i++)
    {
        pthread_join(sc_id[i], NULL);
    }
    for (int i = 0; i < XF_NUM; i++)
    {
        pthread_join(xf_id[i], NULL);
    }

    sem_destroy(&sem_empty);
    sem_destroy(&sem_full);
    pthread_mutex_destroy(&mutex);

    printf("main over\n");
    exit(0);
}

这段代码实现了一个生产者-消费者模型,使用了线程和信号量来进行同步。这种模型用于描述两种类型的线程:生产者线程生成数据并将其放入一个共享缓冲区,而消费者线程从缓冲区中获取数据并进行处理。

以下是代码的主要流程解释:

  1. 包含头文件和定义常量: 代码开始包含了必要的头文件,并定义了一些常量,如缓冲区的最大大小 BUFF_MAX,生产者线程数量 SC_NUM,和消费者线程数量 XF_MAX

  2. 全局变量和同步对象初始化: 代码定义了一些全局变量,包括一个整数数组 buff(用于模拟共享缓冲区)、inout(用于记录生产者和消费者在缓冲区中的位置)。此外,代码还初始化了三个同步对象:sem_empty 用于追踪空闲缓冲区位置的信号量,sem_full 用于追踪填充缓冲区位置的信号量,以及 mutex 用于保护对共享资源的互斥访问。

  3. 生产者线程函数 sc_thread 这个函数模拟生产者的行为。每个生产者线程在一个循环中运行,它尝试获取一个空闲的位置,生成一个随机数,将该数放入共享缓冲区,并在完成后释放一个填充的位置。生产者线程也会随机休眠一段时间,以模拟生产的不确定性。

  4. 消费者线程函数 xf_thread 这个函数模拟消费者的行为。每个消费者线程也在一个循环中运行,它尝试获取一个填充的位置,从共享缓冲区中获取数据,处理数据,然后释放一个空闲的位置。和生产者线程一样,消费者线程也会随机休眠一段时间。

  5. 主函数 main 在主函数中,首先对信号量和互斥锁进行初始化。然后,创建了多个生产者线程和消费者线程,它们并发地执行 sc_threadxf_thread 函数。随后,主函数等待所有线程完成,最后销毁了信号量和互斥锁。

  6. 线程的执行: 在执行过程中,生产者线程不断生成数据并放入缓冲区,而消费者线程从缓冲区中取出数据进行处理。信号量 sem_emptysem_full 用于确保生产者和消费者之间的同步。互斥锁 mutex 用于保护对缓冲区的并发访问,确保一次只有一个线程可以修改缓冲区。

  7. 清理和结束: 在所有线程完成后,代码销毁了信号量和互斥锁,然后结束程序。

这段代码演示了如何使用信号量和互斥锁来解决生产者-消费者问题,确保多个线程能够正确地协同工作,避免了竞态条件和数据不一致性问题。