美文网首页
mqtt python测试脚本

mqtt python测试脚本

作者: heliping_peter | 来源:发表于2019-08-09 15:07 被阅读0次
    1. on_message回调函数
      回调函数无法返回message,只能在函数体里面进行操作,所以将message写入数据库,然后在数据库中去查询。
    def on_message(client, userdata, msg):
        mss =  str((msg.payload).decode('utf-8'))    
        #print(msg.topic + " " + mss)
        insert_msg(msg.topic, mss)
    
    1. sqlite数据库
      使用参数输入
    c.execute("REPLACE INTO mqtt (TOPIC, MSG) VALUES ('%s', '%s')" %(topic, msg))
    

    返回的数据是一个数组,数组每个元素是一条记录元组,所以要获取值需要这样处理:

    c.execute("select msg from mqtt where topic = '%s'" %(topic))
        mss = c.fetchall()[0][0]
    
    1. mqtt
      订阅的测试方法,需要先连接,然后启动loop,订阅topic,发送消息,停止loop
    client = mqtt_connect(MQTT_SERVER_ADDR, MQTT_SERVER_PORT,PRODUCT_KEY, DEVICE_NAME,DEVICE_SECRET, TIMESTAMP)
            client.loop_start()
            mqtt_subscribe(client, self.TOPIC_REPORT)
            mqtt_publish(client, self.TOPIC_REPORT, MSG)
            time.sleep(4)
            client.loop_stop()
    

    相关文章

      网友评论

          本文标题:mqtt python测试脚本

          本文链接:https://www.haomeiwen.com/subject/ojlhjctx.html