Locust设置集合点

发布时间 2023-06-06 15:27:56作者: 术科术
  • 集合点

要求系统能够承受1000 人同时提交数据,可以通过在提交数据操作前面加入集合点,这样当虚拟用户运行到提交数据的集合点时,就检查同时有多少用户运行到集合点,如果不到1000 人,已经到集合点的用户在此等待,当在集合点等待的用户达到1000 人时,1000 人同时去提交数据,从而达到测试计划中的需求。

注意:框架本身没有直接封装集合点的概念 ,间接通过gevent并发机制,使用gevent的锁来实现

  1. 方法一

semaphore是一个内置的计数器: 每当调用acquire()时,内置计数器-1 每当调用release()时,内置计数器+1 计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()

两步骤:

  1. all_locusts_spawned 创建钩子函数
  2. 将locust实例挂载到监听器 events.spawning_complete.add_listener
  3. Locust实例准备完成时触发
import os

from locust import HttpUser, TaskSet, task,between,events
from gevent._semaphore import Semaphore


all_locusts_spawned = Semaphore()
all_locusts_spawned.acquire()# 阻塞线程


def on_hatch_complete(**kwargs):
    """
    Select_task类的钩子方法
    :param kwargs:
    :return:
    """
    all_locusts_spawned.release() # # 创建钩子方法


events.spawning_complete.add_listener(on_hatch_complete) #挂在到locust钩子函数(所有的Locust示例产生完成时触发)


n = 0
class UserBehavior(TaskSet):

    def login(self):
        global n
        n += 1
        print("%s个虚拟用户开始启动,并登录"%n)

    def logout(self):
        print("退出登录")



    def on_start(self):
        self.login()

        all_locusts_spawned.wait() # 同步锁等待

    @task(4)
    def test1(self):
      

        url = '/list'
        param = {
            "limit":8,
            "offset":0,
        }
        with self.client.get(url,params=param,headers={},catch_response = True) as response:
            print("用户浏览登录首页")

    @task(6)
    def test2(self):
        

        url = '/detail'
        param = {
            'id':1
        }
        with self.client.get(url,params=param,headers={},catch_response = True) as response:
            print("用户同时执行查询")

    @task(1)
    def test3(self):
        """
        用户查看查询结果
        :return:
        """

        url = '/order'
        param = {
            "limit":8,
            "offset":0,
        }
        with self.client.get(url,params=param,headers={},catch_response = True) as response:
            print("用户查看查询结果")

    def on_stop(self):
        self.logout()


class WebsiteUser(HttpUser):
    host = 'http://www.baidu.com'
    tasks = [UserBehavior]

    wait_time = between(1, 2)

if __name__ == '__main__':
    os.system("locust -f locustfile08.py")

 

  1. 方法二:

from locust import HttpUser, task, between, SequentialTaskSet


class UserBehavior(SequentialTaskSet):

    @task
    def login(self):
        self.client.post("/login", {"username": "test", "password": "password"})
        self.wait_between(1, 3)

    @task
    def view_product_list(self):
        self.client.get("/product_list")
        self.wait_between(1, 2)

    @task
    def choose_product(self):
        self.client.get("/product/1")
        self.wait_between(2, 3)

    @task
    def fill_address(self):
        self.client.post("/address", {'address': 'xxx'})
        self.wait_between(3, 4)

    @task
    def checkout(self):
        self.client.post("/checkout", {'confirm': 'yes'})
        self.wait_between(4, 5)

    def wait_between(self, min_wait, max_wait):
        self.wait_time = between(min_wait, max_wait)


class WebsiteUser(HttpUser):
    tasks = [UserBehavior]
    wait_time = between(1, 3)

在这里,我们首先定义了一个名为UserBehavior的任务集合点类,其中定义了几个测试任务:登录、浏览商品列表、选择商品、填写收货地址、确认购买。在每个测试任务中,我们使用client发送HTTP请求,并设置适当的等待时间。

然后,我们定义了一个名为WebsiteUser的用户类,继承自HttpUser并设置了一个等待时间的范围。在这个用户类中,我们指定了使用UserBehavior任务集合点类作为任务,并设置并发用户数为100。在测试过程中,每个用户将按照UserBehavior类中定义的测试任务顺序依次执行,直至所有测试任务完成或测试时间到达。

控制台执行以下命令:

locust -f your_file_name.py --host=http://127.0.0.1:8000 --web-host=127.0.0.1 --users 100 --spawn-rate 10

在这里,我们指定了测试文件、目标网站URL、web UI地址,并设置并发用户数为100,每秒生成10个用户。测试过程中,我们可以在web UI界面上实时监控性能指标,以确定系统的瓶颈和性能问题。