Linux基于syscall的消息队列说明和使用

发布时间 2023-05-25 19:39:47作者: 林西索

在Linux下可以使用消息队列来实现进程间的通信

例子:

发送端:send.c

#include <iostream>
#include <thread>

#include <unistd.h>
#include <sys/msg.h>
#include <linux/mqueue.h>
//#include <mqueue.h>
#include <fcntl.h>
#include <syscall.h>
#include <sys/time.h>
#include <errno.h>
#include <string.h>

#define MY_QUE "/MuxRouter_VehicleQueue"

#define MR_MQ_MSG_NUM_MAX 30
#define MR_MQ_MSG_LEN_MAX (2176) /**< the queue size can contain whole mux frame data */

int MsgQueOpen(const char *name, int oflag, mode_t mode, struct mq_attr attr)
{    
    return syscall(SYS_mq_open, name + 1, oflag, mode, &attr);
}

//msgPrio 优先级
int MsgQueSend(int fd, uint8_t *data, size_t dataLen, uint32_t msgPrio, const struct timespec *abs_timeout)
{
    return syscall(SYS_mq_timedsend, fd, data, dataLen, msgPrio, NULL);
}

int MsgQueReceive(int fd, uint8_t *data, size_t dataLen, uint32_t msgPrio, const struct timespec *abs_timeout)
{
    return syscall(SYS_mq_timedreceive, fd, data, dataLen, msgPrio, NULL);
}

//关闭连接,但队列还存在
int MsgQueClose(int fd)
{
    return syscall(SYS_close, fd);
}

//从系统移除此队列
int MsgQueUnlink(const char* queName)
{
    return syscall(SYS_mq_unlink, queName);//好像是无效的    
}

int main(int argc, char **argv)
{
  if (MsgQueUnlink(MY_QUE) < 0)
    {
        printf("error: syscall unlink %s failed. %s\n", MY_QUE, strerror(errno));
        //return;
    }

    struct mq_attr attr;
    attr.mq_maxmsg = MR_MQ_MSG_NUM_MAX;
    attr.mq_msgsize = MR_MQ_MSG_LEN_MAX;

    int fd = MsgQueOpen(MY_QUE, O_CREAT | O_WRONLY, 0666, attr);
    if (fd < 0)
    {
        printf("error: syscall create %s failed. %s\n", MY_QUE, strerror(errno));
        return;
    }

    printf("create server success\n");

    struct timeval curTime = {0, 0};
    struct timespec waitTime = {0, 0};
    time_t failedTime = 0;

    char data[128] = {0};
    size_t dataLen = strlen(data);
    int i = 0;
    while (true)
    {
        if (gettimeofday(&curTime, NULL) < 0)
        {
            curTime.tv_sec = time(NULL);
        }

        waitTime.tv_sec = curTime.tv_sec + 1;
        waitTime.tv_nsec = curTime.tv_usec * 1000;

        sprintf(data, "%d", i++);
        dataLen = strlen(data);
        if (MsgQueSend(fd, (uint8_t*)data, dataLen, 0, &waitTime) < 0)//不建议设置超时
        {
            printf("send failed, %s\n", strerror(errno));
            //return;
        }
        printf("send:%s\n", data);
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
     if (i == 100)
     {
       break;
     }
    }
   
    MsgQueClose(fd);

    return 0;
}
 

接收端:receive.c



#include <iostream>
#include <thread>

#include <unistd.h>
#include <sys/msg.h>
#include <linux/mqueue.h>
//#include <mqueue.h>
#include <fcntl.h>
#include <syscall.h>
#include <sys/time.h>
#include <errno.h>
#include <string.h>

#define MY_QUE "/MuxRouter_VehicleQueue"

#define MR_MQ_MSG_NUM_MAX 30
#define MR_MQ_MSG_LEN_MAX (2176) /**< the queue size can contain whole mux frame data */

int MsgQueOpen(const char *name, int oflag, mode_t mode, struct mq_attr attr)
{    
    return syscall(SYS_mq_open, name + 1, oflag, mode, &attr);
}

//msgPrio 优先级
int MsgQueSend(int fd, uint8_t *data, size_t dataLen, uint32_t msgPrio, const struct timespec *abs_timeout)
{
    return syscall(SYS_mq_timedsend, fd, data, dataLen, msgPrio, NULL);
}

int MsgQueReceive(int fd, uint8_t *data, size_t dataLen, uint32_t msgPrio, const struct timespec *abs_timeout)
{
    return syscall(SYS_mq_timedreceive, fd, data, dataLen, msgPrio, NULL);
}

//关闭连接,但队列还存在
int MsgQueClose(int fd)
{
    return syscall(SYS_close, fd);
}

//从系统移除此队列
int MsgQueUnlink(const char* queName)
{
    return syscall(SYS_mq_unlink, queName);//好像是无效的    
}

int main(int argc, char **argv)
{
    struct mq_attr attr;
    attr.mq_maxmsg = MR_MQ_MSG_NUM_MAX;
    attr.mq_msgsize = MR_MQ_MSG_LEN_MAX;

    int fd = MsgQueOpen(MY_QUE, O_CREAT | O_RDONLY, 0666, attr);
    if (fd < 0)
    {
        printf("error: syscall create %s failed. %s\n", MY_QUE, strerror(errno));
        return;
    }

    printf("create client success\n");

    struct timeval curTime = {0, 0};
    struct timespec waitTime = {0, 0};
    time_t failedTime = 0;

    uint8_t data[4096] = {0};
    size_t dataLen = sizeof(data);
    printf("%d\n", dataLen);
    while (true)
    {
        if (gettimeofday(&curTime, NULL) < 0)
        {
            curTime.tv_sec = time(NULL);
        }

        memset(data, 0, 4096);

        waitTime.tv_sec = curTime.tv_sec + 1;
        waitTime.tv_nsec = curTime.tv_usec * 1000;
        if (MsgQueReceive(fd, data, dataLen, 0, NULL) < 0)//这里的超时不建议设置
        {
            printf("receive failed, %s\n", strerror(errno));
            //return;
        }
        else
        {
            printf("%s\n", data);
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }

    MsgQueClose(fd);

    return 0;
}

 

注意点:

1.创建mq失败时,检查传入的参数是否正确。

2.mq的命名规则必须为“/”开头,后面跟不含"/"的名字,如/ipc_name。

3.调用syscall的接口失败时,都可以使用 strerror(errno) 获取错误信息,根据错误信息检查问题。

4.使用 MsgQueSend 发送数据失败时,很有可能是创建的mq有问题,需要调用 MsgQueUnlink 把队列删除,重新创建。(使用syscall好像不能删除,可以使用#include<mqueue.h>头文件下的mq_unlink接口进行删除)。

5.使用 MsgQueReceive 接收消息时,data的长度必须>=创建mq时设置的 mq_msgsize。

 

参考:

mq_open()接口的消息队列:https://blog.csdn.net/qq_36973838/article/details/124807479