locust_mqtt_code

作者: 乐观的星辰 | 来源:发表于2020-07-23 17:52 被阅读0次

    mqtt_core.py -- mqtt核心类

    # -*- coding: utf-8 -*-
    # @Time    : 2020/7/8 3:56 下午
    # @Author  : xiaobin
    # @File    : mqtt_core.py
    # @Software: PyCharm
    import paho.mqtt.client as mqtt
    from locust import (TaskSet,task,events,Locust)
    from gevent._semaphore import Semaphore
    import json
    import time
    from util.data_params import *
    
    all_locusts_spawned = Semaphore()
    all_locusts_spawned.acquire()
    
    def on_hatch_complete(**kwargs):
        all_locusts_spawned.release()
    
    events.hatch_complete += on_hatch_complete
    
    class MqttClient(object):
    
        """
            订阅topic
        """
    
        def set_mqtt_params(self, data):
            self.size = 1
            self.start_time = time.time()
            self.data = data
            self._qos = 1
            self._port = 1883
            self._timeout = 1000
            self._clean_session = False
            self._topics = []
            if self.data:
                data_json = json.loads(self.data)
                self._host = data_json.get('data').get('req').get('endPoint')
                self._user = data_json.get('data').get('username')
                self._password = data_json.get('data').get('password')
                self._client_id = data_json.get('data').get('req').get('clientId')
                topics = data_json.get('data').get('req').get('topics')
                if type(topics) == dict:
                    for k, v in topics.items():
                        for i in v:
                            topic = f'{k}/{i}'
                            if topic:
                                self._topics.append(topic)
                if self._client_id:
                    self.file_url = f'../files/{self._client_id}'
    
        def _on_connect(self, client, userdata, flags, rc):
            print(f'client_id: {self._client_id} Connected with result code:{rc}, time:{time.time()}')
            total_time = int((time.time() - self.start_time) * 1000)
            if self.size > 0:
                events.request_success.fire(
                    request_type='mqtt',
                    name='连接数',
                    response_time=total_time,
                    response_length=0)
                save_file_size(self.file_url, f'Connected with result code:{rc}, time:{time.time()}')
                self.size = self.size - 1
    
    
        def _on_message(self, client, userdata, msg):
            # print(f'msg.topic: {msg.topic}, msg.payload: {msg.payload}')
            data = msg.payload
            response_length = (len(data))
            data = json.loads(data)
            start_time = int(data.get('start_time'))
            end_time = int(time.time() * 1000)
            total_time = end_time - start_time
            msg_id = data.get('msg_id')
            events.request_success.fire(
                request_type='mqtt',
                name='收到消息',
                response_time=total_time,
                response_length=response_length)
            save_file_size(self.file_url, f'{self._client_id}, {msg_id}')
    
        def client_loop(self):
            client = mqtt.Client(client_id=self._client_id, clean_session=self._clean_session)
            client.username_pw_set(username=self._user, password=self._password)
            client.on_connect = self._on_connect
            client.on_message = self._on_message
            client.connect(host=self._host, port=self._port, keepalive=self._timeout)
            for topic in self._topics:
                client.subscribe(topic, qos=self._qos)
            client.loop_forever()
    

    locust_mqtt.py -- 继承locust父类

    # -*- coding: utf-8 -*-
    # @Time    : 2020/7/8 4:02 下午
    # @Author  : xiaobin
    # @File    : locust_mqtt_test.py
    # @Software: PyCharm
    from locust import (TaskSet,task,events,Locust)
    import core.mqtt_core
    
    class locust_mqtt(Locust):
    
        def __init__(self):
            super(locust_mqtt, self).__init__()
            self.client = core.mqtt_core.MqttClient()
    

    test_mqtt.py --locust 执行文件

    # -*- coding: utf-8 -*-
    # @Time    : 2020/7/8 4:04 下午
    # @Author  : xiaobin
    # @File    : test_mqtt.py
    # @Software: PyCharm
    import sys
    import os
    curPath = os.path.abspath(os.path.dirname(__file__))
    rootPath = os.path.split(curPath)[0]
    sys.path.append(rootPath)
    from locust import *
    import core.locust_mqtt
    from util.parasm_size_lock import *
    
    class test_mqtt(TaskSet):
    
        @task(weight=1)
        def test_mqtt_connect(self):
            if self.locust.token_list:
                data = self.locust.token_list[0]
                self.client.set_mqtt_params(data)
                self.locust.token_list.pop(0)
                self.client.client_loop()
    
    class myrun(core.locust_mqtt.locust_mqtt):
        task_set = test_mqtt
        host = 'http://127.0.0.1:8089'
        file_name = '../files/mqtt_token_005'
        token_list = []
        with open(file_name, 'rb') as file_to_read:
            print('start...')
            while True:
                lines = file_to_read.readline()
                if lines:
                    data = lines
                    if isinstance(data, bytes):
                        data = str(data, encoding='utf-8')
                    data = data.replace('\n', '')
                    token_list.append(data)
                else:
                    print('end')
                    break
        lock = SizeLock(slave_size=16, data_list_size=len(token_list))
        start_size = lock.get_start_size()
        end_size = lock.get_end_size()
        token_list = token_list[start_size:end_size]
        print(f'启动获取mqtt动态唯一参数,start_size:{start_size}, end_size:{end_size} 可用token size:{len(token_list)}')
        min_wait = 0
        max_wait = 0
    

    file_lock -- 分布式唯一参数控制,通过本地文件,每次服务m/s启动需要重置file中size为0或则del file

    # -*- coding: utf-8 -*-
    # @Time    : 2020/7/9 1:49 下午
    # @Author  : xiaobin
    # @File    : parasm_size_lock.py
    # @Software: PyCharm
    import io
    
    class SizeLock(object):
    
        """
            用本地文件落地资源文件控制获取唯一参数
            :param slave_size 启动节点数
            :param data_list_size 初始化参数不唯一数
            0 : 加载全部,从1开始
            根据 启动节点数和初始化参数 和文件 size 计算得到该slave节点启动加载到的参数范围
        """
        def __init__(self, slave_size, data_list_size):
            self.slave_size = slave_size
            self.data_list_size = data_list_size
            self.file_url = '../files/size.lock'
            self._set_scale_value()
    
        def _set_scale_value(self):
            self.scale_value = self.data_list_size//self.slave_size
            self.scale_value = int(self.scale_value)
    
        def save_file_size(self, data):
            try:
                fl = io.open(self.file_url, 'w')
                fl.write(data)
                fl.close()
            except Exception as e:
                print(e)
                fl.close()
    
    
        def get_file_size(self):
            size = 0
            try:
                with open(self.file_url, 'rb') as file_to_read:
                    while True:
                        lines = file_to_read.readline()
                        if lines:
                            data = lines
                            if isinstance(data, bytes):
                                data = str(data, encoding='utf-8')
                            size = int(data)
                        else:
                            break
            except Exception as e:
                print(e)
            return size
    
        def get_start_size(self):
            """
              开始兼容为0的情况
            """
            file_size = self.get_file_size()
            if file_size == 0:
                return 0
            elif file_size * self.scale_value < self.data_list_size:
                return file_size * self.scale_value - self.scale_value
            else:
                print(f'获取index_size异常了')
                return self.data_list_size
    
        def get_end_size(self):
            """
                兼容最后一次多赋值情况,比如 10 /3 0,1,2   4,5,6   7,8,9,10
            :return:
            """
            file_size = self.get_file_size()
            end_size = None
            if file_size == 0:
                end_size = self.scale_value
            else:
                if (file_size+1) * self.scale_value <= self.data_list_size:
                    end_size = file_size * self.scale_value
                else:
                    end_size = self.data_list_size
            if end_size is not None:
                self.save_file_size(f'{file_size+1}')
            return end_size
    
    if __name__ == '__main__':
        list_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        s = SizeLock(3, len(list_data))
        start_size = s.get_start_size()
        end_size = s.get_end_size()
        print(start_size, end_size)
        print(list_data[start_size:end_size])
    

    相关文章

      网友评论

        本文标题:locust_mqtt_code

        本文链接:https://www.haomeiwen.com/subject/hbwhlktx.html