美文网首页
MQTT本地实现

MQTT本地实现

作者: 时尚灬IT男 | 来源:发表于2020-08-12 10:45 被阅读0次

本文是介绍MTQQ整套系统的搭建

MQTT 不是普通的 client server 模型,他还加了一个 代理者

先介绍Linux系统安装代理者

sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
sudo apt-get update
sudo apt-get install mosquitto
sudo apt-get install mosquitto-clients
sudo server mosquitto start
mosquitto -v
 // 带调试信息的启动

Windows安装代理者

使用Apollo搭建MQTT服务器步骤:
1.下载Apollo服务器并解压,在CMD环境运行其工作目录下的...\bin\apollo.cmd,命令后面带上参数「create mybroker」,创建服务器实例。这里需要Java环境,系统环境变量下要有JAVA_HOME。
2.创建实例之后会在bin目录下生成mybroker文件夹,其中 ...\etc\apollo.xml文件下是配置服务器信息的文件,...\etc\users.properties文件包含连接MQTT服务器时用到的用户名和密码,初始默认帐号是admin,密码password;
3.进入...\mybroker\bin\ 目录,在CMD输入命令「apollo-broker.cmd run」,可以使用TAB键自动补全,运行后输出信息如下:

12331234132.PNG
其中我们要留意的:
MQTT服务器TCP连接端口:tcp://0.0.0.0:61613
后台Web管理页面:https://127.0.0.1:61681/或http://127.0.0.1:61680/

Python的MQTT客户端

安装下载 paho-mqtt:

pip install paho-mqtt

客户端1:

#!/usr/bin/env python
# coding=utf-8

import json
import sys
import os
import time
import paho.mqtt.client as mqtt

global client

sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
sys.path.append("..")

REPORT_TOPIC = 'test'  # 主题


def on_connect(client, userdata, flags, rc):
    print('connected to mqtt with resurt code ', rc)
    client.subscribe(REPORT_TOPIC)  # 订阅主题


def on_message(client, userdata, msg):
    """
    接收客户端发送的消息
    :param client: 连接信息
    :param userdata:
    :param msg: 客户端返回的消息
    :return:
    """
    print("Start server!")
    payload = json.loads(msg.payload.decode('utf-8'))
    print(payload)


def server_conenet(client):
    client.on_connect = on_connect  # 启用订阅模式
    client.on_message = on_message  # 接收消息
    # client.username_pw_set("admin", "password")
    client.connect("192.168.70.146", 1883, 60)  # 链接
    # client.connect("127.0.0.1", 1883, 60)  # 链接
    # client.connect("127.0.0.1", 61613, 60)  # 链接
    client.loop_start()   # 以start方式运行,需要启动一个守护线程,让服务端运行,否则会随主线程死亡
    # client.loop_forever()  # 以forever方式阻塞运行。
def server_stop(client):
    client.loop_stop()  # 停止服务端
    sys.exit(0)

def server_main():
    global client
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    client = mqtt.Client(client_id, transport='tcp')
    server_conenet(client)

def clicent_main(message: str):
    """
    客户端发布消息
    :param message: 消息主体
    :return:
    """
    TASK_TOPIC = 'test2'

    time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))
    payload = {"msg": "%s" % message, "data": "%s" % time_now}
    # publish(主题:Topic; 消息内容)
    info = client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False),qos = 2,retain = False)
    # print(info.wait_for_publish())
    # print(info.is_published())
    if info.is_published():
        print("Successful send message!")
    else:
        print("error send message!")

    return True


if __name__ == '__main__':
    # 启动监听
    server_main()
    while True:
        message = input("input:")
        clicent_main(message)
        time.sleep(1)

客户端2:

#!/usr/bin/env python
# coding=utf-8

import json
import sys
import os
import time
import paho.mqtt.client as mqtt

global client

sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
sys.path.append("..")

REPORT_TOPIC = 'test2'  # 主题


def on_connect(client, userdata, flags, rc):
    print('connected to mqtt with resurt code ', rc)
    client.subscribe(REPORT_TOPIC)  # 订阅主题


def on_message(client, userdata, msg):
    """
    接收客户端发送的消息
    :param client: 连接信息
    :param userdata:
    :param msg: 客户端返回的消息
    :return:
    """
    print("Start server!")
    payload = json.loads(msg.payload.decode('utf-8'))
    print(payload)


def server_conenet(client):
    client.on_connect = on_connect  # 启用订阅模式
    client.on_message = on_message  # 接收消息
    client.connect("192.168.70.146", 1883, 60)  # 链接

    # client.username_pw_set("admin", "password")
    # client.connect("192.168.70.146", 1883, 60)  # 链接
    # client.connect("127.0.0.1", 1883, 60)  # 链接
    # client.connect("127.0.0.1", 61613, 60)  # 链接

    client.loop_start()  # 以start方式运行,需要启动一个守护线程,让服务端运行,否则会随主线程死亡
    # client.loop_forever()  # 以forever方式阻塞运行。

def server_stop(client):
    client.loop_stop()  # 停止服务端
    sys.exit(0)


def server_main():
    global client
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    client = mqtt.Client(client_id, transport='tcp')
    server_conenet(client)


def clicent_main(message: str):
    """
    客户端发布消息
    :param message: 消息主体
    :return:
    """
    TASK_TOPIC = 'test'

    time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))
    payload = {"msg": "%s" % message, "data": "%s" % time_now}
    # publish(主题:Topic; 消息内容)
    client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False))
    print("Successful send message!")
    return True


if __name__ == '__main__':
    # 启动监听
    server_main()
    while True:
        message = input("input:")
        clicent_main(message)
        time.sleep(1)


这里注意下修改下代理者的端口号是多少?默认是1883,有无用户名密码登陆?
这样就完成客户端1和客户端2之间的及时通讯,有点类似聊天工具

相关文章

网友评论

      本文标题:MQTT本地实现

      本文链接:https://www.haomeiwen.com/subject/yhjpdktx.html