消息传递:消息队列

发布时间 2023-12-08 16:03:13作者: eiSouthBoy

一、消息队列

在上一章节 消息传递:消息队列 中提到 PIPE 和 FIFO 是基于字节流的,把这种字节流(没有消息边界)分隔成各个记录的任何方法都得由应用程序来实现。例如提到的一个记录的格式为一行,格式:1234 /tmp/fifo.serv
另一方面,PIPE 和 FIFO 有许多规则,制约的 open 的阻塞与否。当一个PIPE或FIFO的最后一次关闭发生时,仍在该 PIPE 或 FIFO 上的数据将被丢弃。

为了减少应用程序的开发复杂度,提出一种新的解决方法:消息队列。消息队列根据不同的标准和差异分为如下两种:

  • Posix消息队列
  • System V消息队列

消息队列本质上可认为是一个消息链表。队列中的每条消息具有如下属性:

  • 一个无符号整数优先级(Posix)或一个长整型类型(System V)
  • 消息的数据部分长度(可以为0)
  • 数据本身(如果长度大于0)

查看消息队列的默认属性和限制:

其中:默认消息数量:10,默认单条消息最大大小:8192,消息队列数量:256

? 特点:在某一个进程往一个队列写入消息之前,并不需要另外某个进程在队列上等待消息的到达。

二、Posix消息队列

Posix消息队列数据结构示意图:

客户-服务程序通信

目的:客户端向公共的服务器队列(/mq.serv)发送消息(1234 hello),服务器从服务器队列(/mq.serv)接收客户端请求,解析请求为 pid 和 文本字符串,服务器打开客户端队列(/mq.1234)并将文本字符串发送。
客户端从客户端队列(/mq.1234)接收服务器的响应信息。

posix_server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>

#include <sys/wait.h>
#include <fcntl.h>
#include <errno.h>

#include <mqueue.h>

#define SERV_MQ         "/mq.serv"
#define MAX_MSG_SIZE    8192

/* default permission for new files.  */
#define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)

mqd_t qd_server, qd_client;

/* signal handler for catching ctrl+c */
void sigint(int signum)
{
    printf("\nserver terminated by user.\n");
    if (qd_server > 0)
    {
        mq_close(qd_server);
    }
    if (qd_client > 0)
    {
        mq_close(qd_client);
    }

    mq_unlink(SERV_MQ); // delete mq
    exit(EXIT_SUCCESS);
}

int main(int argc, char **argv)
{
    int fd = 0;
    struct mq_attr attr;
    char in_buff[MAX_MSG_SIZE] = {0};
    char out_buff[MAX_MSG_SIZE] = {0};
    char *ptr = NULL;
    char mq_client_name[1024] = {0};
    size_t n = 0;
    pid_t pid;
    unsigned int prio = 0;

    /* create message queue */
    if ((qd_server = mq_open(SERV_MQ, O_RDONLY | O_CREAT, FILE_MODE, NULL)) == -1)
    {
        perror("mq_open()");
        exit(EXIT_FAILURE);
    }

    signal(SIGINT, sigint); // catch ctrl_c

    /* receive request from client, and respond to client */
    while (1)
    {
        memset(in_buff, 0, sizeof(in_buff));
        if ((n = mq_receive(qd_server, in_buff, sizeof(in_buff), &prio)) == -1)
        {
            perror("mq_receive()");
            exit(EXIT_FAILURE);
        }
        
        if (in_buff[n - 1] == '\n')
            n--;
        in_buff[n] = '\0';
        printf("server read (%s) queue msg:%s\n", SERV_MQ, in_buff);

        if ((ptr = strchr(in_buff, ' ')) == NULL)
        {
            printf("bogus request: %s\n", in_buff);
            continue;
        }

        *ptr++ = 0; // null terminate PID, ptr = pathname
        pid = atol(in_buff);
        snprintf(mq_client_name, sizeof(mq_client_name), "/mq.%ld", (long)pid);

        /* open respond mq, write-only */
        if ((qd_client = mq_open(mq_client_name, O_WRONLY)) == -1)
        {
            perror("server: can't open client queue");
            continue;
        }
        memset(out_buff, 0, sizeof(out_buff));
        snprintf(out_buff, sizeof(out_buff), "%s\n", ptr);
        if (mq_send(qd_client, out_buff, strlen(out_buff), 0) == -1)
        {
            perror("can't send message to client");
            mq_close(qd_client);
            continue;
        }
        mq_close(qd_client);
    }

    mq_close(qd_server);
    mq_unlink(SERV_MQ); // delete mq
    exit(0);
}

posix_client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <errno.h>

#include <mqueue.h>

#define MAX_LINE        1024
#define MAX_MSG_SIZE    8192
#define SERV_MQ         "/mq.serv"

#define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)

mqd_t qd_server, qd_client;

/* signal handler for catching ctrl+c */
void sigint(int signum)
{
    printf("\nserver terminated by user.\n");
    if (qd_server > 0)
    {
        mq_close(qd_server);
    }
    if (qd_client > 0)
    {
        mq_close(qd_client);
    }

    mq_unlink(SERV_MQ); // delete mq
    exit(EXIT_SUCCESS);
}

int main(int argc, char **argv)
{
    int flags = 0, fd = 0;
    
    struct mq_attr attr;
    char buff[MAX_MSG_SIZE] = {0}, msg[MAX_MSG_SIZE] = {0};
    char *ptr = NULL;
    char suffix[1024] = {0};
    char mq_name[MAX_LINE] = {0};
    size_t len = 0, n = 0;
    pid_t pid;
    unsigned int prio = 0;

    /* create mq with our PID as part of name */
    flags = O_RDONLY | O_CREAT;
    pid = getpid();
    snprintf(mq_name, sizeof(mq_name), "/mq.%ld", (long)pid);
    if ((qd_client = mq_open(mq_name, flags, FILE_MODE, NULL)) == -1)
    {
        perror("mq_open()");
        exit(EXIT_FAILURE);
    }
    snprintf(suffix, sizeof(suffix), "%ld ", (long)pid);
    len = strlen(suffix);
    if ((qd_server = mq_open(SERV_MQ, O_WRONLY)) == -1)
    {
        printf("error: can't open %s\n", SERV_MQ);
        mq_close(qd_client);
        mq_unlink(mq_name);
        exit(EXIT_FAILURE);
    }

    signal(SIGINT, sigint); // catch ctrl_c

    while (1)
    {
        memset(buff, 0, sizeof(buff));
        fgets(buff, sizeof(buff) - len, stdin);
        snprintf(msg, sizeof(msg), "%s%s", suffix, buff);
        
        if (mq_send(qd_server, msg, strlen(msg), 0) == -1)
        {
            perror("mq_send()");
            break;;
        }
        printf("client write (%s) queue msg:%s", SERV_MQ, msg);

        memset(buff, 0, sizeof(buff));
        if (mq_receive(qd_client, buff, sizeof(buff), &prio) == -1)
        {
            perror("mq_receive()");
            break;
        }
        printf("client read (%s) queue msg:%s", mq_name, buff);
    }

    mq_close(qd_server);
    mq_close(qd_client);
    mq_unlink(mq_name);
    
    exit(0);
}

? 编译-链接注意事项:

  • 消息队列的文件名以 / 开始,后紧接名字。例如:/temp.123
  • 链接时,添加参数:-lrt

运行测试:

补充

Posix消息队列提供异步通知方式,通过函数:mq_notify() 实现。

三、System V消息队列

System V消息队列数据结构示意图:

3.1 客户端 -- 服务器例子

使用管道和FIFO时,为在两个方向上交换数据需两个IPC通道,因为这两种类型的IPC是单向的。但是使用消息队列时,单个队列就够用,由每个消息的类型来标识该消息是从客户到服务器,还是从服务器到客户。

例子示意图:

服务器:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>

#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <errno.h>

#include <sys/ipc.h>
#include <sys/msg.h> // system v消息队列头文件

#define MQ_KEY1 1234L
#define MQ_KEY2 2345L

#define MAX_MESG_DATA (PIPE_BUF - 2*sizeof(long))

int msqid;

struct mymesg {
  long	mesg_len;	/* #bytes in mesg_data, can be 0 */
  long	mesg_type;	/* message type, must be > 0 */
  char	mesg_data[MAX_MESG_DATA];
};

/* signal handler for catching ctrl+c */
void sigint(int signum)
{
    printf("\nserver terminated by user.\n");
    if (msqid >= 0)
    {
        msgctl(msqid, IPC_RMID, NULL);
    }

    exit(EXIT_SUCCESS);
}

ssize_t mesg_send(int id, struct mymesg *mprt)
{
    return (msgsnd(id, &(mprt->mesg_type), mprt->mesg_len, 0));
}

ssize_t mesg_recv(int id, struct mymesg *mptr)
{
    ssize_t n = 0;
    
    n = msgrcv(id, &(mptr->mesg_type), MAX_MESG_DATA, mptr->mesg_type, 0);
    mptr->mesg_len = n;
    return n;
}

void server(int readfd, int writefd)
{
    FILE *fp = NULL;
    char *ptr = NULL;
    pid_t pid = 0;
    ssize_t n = 0;
    struct mymesg mesg;

    while (1)
    {
        mesg.mesg_type = 1;
        if ((n = mesg_recv(readfd, &mesg)) == 0)
        {
            printf("pathname missing\n");
            continue;
        }
        mesg.mesg_data[n] = '\0'; /* null terminate pathname */

        printf("server recv msg:%s\n", mesg.mesg_data);
        if ((ptr = strchr(mesg.mesg_data, ' ')) == NULL)
        {
            printf("bogus request: %s\n", mesg.mesg_data);
            continue;
        }

        *ptr++ = 0;
        pid = atol(mesg.mesg_data);
        mesg.mesg_type = pid;

        if ((fp = fopen(ptr, "r")) == NULL)
        {
            snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) - n,
                     ": can't open, %s\n", strerror(errno));
            mesg.mesg_len = strlen(ptr);
            memmove(mesg.mesg_data, ptr, mesg.mesg_len);
            mesg_send(writefd, &mesg);
        }
        else
        {
            while (fgets(mesg.mesg_data, MAX_MESG_DATA, fp) != NULL)
            {
                mesg.mesg_len = strlen(mesg.mesg_data);
                mesg_send(writefd, &mesg);
            }
            fclose(fp);
        }

        /* send a 0-length message to signify the end */
        mesg.mesg_len = 0;
        mesg_send(writefd, &mesg);
    }
}

int main(int argc, char **argv)
{
    if ((msqid = msgget(MQ_KEY1, 0666 | IPC_CREAT)) == -1)
    {
        perror("msgget()");
        exit(EXIT_FAILURE);
    }
    
    server(msqid, msqid);

    msgctl(msqid, IPC_RMID, NULL);
    exit(0);
}

客户端

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>

#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <errno.h>

#include <sys/ipc.h>
#include <sys/msg.h> // system v消息队列头文件

#define MQ_KEY1 1234L
#define MQ_KEY2 2345L

#define MAX_MESG_DATA (PIPE_BUF - 2*sizeof(long))

struct mymesg {
  long	mesg_len;	/* #bytes in mesg_data, can be 0 */
  long	mesg_type;	/* message type, must be > 0 */
  char	mesg_data[MAX_MESG_DATA];
};

ssize_t mesg_send(int id, struct mymesg *mprt)
{
    return (msgsnd(id, &(mprt->mesg_type), mprt->mesg_len, 0));
}

ssize_t mesg_recv(int id, struct mymesg *mptr)
{
    ssize_t n = 0;
    
    n = msgrcv(id, &(mptr->mesg_type), MAX_MESG_DATA, mptr->mesg_type, 0);
    mptr->mesg_len = n;
    return n;
}

void client(int readfd, int writefd)
{
    char *ptr = NULL;
    ssize_t n = 0;
    size_t len = 0;
    struct mymesg mesg;

    snprintf(mesg.mesg_data, MAX_MESG_DATA, "%ld ", (long)getpid());
    len = strlen(mesg.mesg_data);
    ptr = mesg.mesg_data + len;

    fgets(ptr, MAX_MESG_DATA - len, stdin);
    len = strlen(mesg.mesg_data);
    printf("client send msg:%s", mesg.mesg_data);
    if (mesg.mesg_data[len - 1] == '\n')
        len--;
    mesg.mesg_len = len;
    mesg.mesg_type = 1;

    if (mesg_send(writefd, &mesg) == -1)
    {
        perror("mesg_send()");
        return;
    }
    mesg.mesg_type = getpid();
    while ((n = mesg_recv(readfd, &mesg)) > 0)
    {
        write(STDOUT_FILENO, mesg.mesg_data, n);
    }
}

int main(int argc, char **argv)
{
    int msqid;
   
    if ((msqid = msgget(MQ_KEY1, 0666)) == -1)
    {
        perror("msgget()");
        exit(EXIT_FAILURE);
    }

    client(msqid, msqid);

    exit(0);
}

运行测试:

? 注意事项:

可以通过命令:ipcs -q 查看消息队列的信息,命令:ipcrm -q msqid 删除指定消息队列

四、参考引用

IPC之四:使用 POSIX 消息队列进行进程间通信的实例
IPC之三:使用 System V 消息队列进行进程间通信的实例