首先安装paho.mqtt.client这个python库:
需求描述:
- 第一步:客户端访问某个路由就可以执行固定的提交到MQTT服务器的操作。
- 第二步:客户端以get方法访问某个路由,flask提取get中的参数并验证,验证完成以后将参数中的数据提交到MQTT服务器中。
- 第三步:将前面的代码整合到SockIO中去。
一、flask上的简单应用:
#encoding: utf-8
import paho.mqtt.client as mqtt
# 当连接上服务器后回调此函数
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
# 放在on_connect函数里意味着
# 重新连接时订阅主题将会被更新
client.subscribe("chat")
# 从服务器接受到消息后回调此函数
def on_message(client, userdata, msg):
print("主题:" + msg.topic + " 消息:" + str(msg.payload))
client = mqtt.Client()
# 参数有 Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
client.on_connect = on_connect # 设置连接上服务器回调函数
client.on_message = on_message # 设置接收到服务器消息回调函数
client.connect("test.mosquitto.org", 1883, 60) # 连接服务器,端口为1883,维持心跳为60秒
client.publish('chat','this is a test!') # 往主题chat里发送消息
client.loop_forever()
二、flaskIO配合MQTT的应用
#encoding: utf-8
from flask import Flask, render_template, request
import eventlet
import json
import paho.mqtt.client as mqtt_client
from flask_mqtt import Mqtt
from flask_socketio import SocketIO
from flask_bootstrap import Bootstrap
eventlet.monkey_patch()
app = Flask(__name__)
app.config['SECRET'] = 'my secret key'
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['MQTT_BROKER_URL'] = 'test.mosquitto.org'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''
app.config['MQTT_PASSWORD'] = ''
app.config['MQTT_KEEPALIVE'] = 60
app.config['MQTT_TLS_ENABLED'] = False
app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye'
app.config['MQTT_LAST_WILL_QOS'] = 2
# Parameters for SSL enabled
# app.config['MQTT_BROKER_PORT'] = 8883
# app.config['MQTT_TLS_ENABLED'] = True
# app.config['MQTT_TLS_INSECURE'] = True
# app.config['MQTT_TLS_CA_CERTS'] = 'ca.crt'
mqtt_ws = Mqtt(app)
socketio = SocketIO(app)
bootstrap = Bootstrap(app)
@app.route('/')
def hello_world():
return render_template('index.html')
@app.route('/ws')
def wevsocket():
return render_template('websocket_mqtt_demo.html')
@socketio.on('publish')
def handle_publish(json_str):
data = json.loads(json_str)
mqtt_ws.publish(data['topic'], data['message'])
@socketio.on('subscribe')
def handle_subscribe(json_str):
data = json.loads(json_str)
mqtt_ws.subscribe(data['topic'])
@mqtt_ws.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=message.topic,
payload=message.payload.decode()
)
socketio.emit('mqtt_message', data=data)
@mqtt_ws.on_log()
def handle_logging(client, userdata, level, buf):
print(client, userdata, level, buf)
if __name__ == '__main__':
socketio.run(app, host='127.0.0.1', port=5001, use_reloader=True, debug=True)
三、flask、flaskIO、加纯Python代码的MQTT
#encoding: utf-8
from flask import Flask, render_template, request
import eventlet
import json
import paho.mqtt.client as mqtt_client
from flask_mqtt import Mqtt
from flask_socketio import SocketIO
from flask_bootstrap import Bootstrap
eventlet.monkey_patch()
app = Flask(__name__)
app.config['SECRET'] = 'my secret key'
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['MQTT_BROKER_URL'] = 'test.mosquitto.org'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''
app.config['MQTT_PASSWORD'] = ''
app.config['MQTT_KEEPALIVE'] = 60
app.config['MQTT_TLS_ENABLED'] = False
app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye'
app.config['MQTT_LAST_WILL_QOS'] = 2
# Parameters for SSL enabled
# app.config['MQTT_BROKER_PORT'] = 8883
# app.config['MQTT_TLS_ENABLED'] = True
# app.config['MQTT_TLS_INSECURE'] = True
# app.config['MQTT_TLS_CA_CERTS'] = 'ca.crt'
mqtt_ws = Mqtt(app)
socketio = SocketIO(app)
bootstrap = Bootstrap(app)
# 这里的初始化MQTT可以让纯Python代码运行起来
mqtt_client = mqtt_client.Client()
mqtt_client.connect("test.mosquitto.org", 1883, 60) # 连接服务器,端口为1883,维持心跳为60秒
@app.route('/')
def hello_world():
return render_template('index.html')
# 每次访问这个路由将向chat主题,提交一次数据data
@app.route('/mqtts')
def mqtts():
# mqtt_client = myMqtt()
mqtt_client.publish('chat', 'data')
return 'ok'
@app.route('/ws')
def wevsocket():
return render_template('websocket_mqtt_demo.html')
@socketio.on('publish')
def handle_publish(json_str):
data = json.loads(json_str)
mqtt_ws.publish(data['topic'], data['message'])
@socketio.on('subscribe')
def handle_subscribe(json_str):
data = json.loads(json_str)
mqtt_ws.subscribe(data['topic'])
@mqtt_ws.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=message.topic,
payload=message.payload.decode()
)
socketio.emit('mqtt_message', data=data)
@mqtt_ws.on_log()
def handle_logging(client, userdata, level, buf):
print(client, userdata, level, buf)
if __name__ == '__main__':
socketio.run(app, host='127.0.0.1', port=5001, use_reloader=True, debug=True)
网友评论