Traffic Control (TC): Introduction and Usage Guide

Published: 2023-08-11 21:51:50 | Author: 守望人间

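Traffic Control (TC) is a Linux tool for controlling network traffic. It can rate-limit, queue, and classify traffic, among other management operations. TC is used to implement QoS (Quality of Service) and traffic shaping, which gives finer control over network resources and a better user experience. We mainly use it for network impairment control, to test how devices behave on low-quality networks.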

Basic Concepts

Queue Discipline (qdisc)

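  • A qdisc is the mechanism that controls how packets are queued and processed. Different qdisc types implement different traffic control policies.
  • Common qdisc types include pfifo, bfifo, htb, and fq_codel.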

Class

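  • In a classful qdisc such as htb (Hierarchical Token Bucket), classes divide traffic into groups that are handled separately.
  • Each class can have its own parameters, such as bandwidth and priority.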

Filter

  • A filter matches packets and assigns each match to the corresponding class for processing.
  • Filters can split traffic by source IP, destination IP, protocol, port, and similar conditions.
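
As a rough mental model of how filters pick a class, the sketch below classifies packets by source IP in plain Python. It only imitates the behaviour described above and runs no tc command. The names Filter and classify, and the class IDs used, are hypothetical and chosen for illustration. It assumes filters are consulted in ascending prio order, so a lower prio number wins.

```python
import ipaddress
from dataclasses import dataclass


@dataclass
class Filter:
    prio: int    # lower number is consulted first
    src: str     # CIDR block matched against the packet's source IP
    flowid: str  # class that receives matching packets


def classify(src_ip, filters, default=None):
    """Return the flowid of the first matching filter, in prio order."""
    addr = ipaddress.ip_address(src_ip)
    for flt in sorted(filters, key=lambda item: item.prio):
        if addr in ipaddress.ip_network(flt.src):
            return flt.flowid
    return default


filters = [
    Filter(prio=65535, src="0.0.0.0/0", flowid="1:9"),    # catch-all rule
    Filter(prio=1, src="192.168.1.1/32", flowid="1:2"),   # one specific host
]

print(classify("192.168.1.1", filters))
print(classify("10.0.0.5", filters))
```

The specific host lands in class 1:2 even though the catch-all rule also matches it, because the filter with the lower prio number is checked first.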

Basic Commands

qdisc

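  • tc qdisc add dev <interface> root <qdisc type> <qdisc parameters>: add a qdisc at the root.
  • tc qdisc show dev <interface>: list all qdiscs on the interface.
  • tc qdisc del dev <interface> parent <parent ID>: delete the qdisc attached to that parent.
  • tc qdisc del dev <interface> root: delete the root qdisc, which clears every setting.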

class

  • tc class add dev <interface> parent <parent ID> classid <class ID> <class parameters>: add a class.
  • tc qdisc add dev <interface> parent <parent ID> <qdisc type> <qdisc parameters>: add a child qdisc under an existing class.
  • tc class show dev <interface>: list all classes on the interface.
  • tc class del dev <interface> classid <class ID>: delete a class.

filter

  • tc filter add dev <interface> protocol <protocol> prio <priority> <filter expression> <action>: add a filter rule.
  • tc filter del dev <interface> protocol <protocol> prio <priority>: delete the filter rules at that priority.
  • tc filter show dev <interface>: list all filters on the interface.
  • tc filter del dev <interface> protocol ip prio <priority> u32 match ip src <ip>/32: delete a filter.
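
The command shapes above can be assembled programmatically. The following is a minimal sketch that builds argument lists and prints them without executing anything. The helper names (tc, qdisc_add_root, class_add, filter_add) are assumptions made for this example; only the tc syntax itself comes from the list above.

```python
import shlex


def tc(*words):
    """Build the argv list for one tc invocation (nothing is executed)."""
    return ["tc", *(str(word) for word in words)]


def qdisc_add_root(dev, kind, *params, handle=None):
    """tc qdisc add dev <interface> root [handle <h>] <qdisc type> <params>"""
    head = ["qdisc", "add", "dev", dev, "root"]
    if handle is not None:
        head += ["handle", handle]
    return tc(*head, kind, *params)


def class_add(dev, parent, classid, *params):
    """tc class add dev <interface> parent <parent ID> classid <class ID> <params>"""
    return tc("class", "add", "dev", dev, "parent", parent, "classid", classid, *params)


def filter_add(dev, protocol, prio, *expression):
    """tc filter add dev <interface> protocol <protocol> prio <priority> <expression>"""
    return tc("filter", "add", "dev", dev, "protocol", protocol, "prio", prio, *expression)


commands = [
    qdisc_add_root("eth0", "htb", handle="1:"),
    class_add("eth0", "1:", "1:1", "htb", "rate", "1mbit"),
    filter_add("eth0", "ip", 1, "u32", "match", "ip", "src", "192.168.1.1/32", "flowid", "1:1"),
]

for argv in commands:
    print(shlex.join(argv))  # dry run: show the command instead of running it
```

To apply the commands for real, each list could be passed to subprocess.run(argv, check=True) as root. Passing a list rather than a shell string avoids quoting problems.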

Example Usage

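  1. Limit the bandwidth on interface eth0 to 1 Mbit/s. The htb default value must name a class that exists (here 1:1); otherwise unclassified traffic bypasses the limit:

    tc qdisc add dev eth0 root handle 1: htb default 1
    tc class add dev eth0 parent 1: classid 1:1 htb rate 1mbit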
    
  2. Add filter rules that steer traffic to different classes by source IP (the target classes, here 1:1 and 1:2, must already exist):

    tc filter add dev eth0 protocol ip prio 1 u32 match ip src 192.168.1.1/32 flowid 1:1
    tc filter add dev eth0 protocol ip prio 2 u32 match ip src 192.168.1.2/32 flowid 1:2
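
The two examples can be combined into one plan that gives each source IP its own class. The sketch below only generates the command strings. The function name per_ip_limits and the sample rates are made up for illustration. It assumes tc parses the minor part of a class ID as hexadecimal, which is why the minor number is formatted with :x.

```python
def per_ip_limits(dev, limits_kbit):
    """Return tc command strings that give each source IP its own htb class.

    limits_kbit maps an IPv4 address to a rate in kbit/s.
    """
    cmds = [f"tc qdisc add dev {dev} root handle 1: htb"]
    for minor, (ip, kbit) in enumerate(limits_kbit.items(), start=1):
        classid = f"1:{minor:x}"  # assumption: tc reads the minor ID as hex
        cmds.append(
            f"tc class add dev {dev} parent 1: classid {classid} htb rate {kbit}kbit"
        )
        cmds.append(
            f"tc filter add dev {dev} protocol ip parent 1: prio {minor} "
            f"u32 match ip src {ip}/32 flowid {classid}"
        )
    return cmds


plan = per_ip_limits("eth0", {"192.168.1.1": 1000, "192.168.1.2": 512})
for line in plan:
    print(line)
```

Every filter points at a class created just before it, so no flowid refers to a missing class.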
    

Notes

  1. A root qdisc is required.
  2. A qdisc can hold multiple classes.
  3. A class can hold multiple child classes.
  4. A class can hold only one qdisc.
  5. A class can have multiple filters attached.
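
The rules above can be written down as a small in-memory model. This is only a sketch to make the constraints concrete; TcQdisc and TcClass are hypothetical names, and nothing here talks to the kernel.

```python
class TcClass:
    def __init__(self, classid):
        self.classid = classid
        self.children = []  # note 3: many child classes
        self.filters = []   # note 5: many filters
        self.qdisc = None   # note 4: at most one qdisc

    def add_child(self, child):
        self.children.append(child)
        return child

    def attach_qdisc(self, qdisc):
        if self.qdisc is not None:
            raise ValueError(f"class {self.classid} already has a qdisc")
        self.qdisc = qdisc
        return qdisc


class TcQdisc:
    def __init__(self, handle, kind):
        self.handle = handle
        self.kind = kind
        self.classes = []   # note 2: many classes

    def add_class(self, tc_class):
        self.classes.append(tc_class)
        return tc_class


root = TcQdisc("1:", "htb")                      # note 1: the root qdisc
base = root.add_class(TcClass("1:1"))
leaf = base.add_child(TcClass("1:2"))
leaf.attach_qdisc(TcQdisc("2:", "netem"))

try:
    leaf.attach_qdisc(TcQdisc("3:", "netem"))    # second qdisc on one class
except ValueError as err:
    print(err)
```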

Reference

Setting up TC with Python

import subprocess
from enum import Enum, unique


GLOBAL_LOSS_IP = '0.0.0.0/0'
MAX_PRIO = 65535

# class
LIMIT_CLASS_ADD_CMD = 'tc class add dev {} parent 1:1 classid 1:{} htb rate {}Kbit'
LIMIT_CLASS_CHANGE_CMD = 'tc class change dev {} parent 1:1 classid 1:{} htb rate {}Kbit'
NO_LIMIT_CLASS_CMD = 'tc class add dev {} parent 1:1 classid 1:{} htb rate 10Gbit ceil 20Gbit'
# filter
UP_FILTER_CMD = 'tc filter add dev {} protocol ip parent 1: prio {} u32 match ip dst {} flowid 1:{}'
DOWN_FILTER_CMD = 'tc filter add dev {} protocol ip parent 1: prio {} u32 match ip src {} flowid 1:{}'
# To remove the impairment for one IP, delete the filter and the qdisc first, then the class
DEL_QDISC_CMD = 'tc qdisc del dev {} parent 1:{}'
DEL_CLASS_CMD = 'tc class del dev {} classid 1:{}'
DEL_FILTER_CMD = 'tc filter del dev {} protocol ip prio {} u32 match ip src {}/32'

"""
IP filtering with tc; structure built by the commands:
         root qdisc
             |
  class 1         class 2
     |               |
qdisc filter    qdisc filter
"""


@unique
class TYPE(Enum):
    DELAY = 0
    LOSS = 1
    DUP = 2
    DISORDER = 3
    IMPAIR = 4
    SPEED_LIMIT = 5
    SPEED_LOSS = 6
    PACKAGE_LOSS = 7
    PACKAGE_SPEED = 8
    LOSS_DELAY = 9
    BRUST_LOSS = 10


class Netem:
    """qdisc"""
    LOSS_TYPE_CMD_MAP = {
        TYPE.DELAY.value: 'delay {}ms',
        TYPE.LOSS.value: 'loss {}%',
        TYPE.DUP.value: 'duplicate {}%',
        TYPE.LOSS_DELAY.value: 'delay {}ms loss {}%',
        TYPE.DISORDER.value: 'reorder {}% delay {}ms',
        TYPE.IMPAIR.value: 'corrupt {}%',
        TYPE.BRUST_LOSS.value: 'loss gemodel {}% {}% 100% 0%',
    }

    def __init__(self, interface: str, parent: int, handle: int):
        self.interface = interface
        self.parent = parent
        self.handle = handle
        self.base_cmd = 'tc qdisc {} dev {} parent 1:{} handle {}: netem '
        self.loss_config_map = {}

    def generate_cmd(self, action: str, network_loss_type: int, values: tuple):
        base_cmd = self.base_cmd.format(action, self.interface, self.parent, self.handle)
        loss_cmd = self.LOSS_TYPE_CMD_MAP.get(network_loss_type)
        self.loss_config_map[network_loss_type] = loss_cmd.format(*values)
        return base_cmd + ' '.join(self.loss_config_map.values())

    def add_netem(self, network_loss_type: int, values: tuple):
        cmd = self.generate_cmd('add', network_loss_type, values)
        subprocess.run(cmd, shell=True)

    def change_netem(self, network_loss_type: int, values: tuple):
        cmd = self.generate_cmd('change', network_loss_type, values)
        subprocess.run(cmd, shell=True)

    def del_netem(self):
        cmd = DEL_QDISC_CMD.format(self.interface, self.parent)
        subprocess.run(cmd, shell=True)


class TrafficControlClass:
    """class"""
    def __init__(self, interface: str, handle: int):
        self.interface = interface
        self.handle = handle
        self.child = None

    def set_base_class(self):
        cmd = NO_LIMIT_CLASS_CMD.format(self.interface, self.handle)
        subprocess.run(cmd, shell=True)

    def set_limit_class(self, limit: int):
        cmd = LIMIT_CLASS_ADD_CMD.format(self.interface, self.handle, limit)
        subprocess.run(cmd, shell=True)

    def change_limit_class(self, limit: int):
        cmd = LIMIT_CLASS_CHANGE_CMD.format(self.interface, self.handle, limit)
        subprocess.run(cmd, shell=True)

    def del_class(self):
        cmd = DEL_CLASS_CMD.format(self.interface, self.handle)
        subprocess.run(cmd, shell=True)


class TrafficControlFilter:
    """filter"""
    def __init__(self, interface: str, prio: int, ip: str, flow_class: TrafficControlClass):
        self.interface = interface
        self.prio = prio
        self.ip = ip
        self.flow_class = flow_class

    def set_filter(self):
        # Add filters that restrict the IP
        # Match it both as a destination address and as a source address
        cmd = UP_FILTER_CMD.format(self.interface, self.prio, self.ip, self.flow_class.handle)
        subprocess.run(cmd, shell=True)
        cmd = DOWN_FILTER_CMD.format(self.interface, self.prio, self.ip, self.flow_class.handle)
        subprocess.run(cmd, shell=True)

    def del_filter(self):
        if self.ip == GLOBAL_LOSS_IP:
            cmd = DEL_FILTER_CMD.format(self.interface, self.prio, '0.0.0.0')
        else:
            cmd = DEL_FILTER_CMD.format(self.interface, self.prio, self.ip)
        subprocess.run(cmd, shell=True)

    def change_prio(self, prio: int):
        print('change prio: ', prio)
        self.del_filter()
        self.prio = prio
        self.set_filter()


class TrafficControl:
    """Traffic Control"""
    def __init__(self, interface: str = 'eth0', cycle_time: int = 1000):
        self.interface = interface  # network interface
        self.cycle_time = cycle_time
        self.prio = 1  # next filter priority
        self.class_handle = 1  # next class minor ID
        self.qdisc_handle = 1  # next qdisc handle
        self.ip_filter_map = {}
        self.init_tc()

    def init_tc(self):
        """
        Reset the TC state: delete any existing root qdisc, then recreate the root qdisc and the base class
        """
        print('init_tc')
        # Clear existing impairments
        cmd = 'tc qdisc del dev {} root'.format(self.interface)
        subprocess.run(cmd, shell=True)
        # Reset the counters
        self.qdisc_handle = 1
        self.class_handle = 1
        self.prio = 1
        self.ip_filter_map = {}

        # Create the root qdisc. No default class is given, so unclassified traffic is not limited; r2q (the rate-to-quantum ratio) is set to 1
        cmd = 'tc qdisc add dev {} root handle 1: htb r2q 1'.format(self.interface)
        subprocess.run(cmd, shell=True)
        self.qdisc_handle += 1
        # Create a class with a very high rate; the netem qdiscs (loss, etc.) and the rate-limit classes are attached under it later
        cmd = 'tc class add dev {} parent 1: classid 1:1 htb rate 10Gbit ceil 20Gbit'.format(self.interface)
        subprocess.run(cmd, shell=True)
        self.class_handle += 1

    def remove_ip_loss(self, ip: str):
        """Remove the impairment settings for an IP"""
        print('remove_loss: %s' % ip)
        tc_filter = self.ip_filter_map.pop(ip, None)  # type: TrafficControlFilter
        if tc_filter:
            tc_filter.del_filter()
            if tc_filter.flow_class.child:  # type: Netem
                tc_filter.flow_class.child.del_netem()
            tc_filter.flow_class.del_class()

    def process(self, network_loss_type: int, ip: str, data: dict, value=None):
        """
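        Configure a network impairment
        :param network_loss_type: impairment type
        :param ip: IP the impairment applies to
        :param data: request data
        :param value: periodic value; passing it means this call is part of a periodic setting
        :return: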
        """
        if not ip:
            ip = GLOBAL_LOSS_IP

        values = None
        if network_loss_type == TYPE.DELAY.value:
            if value is None:
                delay = data.get('delay', 0)
            else:
                delay = value
            values = (delay,)
        elif network_loss_type == TYPE.LOSS.value:
            if value is None:
                loss = data.get('loss', 0)
            else:
                loss = value
            values = (loss,)
        elif network_loss_type == TYPE.DUP.value:
            if value is None:
                dup = data.get('dup', 0)
            else:
                dup = value
            values = (dup,)
        elif network_loss_type == TYPE.DISORDER.value:
            disorder = data['disorder']['value']
            delay = data['disorder']['delay']
            values = (disorder, delay)
        elif network_loss_type == TYPE.IMPAIR.value:
            if value is None:
                impair = data.get('impair')
            else:
                impair = value
            values = (impair,)
        elif network_loss_type == TYPE.SPEED_LIMIT.value:
            if value is None:
                speed = data['speed_limit'].get('speed', 10000000)  # 10_000_000
            else:
                speed = value
            delay = data['speed_limit'].get('delay', 50)
            self.set_speed_limit(ip, speed, delay)
        elif network_loss_type == TYPE.LOSS_DELAY.value:
            delay = data['loss_delay']['delay']
            loss = data['loss_delay']['loss']
            values = (delay, loss)
        elif network_loss_type == TYPE.BRUST_LOSS.value:
            loss = data['brust']['loss']
            package = int(data['brust']['package'])
            p = float(loss) / 100  # p is the probability of transferring from Good State to the bad state
            loss_p = p * package
            r = p / loss_p - p  # r is the probability of transferring from the bad state to the Good
            values = (p * 100, r * 100)
        self.set_base(ip, network_loss_type, values)

    def set_speed_limit(self, ip: str, limit: float, latency: int = None):
        """
        Set a rate limit
        :param ip: IP the impairment applies to
        :param limit: rate limit, in Kbit/s
        :param latency: delay, in ms
        :return:
        """
        if ip not in self.ip_filter_map:
            # Add a class that enforces the rate limit
            tc_class = TrafficControlClass(self.interface, self.class_handle)
            tc_class.set_limit_class(limit)
            self.class_handle += 1
            # Create the filter object
            prio = MAX_PRIO if ip == GLOBAL_LOSS_IP else self.prio  # keep the global rule at the lowest priority
            tc_filter = TrafficControlFilter(self.interface, prio, ip, tc_class)
            tc_filter.set_filter()
            self.ip_filter_map[ip] = tc_filter
            self.prio += 1
        else:
            tc_filter = self.ip_filter_map[ip]
            tc_class = tc_filter.flow_class  # type: TrafficControlClass
            tc_class.change_limit_class(limit)
        if latency:
            self.set_base(ip, TYPE.DELAY.value, (latency,))

    def set_base(self, ip: str, network_loss_type: int, values: tuple = None):
        """
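        Set the basic netem impairments: delay, loss, duplication, reordering, corruption, delay plus loss, burst loss, etc.
        :param ip: IP the impairment applies to
        :param network_loss_type: impairment type
        :param values: impairment values
        :return: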
        """
        if values is not None:
            if ip not in self.ip_filter_map:
                # No impairment is configured for this IP yet: add a class and a filter
                # The class is where the netem qdisc gets attached
                tc_class = TrafficControlClass(self.interface, self.class_handle)
                tc_class.set_base_class()
                self.class_handle += 1
                # Create the filter object
                prio = MAX_PRIO if ip == GLOBAL_LOSS_IP else self.prio  # keep the global rule at the lowest priority
                tc_filter = TrafficControlFilter(self.interface, prio, ip, tc_class)
                tc_filter.set_filter()
                self.ip_filter_map[ip] = tc_filter
                self.prio += 1
                netem = Netem(self.interface, tc_class.handle, self.qdisc_handle)
                netem.add_netem(network_loss_type, values)
                tc_class.child = netem
                self.qdisc_handle += 1
            else:
                # An impairment already exists for this IP: reuse its class, filter, and qdisc
                tc_filter = self.ip_filter_map[ip]
                tc_class = tc_filter.flow_class  # type: TrafficControlClass
                if tc_class.child:
                    netem = tc_class.child  # type: Netem
                    netem.change_netem(network_loss_type, values)
                else:
                    netem = Netem(self.interface, tc_class.handle, self.qdisc_handle)
                    netem.add_netem(network_loss_type, values)
                    tc_class.child = netem
                    self.qdisc_handle += 1

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.init_tc()


if __name__ == '__main__':
    tc = TrafficControl('eth0')
    # Apply a 20 ms delay globally
    tc.process(TYPE.DELAY.value, '0.0.0.0/0', {'delay': 20})
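
The burst-loss entry passes four values to netem's loss gemodel option: p, r, 1-h and 1-k. With 1-h fixed at 100% and 1-k at 0%, the bad state drops every packet and the good state drops none, which leaves a two-state Markov chain. The sketch below uses the textbook relations for that chain: long-run loss rate p / (p + r) and mean burst length 1 / r. It is a different parameterization from the formula in process() above and is not the author's method. The function names and the sample numbers are assumptions for illustration.

```python
def gilbert_params(loss_rate, mean_burst):
    """Return (p, r) as probabilities for a target loss rate and burst length.

    loss_rate:  desired long-run fraction of lost packets, 0 < loss_rate < 1
    mean_burst: desired average number of consecutive losses, >= 1
    """
    if not 0 < loss_rate < 1:
        raise ValueError("loss_rate must be between 0 and 1")
    if mean_burst < 1:
        raise ValueError("mean_burst must be at least 1")
    r = 1.0 / mean_burst                   # bad -> good transition probability
    p = r * loss_rate / (1.0 - loss_rate)  # good -> bad transition probability
    if p > 1:
        raise ValueError("this loss_rate / mean_burst pair is not reachable")
    return p, r


def stationary_loss(p, r):
    """Long-run share of time spent in the bad (lossy) state."""
    return p / (p + r)


p, r = gilbert_params(0.10, 5)
print("loss gemodel {:.2f}% {:.2f}% 100% 0%".format(p * 100, r * 100))
print("check: long-run loss = {:.3f}".format(stationary_loss(p, r)))
```

Computing r from the burst length first and then solving for p keeps the two targets independent, so changing the burst length does not silently change the overall loss rate.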