美文网首页
locust 开源示例

locust 开源示例

作者: Canon_2020 | 来源:发表于2020-04-20 09:08 被阅读0次

    示例1 locustfile.py 和 run_test.py

    #!/usr/local/python3
    # -*- coding: utf-8 -*-
    # @Date    : 2018-04-15 09:00:00
    # @Author  : Canon
    # @Link    : https://www.python.org
    # @Version : 3.6.1
    
    from locust import HttpLocust
    from locust import TaskSet
    from locust import task
    
    
    class BlogDemo(TaskSet):
        '''用户行为:打开我的博客首页demo'''
        @task(1)
        def open_blog(self):
            # 定义requests的请求头
            header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"}
            
            r = self.client.get("/s?ie=UTF-8&wd=python",  headers=header, verify=False)
            print(r.status_code)
            assert r.status_code == 200
    
    
    class websitUser(HttpLocust):
        task_set = BlogDemo
        min_wait = 3000  # 单位毫秒
        max_wait = 6000  # 单位毫秒
    
    
    #!/usr/local/python3
    # -*- coding: utf-8 -*-
    # @Date    : 2018-04-15 09:00:00
    # @Author  : Canon
    # @Link    : https://www.python.org
    # @Version : 3.6.1
    
    
    if __name__ == "__main__":
        import os
        os.system("locust -f E:\PythonCase\PyScript\LocustTest\Example\locustfile.py --host=https://www.baidu.com")
    
    

    示例2 locustfile.py 和 run_locust.py

    #!/usr/local/python3
    # -*- coding: utf-8 -*-
    # @Date    : 2018-04-15 09:00:00
    # @Author  : Canon
    # @Link    : https://www.python.org
    # @Version : 3.6.1
    
    
    import time
    from locust import Locust, TaskSet, events, task
    import sys
    import os
    import requests
    sys.path.append(os.path.abspath(os.path.dirname(__file__)))
    sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
    
    
    class test_demo(object):
        """接口测试demo"""
    
        def __init__(self, ):
            pass
    
        @staticmethod
        def baidu():
            url = "https://www.baidu.com/s?ie=UTF-8&wd=python"
            response = requests.get(url)
            response.raise_for_status()
    
    
    class HttpClient(object):
        def __init__(self):
            pass
    
        def test_demo_test_httpbin_get(self):
            test_demo().baidu()
    
        def test_demo_test_httpbin_post(self):
            pass
    
        def test_demo_test_webservice(self):
            pass
    
    
    class HttpLocust(Locust):
        def __init__(self, *args, **kwargs):
            super(HttpLocust, self).__init__(*args, **kwargs)
            self.client = HttpClient()
    
    
    class ApiUser(HttpLocust):
        min_wait = 10
        max_wait = 100
    
        class task_set(TaskSet):
            # @task(1)
            # def test_demo_test_webservice(self):
            #     self.client.test_demo_test_webservice()
            # @task(1)
            # def test_demo_test_httpbin_post(self):
            #     self.client.test_demo_test_httpbin_post()
            @task(2)
            def test_demo_httpbin_get(self):
                self.client.test_demo_test_httpbin_get()
    
    
    #!/usr/local/python3
    # -*- coding: utf-8 -*-
    # @Date    : 2018-04-15 09:00:00
    # @Author  : Canon
    # @Link    : https://www.python.org
    # @Version : 3.6.1
    
    
    import sys
    import os
    sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
    import multiprocessing
    
    """
    Please Visit http://localhost:8089/
    """
    
    
    def start_slave(locust_file, master_port):
        print('start slave pid:\t{0}'.format(os.getpid()))
        os.system('locust -f {0} --slave --master-port {1}'.format(locust_file, master_port))
    
    
    if __name__ == '__main__':
        multiprocessing.freeze_support()
    
        # locustfile.py
        locust_file = "E:\PythonCase\PyScript\LocustTest\Example\locustfile.py"
    
        # locust 运行命令
        locust_command = '--no-web -c 10 -r 2 --run-time 5m'
    
        # 运行locust
        if 'master' in locust_command:
            # 分布式
            num = 2
            master_port = 5557
            record = []
            for i in range(num):
                process = multiprocessing.Process(target=start_slave, args=(locust_file, master_port))
                process.start()
                record.append(process)
    
            print('start master pid:\t{0}'.format(os.getpid()))
            cmd = 'locust -f {0} {1}'.format(locust_file, locust_command)
            print('cmd:\t{0}'.format(cmd))
            os.system(cmd)
        else:
            # 单例模式
            cmd = 'locust -f {0} {1}'.format(locust_file, locust_command)
            print('cmd:\t{0}'.format(cmd))
            os.system(cmd)
    
    

    相关文章

      网友评论

          本文标题:locust 开源示例

          本文链接:https://www.haomeiwen.com/subject/bjhtwctx.html