一、消息队列
在上一章节 消息传递:消息队列 中提到 PIPE 和 FIFO
是基于字节流的,把这种字节流(没有消息边界)分隔成各个记录的任何方法都得由应用程序来实现。例如提到的一个记录的格式为一行,格式:1234 /tmp/fifo.serv
。
另一方面,PIPE 和 FIFO 有许多规则,制约的 open 的阻塞与否。当一个PIPE或FIFO的最后一次关闭发生时,仍在该 PIPE 或 FIFO 上的数据将被丢弃。
为了减少应用程序的开发复杂度,提出一种新的解决方法:消息队列
。消息队列根据不同的标准和差异分为如下两种:
- Posix消息队列
- System V消息队列
消息队列本质上可认为是一个消息链表。队列中的每条消息具有如下属性:
- 一个无符号整数优先级(Posix)或一个长整型类型(System V)
- 消息的数据部分长度(可以为0)
- 数据本身(如果长度大于0)
查看消息队列的默认属性和限制:
其中:默认消息数量:10,默认单条消息最大大小:8192,消息队列数量:256
? 特点:在某一个进程往一个队列写入消息之前,并不需要另外某个进程在队列上等待消息的到达。
二、Posix消息队列
Posix消息队列数据结构示意图:
客户-服务程序通信
目的:客户端向公共的服务器队列(/mq.serv
)发送消息(1234 hello
),服务器从服务器队列(/mq.serv
)接收客户端请求,解析请求为 pid 和 文本字符串,服务器打开客户端队列(/mq.1234
)并将文本字符串发送。
客户端从客户端队列(/mq.1234
)接收服务器的响应信息。
posix_server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <errno.h>
#include <mqueue.h>
#define SERV_MQ "/mq.serv"
#define MAX_MSG_SIZE 8192
/* default permission for new files. */
#define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
mqd_t qd_server, qd_client;
/* signal handler for catching ctrl+c */
void sigint(int signum)
{
printf("\nserver terminated by user.\n");
if (qd_server > 0)
{
mq_close(qd_server);
}
if (qd_client > 0)
{
mq_close(qd_client);
}
mq_unlink(SERV_MQ); // delete mq
exit(EXIT_SUCCESS);
}
int main(int argc, char **argv)
{
int fd = 0;
struct mq_attr attr;
char in_buff[MAX_MSG_SIZE] = {0};
char out_buff[MAX_MSG_SIZE] = {0};
char *ptr = NULL;
char mq_client_name[1024] = {0};
size_t n = 0;
pid_t pid;
unsigned int prio = 0;
/* create message queue */
if ((qd_server = mq_open(SERV_MQ, O_RDONLY | O_CREAT, FILE_MODE, NULL)) == -1)
{
perror("mq_open()");
exit(EXIT_FAILURE);
}
signal(SIGINT, sigint); // catch ctrl_c
/* receive request from client, and respond to client */
while (1)
{
memset(in_buff, 0, sizeof(in_buff));
if ((n = mq_receive(qd_server, in_buff, sizeof(in_buff), &prio)) == -1)
{
perror("mq_receive()");
exit(EXIT_FAILURE);
}
if (in_buff[n - 1] == '\n')
n--;
in_buff[n] = '\0';
printf("server read (%s) queue msg:%s\n", SERV_MQ, in_buff);
if ((ptr = strchr(in_buff, ' ')) == NULL)
{
printf("bogus request: %s\n", in_buff);
continue;
}
*ptr++ = 0; // null terminate PID, ptr = pathname
pid = atol(in_buff);
snprintf(mq_client_name, sizeof(mq_client_name), "/mq.%ld", (long)pid);
/* open respond mq, write-only */
if ((qd_client = mq_open(mq_client_name, O_WRONLY)) == -1)
{
perror("server: can't open client queue");
continue;
}
memset(out_buff, 0, sizeof(out_buff));
snprintf(out_buff, sizeof(out_buff), "%s\n", ptr);
if (mq_send(qd_client, out_buff, strlen(out_buff), 0) == -1)
{
perror("can't send message to client");
mq_close(qd_client);
continue;
}
mq_close(qd_client);
}
mq_close(qd_server);
mq_unlink(SERV_MQ); // delete mq
exit(0);
}
posix_client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <errno.h>
#include <mqueue.h>
#define MAX_LINE 1024
#define MAX_MSG_SIZE 8192
#define SERV_MQ "/mq.serv"
#define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
mqd_t qd_server, qd_client;
/* signal handler for catching ctrl+c */
void sigint(int signum)
{
printf("\nserver terminated by user.\n");
if (qd_server > 0)
{
mq_close(qd_server);
}
if (qd_client > 0)
{
mq_close(qd_client);
}
mq_unlink(SERV_MQ); // delete mq
exit(EXIT_SUCCESS);
}
int main(int argc, char **argv)
{
int flags = 0, fd = 0;
struct mq_attr attr;
char buff[MAX_MSG_SIZE] = {0}, msg[MAX_MSG_SIZE] = {0};
char *ptr = NULL;
char suffix[1024] = {0};
char mq_name[MAX_LINE] = {0};
size_t len = 0, n = 0;
pid_t pid;
unsigned int prio = 0;
/* create mq with our PID as part of name */
flags = O_RDONLY | O_CREAT;
pid = getpid();
snprintf(mq_name, sizeof(mq_name), "/mq.%ld", (long)pid);
if ((qd_client = mq_open(mq_name, flags, FILE_MODE, NULL)) == -1)
{
perror("mq_open()");
exit(EXIT_FAILURE);
}
snprintf(suffix, sizeof(suffix), "%ld ", (long)pid);
len = strlen(suffix);
if ((qd_server = mq_open(SERV_MQ, O_WRONLY)) == -1)
{
printf("error: can't open %s\n", SERV_MQ);
mq_close(qd_client);
mq_unlink(mq_name);
exit(EXIT_FAILURE);
}
signal(SIGINT, sigint); // catch ctrl_c
while (1)
{
memset(buff, 0, sizeof(buff));
fgets(buff, sizeof(buff) - len, stdin);
snprintf(msg, sizeof(msg), "%s%s", suffix, buff);
if (mq_send(qd_server, msg, strlen(msg), 0) == -1)
{
perror("mq_send()");
break;;
}
printf("client write (%s) queue msg:%s", SERV_MQ, msg);
memset(buff, 0, sizeof(buff));
if (mq_receive(qd_client, buff, sizeof(buff), &prio) == -1)
{
perror("mq_receive()");
break;
}
printf("client read (%s) queue msg:%s", mq_name, buff);
}
mq_close(qd_server);
mq_close(qd_client);
mq_unlink(mq_name);
exit(0);
}
? 编译-链接注意事项:
- 消息队列的文件名以
/
开始,后紧接名字。例如:/temp.123
- 链接时,添加参数:
-lrt
运行测试:
补充
Posix消息队列提供异步通知方式,通过函数:mq_notify()
实现。
三、System V消息队列
System V消息队列数据结构示意图:
3.1 客户端 -- 服务器例子
使用管道和FIFO时,为在两个方向上交换数据需两个IPC通道,因为这两种类型的IPC是单向的。但是使用消息队列时,单个队列就够用,由每个消息的类型来标识该消息是从客户到服务器,还是从服务器到客户。
例子示意图:
服务器:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <errno.h>
#include <sys/ipc.h>
#include <sys/msg.h> // system v消息队列头文件
#define MQ_KEY1 1234L
#define MQ_KEY2 2345L
#define MAX_MESG_DATA (PIPE_BUF - 2*sizeof(long))
int msqid;
struct mymesg {
long mesg_len; /* #bytes in mesg_data, can be 0 */
long mesg_type; /* message type, must be > 0 */
char mesg_data[MAX_MESG_DATA];
};
/* signal handler for catching ctrl+c */
void sigint(int signum)
{
printf("\nserver terminated by user.\n");
if (msqid >= 0)
{
msgctl(msqid, IPC_RMID, NULL);
}
exit(EXIT_SUCCESS);
}
ssize_t mesg_send(int id, struct mymesg *mprt)
{
return (msgsnd(id, &(mprt->mesg_type), mprt->mesg_len, 0));
}
ssize_t mesg_recv(int id, struct mymesg *mptr)
{
ssize_t n = 0;
n = msgrcv(id, &(mptr->mesg_type), MAX_MESG_DATA, mptr->mesg_type, 0);
mptr->mesg_len = n;
return n;
}
void server(int readfd, int writefd)
{
FILE *fp = NULL;
char *ptr = NULL;
pid_t pid = 0;
ssize_t n = 0;
struct mymesg mesg;
while (1)
{
mesg.mesg_type = 1;
if ((n = mesg_recv(readfd, &mesg)) == 0)
{
printf("pathname missing\n");
continue;
}
mesg.mesg_data[n] = '\0'; /* null terminate pathname */
printf("server recv msg:%s\n", mesg.mesg_data);
if ((ptr = strchr(mesg.mesg_data, ' ')) == NULL)
{
printf("bogus request: %s\n", mesg.mesg_data);
continue;
}
*ptr++ = 0;
pid = atol(mesg.mesg_data);
mesg.mesg_type = pid;
if ((fp = fopen(ptr, "r")) == NULL)
{
snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) - n,
": can't open, %s\n", strerror(errno));
mesg.mesg_len = strlen(ptr);
memmove(mesg.mesg_data, ptr, mesg.mesg_len);
mesg_send(writefd, &mesg);
}
else
{
while (fgets(mesg.mesg_data, MAX_MESG_DATA, fp) != NULL)
{
mesg.mesg_len = strlen(mesg.mesg_data);
mesg_send(writefd, &mesg);
}
fclose(fp);
}
/* send a 0-length message to signify the end */
mesg.mesg_len = 0;
mesg_send(writefd, &mesg);
}
}
int main(int argc, char **argv)
{
if ((msqid = msgget(MQ_KEY1, 0666 | IPC_CREAT)) == -1)
{
perror("msgget()");
exit(EXIT_FAILURE);
}
server(msqid, msqid);
msgctl(msqid, IPC_RMID, NULL);
exit(0);
}
客户端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <errno.h>
#include <sys/ipc.h>
#include <sys/msg.h> // system v消息队列头文件
#define MQ_KEY1 1234L
#define MQ_KEY2 2345L
#define MAX_MESG_DATA (PIPE_BUF - 2*sizeof(long))
struct mymesg {
long mesg_len; /* #bytes in mesg_data, can be 0 */
long mesg_type; /* message type, must be > 0 */
char mesg_data[MAX_MESG_DATA];
};
ssize_t mesg_send(int id, struct mymesg *mprt)
{
return (msgsnd(id, &(mprt->mesg_type), mprt->mesg_len, 0));
}
ssize_t mesg_recv(int id, struct mymesg *mptr)
{
ssize_t n = 0;
n = msgrcv(id, &(mptr->mesg_type), MAX_MESG_DATA, mptr->mesg_type, 0);
mptr->mesg_len = n;
return n;
}
void client(int readfd, int writefd)
{
char *ptr = NULL;
ssize_t n = 0;
size_t len = 0;
struct mymesg mesg;
snprintf(mesg.mesg_data, MAX_MESG_DATA, "%ld ", (long)getpid());
len = strlen(mesg.mesg_data);
ptr = mesg.mesg_data + len;
fgets(ptr, MAX_MESG_DATA - len, stdin);
len = strlen(mesg.mesg_data);
printf("client send msg:%s", mesg.mesg_data);
if (mesg.mesg_data[len - 1] == '\n')
len--;
mesg.mesg_len = len;
mesg.mesg_type = 1;
if (mesg_send(writefd, &mesg) == -1)
{
perror("mesg_send()");
return;
}
mesg.mesg_type = getpid();
while ((n = mesg_recv(readfd, &mesg)) > 0)
{
write(STDOUT_FILENO, mesg.mesg_data, n);
}
}
int main(int argc, char **argv)
{
int msqid;
if ((msqid = msgget(MQ_KEY1, 0666)) == -1)
{
perror("msgget()");
exit(EXIT_FAILURE);
}
client(msqid, msqid);
exit(0);
}
运行测试:
? 注意事项:
可以通过命令:ipcs -q
查看消息队列的信息,命令:ipcrm -q msqid
删除指定消息队列
四、参考引用
IPC之四:使用 POSIX 消息队列进行进程间通信的实例
IPC之三:使用 System V 消息队列进行进程间通信的实例