信号量的定义
IPC是进程间通信(interprocess communication)的简称。狭义上,IPC主要用于进程间;广义上,IPC可用于进程间或线程间。
Posix消息队列、Posix信号量和Posix共享内存区 合称为 “Posix IPC”.
信号量(semaphore)是一种用于提供不同进程间或一个给定进程的不同线程间同步手段。
信号量的分类:
1)Posix有名信号量
2)Posix基于内存的信号量,或者称 无名信号量
3)System V信号量
信号量的操作:
1)创建(create)一个信号量。创建信号量时,应给定初始值
2)等待(wait)一个信号量。该操作会测试这个信号量的值,如果其值小于或等于0,那就等待(阻塞),一旦其值变为大于0就将它减1。
3)挂出(post)一个信号量。该操作将信号量的值加1
? 注:信号量的值不是一定要为二值信号量。它们适用与其值初始化为任意非负值的信号量。这样的信号量称为 计数信号量(counting semaphore),它通常初始化为某个值N,表示可用的资源(例如 缓冲区)数。
二值信号量可用于互斥目的,就像互斥锁一样。信号量有一个互斥锁没有的特性:互斥锁必须总是由锁住它的线程解锁,但信号量的挂出却不必由执行过它的等待操作的同一线程执行。
生产者-消费者问题(有名信号量)
现在对生产者-消费者问题进行扩展,把共享缓冲区用作一个环形缓冲区:生产者填写最后一个(buff[N_BUFF - 1])后,重头开始填写第一个(buff[0]),消费者也同样这么做。这么做有一个问题:生产者不能走到消费者的前面。
prodcons_sem.c
/*
* @Description: named semaphore achieve producer-consumer problem
* @Author:
* @version:
* @Date: 2023-10-19 10:00:24
* @LastEditors:
* @LastEditTime: 2023-10-19 11:00:11
*/
//==============================================================================
// Include files
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <semaphore.h>
#include <unistd.h>
#include <time.h>
//==============================================================================
// Constants
#define N_BUFF 10
#define PATH_MAX 1024
#define MAX_N_ITEMS 1000000
#define SEM_MUTEX "mutex" // these are args to px_ipc_name()
#define SEM_N_EMPTY "n_empty"
#define SEM_N_STORED "n_stored"
#define MAX(a,b) ((a)>(b) ? (a):(b))
#define MIN(a,b) ((a)<(b) ? (a):(b))
//==============================================================================
// types
struct SHARED
{
int buff[N_BUFF]; // circulate shared buffer
sem_t *mutex; // binary semaphore
sem_t *nEmpty; // number of empty slot
sem_t *nStored; // number of stored slot
};
//==============================================================================
// global varibles
static int g_nItems = 0; // read-only by producer and consumer
static struct SHARED g_shared;
//==============================================================================
// global functions
static void *produce(void *arg);
static void *consume(void *arg);
//==============================================================================
// The main entry-point function.
int main(int argc, char **argv)
{
pthread_t tid_produce = 0;
pthread_t tid_consume = 0;
if (argc != 2)
{
printf("usage: %s <#items>\n", argv[0]);
exit(EXIT_FAILURE);
}
g_nItems = MIN(atoi(argv[1]), MAX_N_ITEMS);
printf("the number of items is %d\n\n", g_nItems);
/* create 3 semaphores and init */
sem_unlink(SEM_MUTEX); // if the specified semaphore is exist, we must delete.
sem_unlink(SEM_N_EMPTY);
sem_unlink(SEM_N_STORED);
g_shared.mutex = sem_open(SEM_MUTEX, O_CREAT | O_EXCL, 0666, 1);
g_shared.nEmpty = sem_open(SEM_N_EMPTY, O_CREAT | O_EXCL, 0666, N_BUFF);
g_shared.nStored = sem_open(SEM_N_STORED, O_CREAT | O_EXCL, 0666, 0);
/* create 1 producer and 1 consumer */
pthread_setconcurrency(2);
pthread_create(&tid_produce, NULL, produce, NULL);
pthread_create(&tid_consume, NULL, consume, NULL);
/* wait for 2 threads */
pthread_join(tid_produce, NULL);
pthread_join(tid_consume, NULL);
/* remove 3 semaphores */
sem_unlink(SEM_MUTEX);
sem_unlink(SEM_N_EMPTY);
sem_unlink(SEM_N_STORED);
exit(EXIT_SUCCESS);
}
static void *produce(void *arg)
{
int i = 0;
for (i = 0; i < g_nItems; i++)
{
sem_wait(g_shared.nEmpty); // wait for at least 1 empty slot
sem_wait(g_shared.mutex);
g_shared.buff[i % N_BUFF] = i;
printf("produce %ld | buff[%d] = %d\n", pthread_self(), i, g_shared.buff[i % N_BUFF]);
sem_post(g_shared.mutex);
sem_post(g_shared.nStored); // 1 more stored item
}
return (NULL);
}
static void *consume(void *arg)
{
int i = 0;
for (i = 0; i < g_nItems; i++)
{
sem_wait(g_shared.nStored); // wait for at least 1 stored item
sem_wait(g_shared.mutex);
if (g_shared.buff[i % N_BUFF] != i)
{
printf("buff[%d] = %d\n", i, g_shared.buff[i % N_BUFF]);
}
else
{
printf("consume %ld | buff[%d] = %d\n", pthread_self(), i, g_shared.buff[i % N_BUFF]);
}
sem_post(g_shared.mutex);
sem_post(g_shared.nEmpty); // 1 more empty slot
}
return (NULL);
}
生产者-消费者问题(无名信号量)
prodcons_sem_memory.c
/*
* @Description: unnamed semaphore achieve producer-consumer problem
* @Author:
* @version:
* @Date: 2023-10-20 10:00:24
* @LastEditors:
* @LastEditTime: 2023-10-20 11:00:11
*/
//==============================================================================
// Include files
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <semaphore.h>
#include <unistd.h>
#include <time.h>
//==============================================================================
// Constants
#define N_BUFF 10
#define MAX_N_ITEMS 1000000
#define MAX(a,b) ((a)>(b) ? (a):(b))
#define MIN(a,b) ((a)<(b) ? (a):(b))
//==============================================================================
// types
struct SHARED
{
int buff[N_BUFF]; // circulate shared buffer
sem_t mutex; // binary semaphore
sem_t nEmpty; // number of empty slot
sem_t nStored; // number of stored slot
};
//==============================================================================
// global varibles
static int g_nItems = 0; // read-only by producer and consumer
static struct SHARED g_shared;
//==============================================================================
// global functions
static void *produce(void *arg);
static void *consume(void *arg);
//==============================================================================
// The main entry-point function.
int main(int argc, char **argv)
{
pthread_t tid_produce = 0;
pthread_t tid_consume = 0;
if (argc != 2)
{
printf("usage: %s <#items>\n", argv[0]);
exit(EXIT_FAILURE);
}
g_nItems = MIN(atoi(argv[1]), MAX_N_ITEMS);
printf("the number of items is %d\n\n", g_nItems);
/* create 3 semaphores based memory and init it */
sem_init(&g_shared.mutex, 0, 1);
sem_init(&g_shared.nEmpty, 0, N_BUFF);
sem_init(&g_shared.nStored, 0, 0);
/* create 1 producer and 1 consumer */
pthread_setconcurrency(2);
pthread_create(&tid_produce, NULL, produce, NULL);
pthread_create(&tid_consume, NULL, consume, NULL);
/* wait for 2 threads */
pthread_join(tid_produce, NULL);
pthread_join(tid_consume, NULL);
/* destroy 3 semaphores */
sem_destroy(&g_shared.mutex);
sem_destroy(&g_shared.nEmpty);
sem_destroy(&g_shared.nStored);
exit(EXIT_SUCCESS);
}
static void *produce(void *arg)
{
int i = 0;
for (i = 0; i < g_nItems; i++)
{
sem_wait(&g_shared.nEmpty); // wait for at least 1 empty slot
sem_wait(&g_shared.mutex);
g_shared.buff[i % N_BUFF] = i;
printf("produce %ld | buff[%d] = %d\n", pthread_self(), i, g_shared.buff[i % N_BUFF]);
sem_post(&g_shared.mutex);
sem_post(&g_shared.nStored); // 1 more stored item
}
return (NULL);
}
static void *consume(void *arg)
{
int i = 0;
for (i = 0; i < g_nItems; i++)
{
sem_wait(&g_shared.nStored); // wait for at least 1 stored item
sem_wait(&g_shared.mutex);
if (g_shared.buff[i % N_BUFF] != i)
{
printf("buff[%d] = %d\n", i, g_shared.buff[i % N_BUFF]);
}
else
{
printf("consume %ld | buff[%d] = %d\n", pthread_self(), i, g_shared.buff[i % N_BUFF]);
}
sem_post(&g_shared.mutex);
sem_post(&g_shared.nEmpty); // 1 more empty slot
}
return (NULL);
}
参考引用
UNIX网络编程 卷2 进程间通信 第2版