多线程编程同步:Posix信号量

发布时间 2023-10-20 14:16:42作者: eiSouthBoy

信号量的定义

IPC是进程间通信(interprocess communication)的简称。狭义上,IPC主要用于进程间;广义上,IPC可用于进程间或线程间。

Posix消息队列、Posix信号量和Posix共享内存区 合称为 “Posix IPC”.

信号量(semaphore)是一种用于提供不同进程间或一个给定进程的不同线程间同步手段。

信号量的分类:

1)Posix有名信号量

2)Posix基于内存的信号量,或者称 无名信号量

3)System V信号量

信号量的操作:

1)创建(create)一个信号量。创建信号量时,应给定初始值

2)等待(wait)一个信号量。该操作会测试这个信号量的值,如果其值小于或等于0,那就等待(阻塞),一旦其值变为大于0就将它减1。

3)挂出(post)一个信号量。该操作将信号量的值加1

? 注:信号量的值不是一定要为二值信号量。它们适用与其值初始化为任意非负值的信号量。这样的信号量称为 计数信号量(counting semaphore),它通常初始化为某个值N,表示可用的资源(例如 缓冲区)数。
二值信号量可用于互斥目的,就像互斥锁一样。信号量有一个互斥锁没有的特性:互斥锁必须总是由锁住它的线程解锁,但信号量的挂出却不必由执行过它的等待操作的同一线程执行。

生产者-消费者问题(有名信号量)

现在对生产者-消费者问题进行扩展,把共享缓冲区用作一个环形缓冲区:生产者填写最后一个(buff[N_BUFF - 1])后,重头开始填写第一个(buff[0]),消费者也同样这么做。这么做有一个问题:生产者不能走到消费者的前面。

prodcons_sem.c

/*
 * @Description: named semaphore achieve producer-consumer problem
 * @Author: 
 * @version: 
 * @Date: 2023-10-19 10:00:24
 * @LastEditors: 
 * @LastEditTime: 2023-10-19 11:00:11
 */

//==============================================================================
// Include files
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <semaphore.h>
#include <unistd.h>
#include <time.h>

//==============================================================================
// Constants
#define N_BUFF  10
#define PATH_MAX 1024
#define MAX_N_ITEMS 1000000

#define SEM_MUTEX       "mutex"    // these are args to px_ipc_name()
#define SEM_N_EMPTY     "n_empty"
#define SEM_N_STORED    "n_stored"

#define MAX(a,b) ((a)>(b) ? (a):(b)) 
#define MIN(a,b) ((a)<(b) ? (a):(b))
//==============================================================================
// types
struct SHARED
{
    int buff[N_BUFF]; // circulate shared buffer
    sem_t *mutex;     // binary semaphore
    sem_t *nEmpty;    // number of empty slot
    sem_t *nStored;   // number of stored slot
};

//==============================================================================
// global varibles

static int g_nItems = 0;  // read-only by producer and consumer
static struct SHARED g_shared;

//==============================================================================
// global functions

static void *produce(void *arg);
static void *consume(void *arg);

//==============================================================================
// The main entry-point function.

int main(int argc, char **argv)
{
    pthread_t tid_produce = 0;
    pthread_t tid_consume = 0;

    if (argc != 2)
    {
        printf("usage: %s <#items>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    g_nItems = MIN(atoi(argv[1]), MAX_N_ITEMS);
    printf("the number of items is %d\n\n", g_nItems);
    /* create 3 semaphores and init  */
    sem_unlink(SEM_MUTEX);   // if the specified semaphore is exist, we must delete.
    sem_unlink(SEM_N_EMPTY);
    sem_unlink(SEM_N_STORED);

    g_shared.mutex = sem_open(SEM_MUTEX, O_CREAT | O_EXCL, 0666, 1);
    g_shared.nEmpty = sem_open(SEM_N_EMPTY, O_CREAT | O_EXCL, 0666, N_BUFF);
    g_shared.nStored = sem_open(SEM_N_STORED, O_CREAT | O_EXCL, 0666, 0);

    /* create 1 producer and 1 consumer */
    pthread_setconcurrency(2);
    pthread_create(&tid_produce, NULL, produce, NULL);
    pthread_create(&tid_consume, NULL, consume, NULL);
    /* wait for 2 threads */
    pthread_join(tid_produce, NULL);
    pthread_join(tid_consume, NULL);
    /* remove 3 semaphores */
    sem_unlink(SEM_MUTEX);
    sem_unlink(SEM_N_EMPTY);
    sem_unlink(SEM_N_STORED);

    exit(EXIT_SUCCESS);
}

static void *produce(void *arg)
{
    int i = 0;
    for (i = 0; i < g_nItems; i++)
    {
        sem_wait(g_shared.nEmpty); // wait for at least 1 empty slot
        sem_wait(g_shared.mutex);

        g_shared.buff[i % N_BUFF] = i;
        printf("produce %ld | buff[%d] = %d\n", pthread_self(), i, g_shared.buff[i % N_BUFF]);

        sem_post(g_shared.mutex);
        sem_post(g_shared.nStored); // 1 more stored item
    }
    return (NULL);
}

static void *consume(void *arg)
{
    int i = 0;
    for (i = 0; i < g_nItems; i++)
    {
        sem_wait(g_shared.nStored); // wait for at least 1 stored item
        sem_wait(g_shared.mutex);

        if (g_shared.buff[i % N_BUFF] != i)
        {
            printf("buff[%d] = %d\n", i, g_shared.buff[i % N_BUFF]);
        }
        else
        {
            printf("consume %ld | buff[%d] = %d\n", pthread_self(), i, g_shared.buff[i % N_BUFF]);
        }

        sem_post(g_shared.mutex);
        sem_post(g_shared.nEmpty); // 1 more empty slot
    }
    return (NULL);
}

生产者-消费者问题(无名信号量)

prodcons_sem_memory.c

/*
 * @Description: unnamed semaphore achieve producer-consumer problem
 * @Author: 
 * @version: 
 * @Date: 2023-10-20 10:00:24
 * @LastEditors: 
 * @LastEditTime: 2023-10-20 11:00:11
 */

//==============================================================================
// Include files
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <semaphore.h>
#include <unistd.h>
#include <time.h>

//==============================================================================
// Constants
#define N_BUFF  10
#define MAX_N_ITEMS 1000000

#define MAX(a,b) ((a)>(b) ? (a):(b)) 
#define MIN(a,b) ((a)<(b) ? (a):(b))
//==============================================================================
// types
struct SHARED
{
    int buff[N_BUFF]; // circulate shared buffer
    sem_t mutex;     // binary semaphore
    sem_t nEmpty;    // number of empty slot
    sem_t nStored;   // number of stored slot
};

//==============================================================================
// global varibles

static int g_nItems = 0;  // read-only by producer and consumer
static struct SHARED g_shared;

//==============================================================================
// global functions

static void *produce(void *arg);
static void *consume(void *arg);

//==============================================================================
// The main entry-point function.

int main(int argc, char **argv)
{
    pthread_t tid_produce = 0;
    pthread_t tid_consume = 0;

    if (argc != 2)
    {
        printf("usage: %s <#items>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    g_nItems = MIN(atoi(argv[1]), MAX_N_ITEMS);
    printf("the number of items is %d\n\n", g_nItems);
    /* create 3 semaphores based memory and init it  */
    sem_init(&g_shared.mutex, 0, 1);
    sem_init(&g_shared.nEmpty, 0, N_BUFF);
    sem_init(&g_shared.nStored, 0, 0);

    /* create 1 producer and 1 consumer */
    pthread_setconcurrency(2);
    pthread_create(&tid_produce, NULL, produce, NULL);
    pthread_create(&tid_consume, NULL, consume, NULL);

    /* wait for 2 threads */
    pthread_join(tid_produce, NULL);
    pthread_join(tid_consume, NULL);

    /* destroy 3 semaphores */
    sem_destroy(&g_shared.mutex);
    sem_destroy(&g_shared.nEmpty);
    sem_destroy(&g_shared.nStored);

    exit(EXIT_SUCCESS);
}

static void *produce(void *arg)
{
    int i = 0;
    for (i = 0; i < g_nItems; i++)
    {
        sem_wait(&g_shared.nEmpty); // wait for at least 1 empty slot
        sem_wait(&g_shared.mutex);

        g_shared.buff[i % N_BUFF] = i;
        printf("produce %ld | buff[%d] = %d\n", pthread_self(), i, g_shared.buff[i % N_BUFF]);

        sem_post(&g_shared.mutex);
        sem_post(&g_shared.nStored); // 1 more stored item
    }
    return (NULL);
}

static void *consume(void *arg)
{
    int i = 0;
    for (i = 0; i < g_nItems; i++)
    {
        sem_wait(&g_shared.nStored); // wait for at least 1 stored item
        sem_wait(&g_shared.mutex);

        if (g_shared.buff[i % N_BUFF] != i)
        {
            printf("buff[%d] = %d\n", i, g_shared.buff[i % N_BUFF]);
        }
        else
        {
            printf("consume %ld | buff[%d] = %d\n", pthread_self(), i, g_shared.buff[i % N_BUFF]);
        }

        sem_post(&g_shared.mutex);
        sem_post(&g_shared.nEmpty); // 1 more empty slot
    }
    return (NULL);
}

参考引用

UNIX网络编程 卷2 进程间通信 第2版