dpdk官方转发例子分析

发布时间:2023-10-01 17:22:45 作者:王景迁

例子源码
http://dpdk.org/browse/dpdk/tree/examples/skeleton/basicfwd.c

main函数主流程

1. 初始化环境抽象层EAL

/* Initialize the EAL from the command-line arguments; a negative return value aborts the program. */
int ret = rte_eal_init(argc, argv);
if (ret < 0)
    rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");

2. 分配mempool

DPDK使用mbuf保存报文(packet);mempool是预先分配好的mbuf对象池,用于mbuf的分配与回收。

/*
 * Create the mbuf pool handed to every port: NUM_MBUFS buffers per port,
 * with the socket argument taken from rte_socket_id(). A NULL result aborts
 * the program.
 */
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
        MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == NULL)
    rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");

3. 初始化所有网卡port

/* Initialize each port in turn; the first port_init() failure aborts the program. */
for (portid = 0; portid < nb_ports; portid++) {
    if (port_init(portid, mbuf_pool) != 0)
        rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", portid);
}
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
    struct rte_eth_conf port_conf = port_conf_default;
    const uint16_t rx_rings = 1, tx_rings = 1;
    int retval;
    uint16_t q;

    // rte_eth_dev_count() 获取可用 eth 的个数
    if (port >= rte_eth_dev_count())
        return -1;

    /* 配置网卡设备 */
    retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
    if (retval != 0)
        return retval;

    /* 每个 port 1 个 rx 队列 */
    for (q = 0; q < rx_rings; q++) {
        retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
                rte_eth_dev_socket_id(port), NULL, mbuf_pool);
        if (retval < 0)
            return retval;
    }

    /* 每个 port 1 个 tx 队列 */
    for (q = 0; q < tx_rings; q++) {
        retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
                rte_eth_dev_socket_id(port), NULL);
        if (retval < 0)
            return retval;
    }

    /* 启用网卡设备 */
    retval = rte_eth_dev_start(port);
    if (retval < 0)
        return retval;

    struct ether_addr addr;
    rte_eth_macaddr_get(port, &addr);
    printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
               " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
            (unsigned)port,
            addr.addr_bytes[0], addr.addr_bytes[1],
            addr.addr_bytes[2], addr.addr_bytes[3],
            addr.addr_bytes[4], addr.addr_bytes[5]);

    /* 设置网卡混杂模式 */
    rte_eth_promiscuous_enable(port);

    return 0;
}

4. 在主核(main lcore)上调用lcore_main进行收发包

/*
 * Forwarding loop: read a burst of packets from each port on queue 0 and
 * transmit it on the paired port (port ^ 1, i.e. 0 <-> 1, 2 <-> 3, ...).
 * Packets that could not be transmitted are freed. Never returns.
 *
 * NOTE(review): the port ^ 1 pairing assumes an even number of ports; that
 * is not checked here and must be guaranteed by the caller.
 */
static __attribute__((noreturn)) void
lcore_main(void)
{
    const uint8_t nb_ports = rte_eth_dev_count();
    uint8_t port;

    /*
     * For best performance each port should sit on the same NUMA node as
     * the polling thread. Socket 0 is a valid node id, so the test must be
     * ">= 0" (with "> 0" a port on socket 0 polled from another socket was
     * never reported). A negative id is skipped, as no comparison is
     * meaningful for it.
     */
    for (port = 0; port < nb_ports; port++) {
        const int port_socket = rte_eth_dev_socket_id(port);

        if (port_socket >= 0 && port_socket != (int)rte_socket_id())
            printf("WARNING, port %u is on remote NUMA node to "
                    "polling thread.\n\tPerformance will "
                    "not be optimal.\n", (unsigned)port);
    }

    printf("\nCore %u forwarding packets. [Ctrl+C to quit]\n", rte_lcore_id());

    /* Poll forever; the process is stopped with Ctrl+C. */
    for (;;) {
        for (port = 0; port < nb_ports; port++) {
            struct rte_mbuf *bufs[BURST_SIZE];

            /* Receive up to BURST_SIZE packets from queue 0 of this port. */
            const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE);
            if (unlikely(nb_rx == 0))  /* nothing received: try the next port */
                continue;

            /* Transmit the burst on the paired port. */
            const uint16_t nb_tx = rte_eth_tx_burst(port ^ 1, 0, bufs, nb_rx);
            if (unlikely(nb_tx < nb_rx)) {
                /* Free the mbufs the TX call did not accept. */
                uint16_t buf;
                for (buf = nb_tx; buf < nb_rx; buf++)
                    rte_pktmbuf_free(bufs[buf]);
            }
        }
    }
}

rte_eth_tx_burst返回实际成功提交发送的报文数nb_tx:这部分mbuf由驱动在发送完成后释放;未能发送的bufs[nb_tx]到bufs[nb_rx-1]仍归调用者所有,必须在代码中调用rte_pktmbuf_free手动释放,否则会造成mbuf泄漏。

参考资料

https://blog.csdn.net/fengfengdiandia/article/details/70821968