linux Libevent 轻量级事件通知库API简介和示例

发布时间 2023-11-15 17:42:50作者: imXuan

1. 事件处理框架

  libevent 是一个C语言编写,轻量级开源高性能事件框架。事件驱动,支持多种IO多路复用(如epoll),支持注册优先级等

// 头文件
#include <event2/event.h>
// 创建一个事件处理框架
struct event_base *event_base_new(void);
// 销毁一个事件处理框架
void event_base_free(struct event_base *base);

// 查看底层支持的IO转接模型
const char** event_get_supported_method(void);
// 查看当前实际处理框架使用的IO转接模型
const char* event_base_get_method(const struct event_base *base);
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <event2/event.h>

int main()
{
    // 1.创建事件处理框架
    struct event_base* base = event_base_new();

    // 2.查看支持什么IO转接函数
    const char** methods = event_get_supported_methods();
    printf("支持的IO转接模:\n");
    for(int i=0; methods[i]!=NULL; ++i)
    {
        printf("          %s\n", methods[i]);
    }
    // 3.查看当前使用的IO转接函数
    printf("当前使用的IO转接模型: %s\n", event_base_get_method(base));
    // 4.释放资源
    event_base_free(base);
    return 0;
}
代码示例:创建事件处理框架

2. 事件循环

  •  开始事件循环
// 头文件
#include <event2/event.h>

// 启动事件处理框架中的事件循环
int event_base_dispatch(struct event_base* base);
// 举例:检测读事件,检测是否有数据发送,等待到事件触发
//         如果有读事件,默认情况下只处理一次,事件循环停止,只能接收一次数据
//         如果要持续接收数据,需要额外设置
// 这个函数被调用,内部会进行事件检测,阻塞在这一行
  • 终止事件循环
// 头文件
#include <event2/event.h>

// 表示一个时间段
struct timeval {
    long tv_sec;    //
    long tv_usec;    // 微秒
};

// 事件循环不会马上终止,在tv指定时长后终止
int event_base_loopexit(struct event_base *base, const struct timeval *tv);

// 立即终止事件循环
int event_base_loopbreak(struct event_base *base);

3. 事件

  • 设置事件处理函数
  • 创建需要检测的文件描述符对应的事件
// 头文件
#include <event2/event.h>

#define EV_TIMEOUT 0x01  // 定时器超时
#define EV_READ    0x02  // 读,检测缓冲区是否有数据
#define EV_WRITE   0x04  // 写,检测是否可写
#define EV_SIGNAL  0x08  // 信号
#define EV_PERSIST 0x10  // 设置事件被重复检测
#define EV_ET      0x20  // 边沿模式

// 事件的处理函数,被libevent框架调用
typedef void(*event_callback_fn)(evutil_socket_t fd, short what, void *arg);
/*  参数
        fd: event_new() 的第二个参数
        what: 记录了文件描述符触发的事件
        arg: event_new() 的最后一个参数        */

// 创建一个需要检测的事件,struct event 就是创建出的事件
// evutil_socket_t == int
// event_new() 函数本质就是对一个文件描述符进行封装
struct event* event_new(struct event_base *base, evutil_socket_t fd, short what, event_callback_fn cb, void *arg);
/*  参数
        base: 事件处理框架
        fd: 文件描述符(如管道、套接字通信)
        what: 要检测的描述符
            EV_READ: 读事件
            EV_WRITE: 写事件
            EV_SIGNAL: 信号事件(Linux)
            EV_ET: 设置边沿模式
        cb: 函数指针对应的一个回调函数,检测触发时,这个函数被调用
        arg: 实参传给cb                     */

// 释放事件资源
void event_free(struct event *event);
  • 事件的添加和删除
struct timeval {
    long tv_sec;      //
    long tv_usec;     // 微秒
};

// 添加事件到事件循环中(如果在tv时间段没有被触发,会强制调用一次)
int event_add(struct *ev, const struct timeval *tv);
// 将事件从事件处理函数删除
int event_del(struct event *ev);
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <event2/event.h>

// 回调函数
void write_cb(evutil_socket_t fd, short what, void *arg)
{
    // 事件触发的事件在what里
    printf("当前触发的事件是EV_WRITE事件吗: %d\n", what && EV_WRITE);
    // 处理动作就是写管道
    char buf[1024];
    static int num = 0;
    sprintf(buf, "我在写数据,%d\n", num++);
    int ret = write(fd, buf, strlen(buf)+1);
}
int main()
{
    // 1.创建有名管道
    int ret = mkfifo("testfifo", 0664);
    // 2.打开管道的写端
    int wfd = open("testfifo", O_WRONLY);
    // 3.创建事件处理框架
    struct event_base *base = event_base_new();
    // 4.创建事件并添加到时间处理框架
    struct event *ev = event_new(base, wfd, EV_WRITE|EV_PERSIST, write_cb, NULL);   // EV_PERSIST 才是持续检测,否则只检测一次
    event_add(ev, NULL);
    // 5.启动事件循环
    event_base_dispatch(base);
    // 释放资源
    event_free(ev);
    event_base_free(base);
}
代码示例:管道写端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <event2/event.h>

// 回调函数
void read_cb(evutil_socket_t fd, short what, void *arg)
{
    // 事件触发的事件在what里
    printf("当前触发的事件是EV_READ事件吗: %d\n", what && EV_READ);
    // 处理动作就是写管道
    char buf[1024];
    memset(buf, 0, sizeof(buf));
    int len = read(fd, buf, sizeof(buf));
    if(len==0)
    {
        printf("写端已经关闭\n");
    }
    else if(len>0)
    {
        printf("接收到的数据: %s", buf);
    }
    else
    {
        printf("失败");
    }
    
}
int main()
{
    // 1.打开管道的写端
    int rfd = open("testfifo", O_RDONLY);
    // 2.创建事件处理框架
    struct event_base *base = event_base_new();
    // 3.创建事件并添加到时间处理框架
    struct event *ev = event_new(base, rfd, EV_READ|EV_PERSIST, read_cb, NULL);  // EV_PERSIST 才是持续检测,否则只检测一次
    event_add(ev, NULL);
    // 4.启动事件循环
    event_base_dispatch(base);
    // 释放资源
    event_free(ev);
    event_base_free(base);
}
代码示例:管道读端

4. 带缓冲区的事件

使用场景:应用于网络套接字通信(bufferevent)

带缓冲区的事件结构:

  1. 封装了 socket 中的文件描述符,得到了struct bufferevent
  2. 该结构体对文件描述符封装后,使用者不能直接操作 socket 对应的两块读写缓冲区,而是先写入 bufferevent 的读写缓冲区,然后再由该框架写入 socket 中的读写缓冲区
  3. 读写缓冲区分别对应了两个读写事件
    • 读回调是 bufferevent 中读缓冲区的有数据时调用的
    • 写回调是数据写入到 socket 的写缓冲区后调用的
  • 创建带缓冲区的事件
struct bufferevent* bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, enum bufferevent_options options);
/*  参数:
        base: 事件处理框架
        fd: 通信的文件描述符
            自己使用 socket() 函数创建的文件描述符
            填 -1 自动创建
        options: BEV_OPT_CLOSE_ON_FREE: 在销毁对象时自动释放封装的文件描述符      */
  • 销毁带缓冲区的事件
void bufferevent_free(struct bufferevent *bev);
  • 在 bufferevent 上启动连接
int bufferevent_socket_connect(struct bufferevent *bev, struct sockaddr *address, int addrlen);
/*  参数:
        bev: 带缓冲区的事件,通过 bufferevent_socket_new() 得到
        address: 在这里初始化服务器端的ip和端口信息
        addlen: address 参数执行的内存大小       */
  • 数据发送和接收(发送到 bufferevent 的写缓冲区,从 bufferevent 的读缓冲区接收)
int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
/*  参数:
        bufev: bufferevent_socket_new() 得到的返回值
        data: 要发送的数据
        size: 发送数据data的长度
    返回值:
        >0: 发送的字节数     -1: 失败               */
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
/*  参数:
        bufev: bufferevent_socket_new() 得到的返回值
        data: 存储数据的内存
        size: data的大小
    返回值:
        >0: 接收的字节数     -1: 失败               */
  • 设置读写回调函数(读是bufferevent缓冲区,写是socket缓冲区)
void bufferevent_setcb(struct bufferevent *bufev, bufferevent_data_cb readcb, bufferevent_data_cb writecb, bufferevent_event_cb eventcb, void *cbarg);
/*  参数:
        bufev: bufferevent_socket_new() 得到的返回值
        readcb: 设置读缓冲区对应的回调函数
        writecb: 设置写缓冲区对应的回调函数
        eventcb: 事件回调函数
        cbarg: 回调函数的实参                 */

// 读写缓冲区回调函数原型
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
/* bev: bufferevent_setcb() 第一个参数
   ctx: bufferevent_setcb() 最后一个参数     */

// 读写缓冲区回调函数原型
typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short events, void *ctx);
/*  bev: bufferevent_setcb() 第一个参数
    event: 实际检测到的事件
        EV_EVENT_READING: 产生读异常
        BEV_EVENT_WRITING: 写异常
        BEV_EVENT_ERROR: 操作时发生错误
        BEV_EVENT_TIMEOUT: 超时
        BEV_EVENT_EOF: 另一端关闭连接
        BEV_EVENT_CONNECTED: 客户端判断是否连接完成
    ctx: bufferevent_socket_new() 最后一个参数         */
  •  禁用、启用缓冲区
// bufferevent 读缓冲区默认不可用,读事件不会被检测,读回调也不会被调用
// bufferevent 写缓冲区默认可用
void bufferevent_enable(struct bufferevent *bufev, short events);
void bufferevent_disable(struct bufferevent *bufev, short events);
/*  event: 读EV_READ 2;   写EV_WRITE 4;  */

// 获取当前缓冲区状态
short bufferevent_get_enabled(struct bufferevent *bufev);
/*  返回值2读可用,返回值4写可用,返回值6都可用  */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <event2/event.h>
#include <event2/bufferevent.h>
void read_cb(struct bufferevent *, void *);
void write_cb(struct bufferevent *, void *);
void events_cb(struct bufferevent *, short , void *);
void send_cb(evutil_socket_t , short , void* );
int main()
{
    // 1.创建事件处理框架
    struct event_base* base = event_base_new();
    // 2.创建带缓冲区的事件
    struct bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
    // 3.连接服务器
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8989);
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr);
    bufferevent_socket_connect(bev, (struct sockaddr*)&addr, sizeof(addr));
    // 4.设置回调
    bufferevent_setcb(bev, read_cb, write_cb, events_cb, base);
    // 5.设置读缓冲区为可用
    bufferevent_enable(bev, EV_READ);
    struct event *ev = event_new(base, STDIN_FILENO, EV_READ|EV_PERSIST, send_cb, bev);
    event_add(ev, NULL);
    // 6.启动事件循环
    event_base_dispatch(base);
    // 7.释放资源
    bufferevent_free(bev);
    event_base_free(base);
}

void send_cb(evutil_socket_t fd, short what, void* arg)
{
    char buf[1024] = {0};
    read(fd, buf, sizeof(buf));

    struct bufferevent *bev = (struct bufferevent*)arg;
    bufferevent_write(bev, buf, strlen(buf)+1);
    printf("客户端发送数据完毕\n");
}

// 读回调 - 读缓冲区有数据回调 - 接收数据
void read_cb(struct bufferevent *bev, void *ctx)
{
    char buf[1024];
    memset(buf, 0, sizeof(buf));
    bufferevent_read(bev, buf, sizeof(buf));
    printf("接收到服务器回复的数据 : %s\n", buf);
}
// 写回调 - 写入内核缓冲区时回调 - 发送数据
void write_cb(struct bufferevent *bev, void *ctx)
{
    printf("数据被写入内核\n");
}

void events_cb(struct bufferevent *bev, short events, void *ctx)
{
    struct event_base *base = (struct event_base*)ctx;
    if(events & BEV_EVENT_EOF)
    {
        printf("服务器断开了链接\n");
    }
    else if(events & BEV_EVENT_ERROR)
    {
        printf("不可预期错误\n");
    }
    else if(events & BEV_EVENT_CONNECTED)
    {
        printf("和服务器成功建立链接\n");
        return;
    }
    // 终止事件循环
    event_base_loopbreak(base);
}
事件缓冲函数客户端示例代码

5. 链接监听器

  套接字通信服务器需要做以下流程

  创建套接字 -> 绑定ip端口 -> 监听 -> 等待客户端连接 -> 使用对应客户端的通信文件描述符通信 -> 断开连接

  链接监听器可以完成服务器前4步操作

#include <event2/listener.h>
// 回调函数原型,主要是处理通信流程
typedef void (*evconnlistener_cb)(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *addr, int len, void *ptr);
/*  listener: evconnlistener_new_bind() 返回值
    sock: 连接建立后得到的通信文件描述符
    addr: 客户端地址信息
    len: addr指向内存的大小
    ptr: evconnlistener_new_bind() 传进来的参数            */

// 创建连接监听器,封装监听的文件描述符
struct evconnlistener* evconnlistener_new(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, evutil_socket_t fd);
/*  参数:
        base: 事件的处理框架
        cb: 回调函数
        ptr: 实参传递给cb
        flags: 连接监听器的选项设置
            LEV_OPT_CLOSE_ON_FREE: 连接监听器释放时关闭封装的监听套接字
            LEV_OPT_REUSEABLE: 端口复用
        backlog: listen() 函数的第二个参数(该函数可以设置监听,最大值128,-1表示自动设置)
        fd: 监听的套接字(绑定ip端口后的)           */

// 创建连接监听器,封装监听的文件描述符
struct evconnlistener* evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, const struct sockaddr *sa, int socklen);
/*  参数:
        base: 事件的处理框架
        cb: 回调函数
        ptr: 实参传递给cb
        flags: 连接监听器的选项设置
            LEV_OPT_CLOSE_ON_FREE: 连接监听器释放时关闭封装的监听套接字
            LEV_OPT_REUSEABLE: 端口复用
        backlog: listen() 函数的第二个参数(该函数可以设置监听,最大值128,-1表示自动设置)
        sa: 绑定时需要的ip和端口
        socklen: sa指针指向的内存和大小          */
  • 调整 evconnlistener 回调函数
void evconnlistener_set_cb(struct evconnlistener *lev, evconnlistener_cb cb, void *arg);
  • 释放
evconnlistener_free(struct evconnlistener* lev)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
void cb_listener(struct evconnlistener *, evutil_socket_t , struct sockaddr *, int , void *);
void read_cb(struct bufferevent *, void *);
void write_cb(struct bufferevent *, void *);
void events_cb(struct bufferevent *, short , void *);

int main()
{
    // 1.创建事件处理框架
    struct event_base* base = event_base_new();
    // 2.创建带缓冲区的事件
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8989);
    addr.sin_addr.s_addr = INADDR_ANY;
    struct evconnlistener* evl;
    evl = evconnlistener_new_bind(base, cb_listener, base, LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1, (struct sockaddr*)&addr, sizeof(addr));
    // 3.启动事件循环
    event_base_dispatch(base);
    // 4.释放资源
    evconnlistener_free(evl);
    event_base_free(base);
}

// 链接监听器的回调
void cb_listener(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *addr, int len, void *ptr)
{
    struct event_base *base = (struct event_base*)ptr;
    struct bufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, read_cb, write_cb, events_cb, base);
    bufferevent_enable(bev, EV_READ);
}


// 读回调 - 读缓冲区有数据回调 - 接收数据
void read_cb(struct bufferevent *bev, void *ctx)
{
    char buf[1024];
    memset(buf, 0, sizeof(buf));
    bufferevent_read(bev, buf, sizeof(buf));
    printf("接收到客户端发送的数据 : %s\n", buf);

    bufferevent_write(bev, buf, strlen(buf)+1);
    printf("数据已经回复给客户端\n");
}
// 写回调 - 写入内核缓冲区时回调 - 发送数据
void write_cb(struct bufferevent *bev, void *ctx)
{
    printf("数据被写入内核\n");
}

void events_cb(struct bufferevent *bev, short events, void *ctx)
{
    struct event_base *base = (struct event_base*)ctx;
    if(events & BEV_EVENT_EOF)
    {
        printf("客户端断开了链接\n");
    }
    else if(events & BEV_EVENT_ERROR)
    {
        printf("不可预期错误\n");
    }
    // 终止事件循环
    event_base_loopbreak(base);
}
服务器端示例代码