进程间通信:共享内存区

发布时间 2023-12-15 18:08:16作者: eiSouthBoy

一、共享内存区

所谓共享内存区,即程序通过固定大小的物理存储链接到本地内存中,这种IPC形式是最快的。管道、FIFO和消息队列的问题在于,两个进程要交换信息时,这些信息必须由内核传递。

共享内存区示意图:

共享内存区的限制:

二、Posix 共享内存区

基于Posix 共享内存区的生产者--消费者实现。

案例示意图:

? 注意:shm_open()输入的文件名以/开头,后面紧接文件名,不能再出现/
创建共享内存区后,可以通过命令:ll /dev/shm 查看有哪些,其中 shm.serv 就是用户创建的。

服务器
shm_server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <signal.h>
#include <getopt.h>
#include <unistd.h>
#include <sys/types.h>

#include <semaphore.h>

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/mman.h>

#define MESG_SIZE   256 /* max #bytes per message, incl. null at end */
#define N_MESG      15  /* max #messages */

struct shmstruct
{
    sem_t   mutex;                          /* the Posix memory-based semaphores */
    sem_t   n_empty;
    sem_t   n_stored;
    int     n_put;                          /* index into msg_off[] for next put */
    long    n_overflow;                     /* #overflows by senders */
    sem_t   n_overflow_mutex;               /* mutex for n_overflow counter */
    long    msg_off[N_MESG];                /* offset in shared memory of each message */
    char    msg_data[N_MESG * MESG_SIZE];   /* the actual messages */
};

const char *SHM_SERV = NULL;

/* signal handler for catching ctrl+c */
void sigint(int signum)
{
    printf("\nserver terminated by user.\n");
    /* 删除一个名字不会影响对于底层支撑对象的现有调用,直到对于该对象的引用全部关闭为止 */
    shm_unlink(SHM_SERV);
    exit(EXIT_SUCCESS);
}

int main(int argc, char **argv)
{
    int fd, index, last_n_overflow, temp;
    long offset;
    struct shmstruct *ptr;

    if (argc != 2)
    {
        printf("usage: %s <name>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    /* create shm, set its size, map it, close descriptor */
   
    SHM_SERV = argv[1];
    if ((fd = shm_open(argv[1], O_RDWR | O_CREAT | O_EXCL, 0666)) < 0)
    {
        perror("shm_open()");
        exit(EXIT_FAILURE);
    }
    ptr = mmap(NULL, sizeof(struct shmstruct), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    ftruncate(fd, sizeof(struct shmstruct));
    close(fd);

    /* initialize the array of offsets */
    for (index = 0; index < N_MESG; index++)
    {
        ptr->msg_off[index] = index * MESG_SIZE;
    }

    /* initialize the semaphores int shared memory */
    sem_init(&ptr->mutex, 1, 1);
    sem_init(&ptr->n_empty, 1, N_MESG);
    sem_init(&ptr->n_stored, 1, 0);
    sem_init(&ptr->n_overflow_mutex, 1, 1);

    /* consumer */
    index = 0;
    last_n_overflow = 0;
    signal(SIGINT, sigint); // catch ctrl+c
    while (1)
    {
        sem_wait(&ptr->n_stored);
        sem_wait(&ptr->mutex);
        offset = ptr->msg_off[index];
        printf("index = %d: %s\n", index, &ptr->msg_data[offset]);
        if (++index >= N_MESG)
        {
            index = 0; /* circular buffer */
        }
        sem_post(&ptr->mutex);
        sem_post(&ptr->n_empty);

        sem_wait(&ptr->n_overflow_mutex);
        temp = ptr->n_overflow;
        sem_post(&ptr->n_overflow_mutex);

        if (temp != last_n_overflow)
        {
            printf("n_overflow = %d\n", temp);
            last_n_overflow = temp;
        }
    }

    exit(0);
}

客户端
shm_client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <errno.h>
#include <getopt.h>
#include <unistd.h>
#include <sys/types.h>

#include <semaphore.h>

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/mman.h>

#define MESG_SIZE   256 /* max #bytes per message, incl. null at end */
#define N_MESG      15  /* max #messages */

struct shmstruct
{
    sem_t   mutex;                          /* the Posix memory-based semaphores */
    sem_t   n_empty;
    sem_t   n_stored;
    int     n_put;                          /* index into msg_off[] for next put */
    long    n_overflow;                     /* #overflows by senders */
    sem_t   n_overflow_mutex;               /* mutex for n_overflow counter */
    long    msg_off[N_MESG];                /* offset in shared memory of each message */
    char    msg_data[N_MESG * MESG_SIZE];   /* the actual messages */
};

int main(int argc, char **argv)
{
    int     fd, i, nloop, nusec;
    pid_t   pid;
    char    mesg[MESG_SIZE];
    long    offset;
    struct shmstruct *ptr;

    if (argc != 4)
    {
        printf("usage: %s <name> <#loops> <#usec>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    nloop = atoi(argv[2]);
    nusec = atoi(argv[3]);
    /* open and map shared memory that server must create */
    if ((fd = shm_open(argv[1], O_RDWR, 0666)) < 0)
    {
        perror("shm_open()");
        exit(EXIT_FAILURE);
    }
    ptr = mmap(NULL, sizeof(struct shmstruct), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);

    /* producer */
    pid = getpid();
    for (i = 0; i < nloop; i++)
    {
        usleep(nusec);
        snprintf(mesg, MESG_SIZE, "pid %ld: message %d", (long)pid, i);

        if (sem_trywait(&ptr->n_empty) == -1)
        {
            if (errno == EAGAIN)
            {
                sem_wait(&ptr->n_overflow_mutex);
                ptr->n_overflow++;
                sem_post(&ptr->n_overflow_mutex);
                continue;;
            }
            else
            {
                printf("err_trywait error\n");
                exit(EXIT_FAILURE);
            }
        }

        sem_wait(&ptr->mutex);
        offset = ptr->msg_off[ptr->n_put];
        if (++(ptr->n_put) >= N_MESG)
        {
            ptr->n_put = 0; /* circular buffer */
        }
        sem_post(&ptr->mutex);

        strcpy(&ptr->msg_data[offset], mesg);
        sem_post(&ptr->n_stored);
    }
   
    exit(0);
}

运行测试:

三、System V 共享内存区

System V共享内存区和Posix 共享内存区的使用区别不大。

基于System V 共享内存区的生产者--消费者实现。

服务器
shm_server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>

#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <errno.h>
#include <semaphore.h>

#include <sys/ipc.h>
#include <sys/shm.h> // system v共享内存区头文件

#define SHM_SERV_KET 1214L

#define MESG_SIZE   256 /* max #bytes per message, incl. null at end */
#define N_MESG      10  /* max #messages */

struct shmstruct
{
    sem_t   mutex;                          /* the Posix memory-based semaphores */
    sem_t   n_empty;
    sem_t   n_stored;
    int     n_put;                          /* index into msg_off[] for next put */
    long    n_overflow;                     /* #overflows by senders */
    sem_t   n_overflow_mutex;               /* mutex for n_overflow counter */
    long    msg_off[N_MESG];                /* offset in shared memory of each message */
    char    msg_data[N_MESG * MESG_SIZE];   /* the actual messages */
};

static int shm_id;

/* signal handler for catching ctrl+c */
void sigint(int signum)
{
    printf("\nserver terminated by user.\n");
    if (shm_id >= 0)
    {
        shmctl(shm_id, IPC_RMID, NULL);
    }
        
    exit(EXIT_SUCCESS);
}

int main(int argc, char **argv)
{
    int index, last_n_overflow, temp;
    long offset;
    struct shmstruct *ptr = NULL;
    struct shmid_ds attr;

    /* create shm */
    if ((shm_id = shmget(SHM_SERV_KET, sizeof(struct shmstruct), 0666 | IPC_CREAT)) < 0)
    {
        perror("shmget()");
        exit(EXIT_FAILURE);
    }
    if((ptr = shmat(shm_id, NULL, 0)) == NULL)
    {
        printf("shmat(): error\n");
        exit(EXIT_FAILURE);
    }
    
    /* initialize the array of offsets */
    for (index = 0; index < N_MESG; index++)
    {
        ptr->msg_off[index] = index * MESG_SIZE;
    }
    /* initialize the semaphores int shared memory */
    sem_init(&ptr->mutex, 1, 1);
    sem_init(&ptr->n_empty, 1, N_MESG);
    sem_init(&ptr->n_stored, 1, 0);
    sem_init(&ptr->n_overflow_mutex, 1, 1);

    /* consumer */
    index = 0;
    last_n_overflow = 0;
    signal(SIGINT, sigint); // catch ctrl+c
    while (1)
    {
        sem_wait(&ptr->n_stored);
        sem_wait(&ptr->mutex);
        offset = ptr->msg_off[index];
        printf("index = %d: %s\n", index, &ptr->msg_data[offset]);
        if (++index >= N_MESG)
        {
            index = 0; /* circular buffer */
        }
        sem_post(&ptr->mutex);
        sem_post(&ptr->n_empty);

        sem_wait(&ptr->n_overflow_mutex);
        temp = ptr->n_overflow;
        sem_post(&ptr->n_overflow_mutex);

        if (temp != last_n_overflow)
        {
            printf("n_overflow = %d\n", temp);
            last_n_overflow = temp;
        }
    }


    exit(EXIT_FAILURE);
}

客户端
shm_client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>

#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <errno.h>
#include <semaphore.h>

#include <sys/ipc.h>
#include <sys/shm.h> // system v共享内存区头文件

#define SHM_SERV_KET 1214L

#define MESG_SIZE   256 /* max #bytes per message, incl. null at end */
#define N_MESG      10  /* max #messages */

struct shmstruct
{
    sem_t   mutex;                          /* the Posix memory-based semaphores */
    sem_t   n_empty;
    sem_t   n_stored;
    int     n_put;                          /* index into msg_off[] for next put */
    long    n_overflow;                     /* #overflows by senders */
    sem_t   n_overflow_mutex;               /* mutex for n_overflow counter */
    long    msg_off[N_MESG];                /* offset in shared memory of each message */
    char    msg_data[N_MESG * MESG_SIZE];   /* the actual messages */
};

int main(int argc, char **argv)
{
    static int shm_id;
    int     fd, i, nloop, nusec;
    pid_t   pid;
    char    mesg[MESG_SIZE];
    long    offset;
    struct shmstruct *ptr = NULL;

    if (argc != 3)
    {
        printf("usage: %s <#loops> <#usec>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    nloop = atoi(argv[1]);
    nusec = atoi(argv[2]);
   
    if ((shm_id = shmget(SHM_SERV_KET, 0, 0666)) < 0)
    {
        perror("shmget()");
        exit(EXIT_FAILURE);
    }
    if ((ptr = shmat(shm_id, NULL, 0)) == NULL)
    {
        printf("shmat(): error\n");
        exit(EXIT_FAILURE);
    }

    /* producer */
    printf("#loops = %d, #usec = %d\n", nloop, nusec);
    pid = getpid();
    for (i = 0; i < nloop; i++)
    {
        usleep(nusec);
        snprintf(mesg, MESG_SIZE, "pid %ld: message %d", (long)pid, i);

        if (sem_trywait(&ptr->n_empty) == -1)
        {
            if (errno == EAGAIN)
            {
                sem_wait(&ptr->n_overflow_mutex);
                ptr->n_overflow++;
                sem_post(&ptr->n_overflow_mutex);
                continue;;
            }
            else
            {
                printf("err_trywait error\n");
                exit(EXIT_FAILURE);
            }
        }

        sem_wait(&ptr->mutex);
        offset = ptr->msg_off[ptr->n_put];
        if (++(ptr->n_put) >= N_MESG)
        {
            ptr->n_put = 0; /* circular buffer */
        }
        sem_post(&ptr->mutex);

        strcpy(&ptr->msg_data[offset], mesg);
        sem_post(&ptr->n_stored);
    }
    shmdt(ptr);

    exit(0);
}

运行测试:

四、参考引用

IPC之八:使用 POSIX 共享内存进行进程间通信的实例
IPC之七:使用 System V 共享内存段进行进程间通信的实例