本文是介绍MTQQ整套系统的搭建
MQTT 不是普通的 client server 模型,他还加了一个 代理者
先介绍Linux系统安装代理者
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
sudo apt-get update
sudo apt-get install mosquitto
sudo apt-get install mosquitto-clients
sudo server mosquitto start
mosquitto -v
// 带调试信息的启动
Windows安装代理者
使用Apollo搭建MQTT服务器步骤:
1.下载Apollo服务器并解压,在CMD环境运行其工作目录下的...\bin\apollo.cmd,命令后面带上参数「create mybroker」,创建服务器实例。这里需要Java环境,系统环境变量下要有JAVA_HOME。
2.创建实例之后会在bin目录下生成mybroker文件夹,其中 ...\etc\apollo.xml文件下是配置服务器信息的文件,...\etc\users.properties文件包含连接MQTT服务器时用到的用户名和密码,初始默认帐号是admin,密码password;
3.进入...\mybroker\bin\ 目录,在CMD输入命令「apollo-broker.cmd run」,可以使用TAB键自动补全,运行后输出信息如下:
其中我们要留意的:
MQTT服务器TCP连接端口:tcp://0.0.0.0:61613
后台Web管理页面:https://127.0.0.1:61681/或http://127.0.0.1:61680/
Python的MQTT客户端
安装下载 paho-mqtt:
pip install paho-mqtt
客户端1:
#!/usr/bin/env python
# coding=utf-8
import json
import sys
import os
import time
import paho.mqtt.client as mqtt
global client
sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
sys.path.append("..")
REPORT_TOPIC = 'test' # 主题
def on_connect(client, userdata, flags, rc):
print('connected to mqtt with resurt code ', rc)
client.subscribe(REPORT_TOPIC) # 订阅主题
def on_message(client, userdata, msg):
"""
接收客户端发送的消息
:param client: 连接信息
:param userdata:
:param msg: 客户端返回的消息
:return:
"""
print("Start server!")
payload = json.loads(msg.payload.decode('utf-8'))
print(payload)
def server_conenet(client):
client.on_connect = on_connect # 启用订阅模式
client.on_message = on_message # 接收消息
# client.username_pw_set("admin", "password")
client.connect("192.168.70.146", 1883, 60) # 链接
# client.connect("127.0.0.1", 1883, 60) # 链接
# client.connect("127.0.0.1", 61613, 60) # 链接
client.loop_start() # 以start方式运行,需要启动一个守护线程,让服务端运行,否则会随主线程死亡
# client.loop_forever() # 以forever方式阻塞运行。
def server_stop(client):
client.loop_stop() # 停止服务端
sys.exit(0)
def server_main():
global client
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
client = mqtt.Client(client_id, transport='tcp')
server_conenet(client)
def clicent_main(message: str):
"""
客户端发布消息
:param message: 消息主体
:return:
"""
TASK_TOPIC = 'test2'
time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))
payload = {"msg": "%s" % message, "data": "%s" % time_now}
# publish(主题:Topic; 消息内容)
info = client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False),qos = 2,retain = False)
# print(info.wait_for_publish())
# print(info.is_published())
if info.is_published():
print("Successful send message!")
else:
print("error send message!")
return True
if __name__ == '__main__':
# 启动监听
server_main()
while True:
message = input("input:")
clicent_main(message)
time.sleep(1)
客户端2:
#!/usr/bin/env python
# coding=utf-8
import json
import sys
import os
import time
import paho.mqtt.client as mqtt
global client
sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
sys.path.append("..")
REPORT_TOPIC = 'test2' # 主题
def on_connect(client, userdata, flags, rc):
print('connected to mqtt with resurt code ', rc)
client.subscribe(REPORT_TOPIC) # 订阅主题
def on_message(client, userdata, msg):
"""
接收客户端发送的消息
:param client: 连接信息
:param userdata:
:param msg: 客户端返回的消息
:return:
"""
print("Start server!")
payload = json.loads(msg.payload.decode('utf-8'))
print(payload)
def server_conenet(client):
client.on_connect = on_connect # 启用订阅模式
client.on_message = on_message # 接收消息
client.connect("192.168.70.146", 1883, 60) # 链接
# client.username_pw_set("admin", "password")
# client.connect("192.168.70.146", 1883, 60) # 链接
# client.connect("127.0.0.1", 1883, 60) # 链接
# client.connect("127.0.0.1", 61613, 60) # 链接
client.loop_start() # 以start方式运行,需要启动一个守护线程,让服务端运行,否则会随主线程死亡
# client.loop_forever() # 以forever方式阻塞运行。
def server_stop(client):
client.loop_stop() # 停止服务端
sys.exit(0)
def server_main():
global client
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
client = mqtt.Client(client_id, transport='tcp')
server_conenet(client)
def clicent_main(message: str):
"""
客户端发布消息
:param message: 消息主体
:return:
"""
TASK_TOPIC = 'test'
time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))
payload = {"msg": "%s" % message, "data": "%s" % time_now}
# publish(主题:Topic; 消息内容)
client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False))
print("Successful send message!")
return True
if __name__ == '__main__':
# 启动监听
server_main()
while True:
message = input("input:")
clicent_main(message)
time.sleep(1)
这里注意下修改下代理者的端口号是多少?默认是1883,有无用户名密码登陆?
这样就完成客户端1和客户端2之间的及时通讯,有点类似聊天工具
网友评论