locust多任务按顺序执行

发布时间 2023-11-30 14:25:19作者: 唐大侠的小迷弟

背景

想要低并发验证不同通道的响应时间,即不同的域名访问同一资源,需要验证不同的域名响应时长。

实践1

写了个简单的locustfile.py内容如下:

# locustfile.py
from locust import HttpUser, task, TaskSet, between

class MyUser(HttpUser):
    wait_time = between(0.1, 1.0)  # 每个请求间可能等待的时间
    
    @task
    def fun1(self):
        self.client.request_name='方法1'
        url='http://api1.sample.com/api/file?path=xxxx'
        self.req(url)
   
    
    @task
    def fun2(self):
        self.client.request_name='方法2'
        url='http://api2.sample.com/api/file?path=xxxx'
        self.req(url)
   
    @task
    def fun3(self):
        self.client.request_name='方法3'
        url='http://api3.sample.com/api/file?path=xxxx'
        self.req(url)
   
    @task
    def fun4(self):
        self.client.request_name='方法4'
        url='http://api4.sample.com/api/file?path=xxxx'
        self.req(url)
   
    def req(self,url):
        with self.client.get(url, headers=headers, catch_response=True) as r:
            if r.status_code != 200:
                r.failure(f'{urlparse(url).netloc} 返回码非200!')
            elif 'json' in r.headers.get('Content-Type'):  # 因为是流媒体,请求失败可能会返回异常的json信息,所以增加判断响应头
                r.failure(f'{urlparse(url).netloc} 请求失败,返回内容非文件')

执行 locust --headless -u 4 -t 5m MyUser -H https://www.baiduu.com/

即并发4用户执行5分钟。然后分析了请求的链路发现本应耗时比较短的域名平均耗时居然比较高,研发说是因为四个域名同时执行,DNS解析可能耗时有问题。。。。

就是说要分别单独执行,然后再重新分析耗时。所以问题来了,如果要单独执行每个域名,每次需要注释掉其他不需要执行的三个任务,最主要的是单独执行出来的报告只能展示一个请求的图表,不利于分析,为了解决这俩问题,重新实现。

实践2

改写方法如下:

# locustfile.py
from locust import HttpUser, task, TaskSet, between
from locust.exception import StopUser

class MyUser(HttpUser):
    wait_time = between(0.1, 1.0)  # 每个请求间可能等待的时间
    
    # 启动前执行,所以等于是单独测试了不同的域名低并发的请求响应时长
    def on_start(self):
        # 当Locust用户开始时只执行此方法一次
        self.execute_task_for_duration(self.fun1, 5 * 60)
        self.execute_task_for_duration(self.fun2, 5 * 60)
        self.execute_task_for_duration(self.fun3, 5 * 60)
        self.execute_task_for_duration(self.fun4, 5 * 60)
        raise StopUser  # 并不会继续执行其他的任务了,按顺序执行完上面的任务就结束

    def execute_task_for_duration(self, task_function, duration_seconds):
        end_time = time.time() + duration_seconds
        while time.time() < end_time:
            task_function()  # 调用任务函数
            time.sleep(self.user.wait_time())  # 等待指定时间
    
    @task
    def fun1(self):
        self.client.request_name='方法1'
        url='http://api1.sample.com/api/file?path=xxxx'
        self.req(url)
   
    
    @task
    def fun2(self):
        self.client.request_name='方法2'
        url='http://api2.sample.com/api/file?path=xxxx'
        self.req(url)
   
    @task
    def fun3(self):
        self.client.request_name='方法3'
        url='http://api3.sample.com/api/file?path=xxxx'
        self.req(url)
   
    @task
    def fun4(self):
        self.client.request_name='方法4'
        url='http://api4.sample.com/api/file?path=xxxx'
        self.req(url)
   
    def req(self,url):
        with self.client.get(url, headers=headers, catch_response=True) as r:
            if r.status_code != 200:
                r.failure(f'{urlparse(url).netloc} 返回码非200!')
            elif 'json' in r.headers.get('Content-Type'):  # 因为是流媒体,请求失败可能会返回异常的json信息,所以增加判断响应头
                r.failure(f'{urlparse(url).netloc} 请求失败,返回内容非文件')

最后的图表如下:
image-20231129170901077