aclfile权限配置如下
aclfile.conf文件内容如下
user1 和user2 是用户名
user1 设置为订阅权限,并且只能访问的主题为"root/topic/#"
user2 设置为发布权限,并且只能访问的主题为"root/topic/#"
user3 设置超级为发布+订阅权限
test 订阅主题
使用中订阅 topic\test
user user1
topic read root/topic/#
user user2
topic write root/topic/#
user use3
topic write $SYS/#
topic read $SYS/#
pwfile 用户名密码配置内容如下
这个是用命令生成的
mosquitto_passwd -b pwdfile user1 123456 (user1 是账号,123456是密码)
user1:$7$101$XkqkC2wVNfJY8jbl$oFhDmilRTROUDIGy4DtQQluNa32GNxS4iZEaGNXUF3hpynXCwxISbU3mPVSJu0HUtGUDlPtWcQoHrx3wBBDpeg==
user2:$7$101$NR8smdq5yB4ONq9I$8CYxS/WqBTuDDnXIbHpmX5kbfokeqU52Cp2h5E8S3PrKk+uJYPtL+/+m4cT4iu9VBXjgI9h3wHm9sgSiIkirZQ==
一、 python实现
利用python paho编写 mqtt发布端和服务端
PYTHON服务端程序 sub.py
import paho.mqtt.client as mqtt
broker = '127.0.0.1'
port = 1883
topic = "root/topic"
# 连接的回调函数
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client.subscribe(topic)
# 收到消息的回调函数
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker, port, 60)
client.loop_forever()
PYTHON发布端程序 pub.py
import paho.mqtt.client as mqtt
import time
broker = '127.0.0.1'
port = 1883
topic = "root/topic"
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker, port, 60)
for i in range(60):
client.publish(topic, payload=i, qos=0, retain=False)
print(f"send {i} to a/b{topic}")
time.sleep(1)
client.loop_forever()
启动 服务端程序
python sub.py
启动 发布端程序
python pub.py
二、 go实现
在Golang中,我们可以使用第三方库实现MQTT功能。下面以Eclipse Paho MQTT库为例
首先,需要安装Paho MQTT库。可以使用以下命令进行安装:
go get github.com/eclipse/paho.mqtt.golang
#common.go
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
var broker = "127.0.0.1"
var port = 1883
var userName1 = "user1"
var passwd1 = "123456"
var userName2 = "user2"
var passwd2 = "123456"
var topic = "root/topic"
func sub(client mqtt.Client, producer bool) {
token := client.Subscribe(topic, 1, nil)
token.Wait()
if producer {
fmt.Printf("Producer subscribed to topic %s", topic)
} else {
fmt.Printf("Consumer subscribed to topic %s", topic)
}
}
MQTT发布者
以下代码实现一个简单的MQTT发布者
#producer.go
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"time"
)
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Producer Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
func producerPoint() {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_producer")
opts.SetUsername(userName2)
opts.SetPassword(passwd2)
opts.SetKeepAlive(8 * time.Second)
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
sub(client, true)
publish(client)
time.Sleep(30 * time.Second)
client.Disconnect(250)
}
func publish(client mqtt.Client) {
num := 10
for i := 0; i < num; i++ {
text := fmt.Sprintf("Message %d", i)
token := client.Publish(topic, 0, false, text)
token.Wait()
time.Sleep(time.Second)
}
}
以上代码创建了一个MQTT客户端,连接到本地代理服务器(地址为localhost:1883)。然后,指定要发布的主题和消息,并通过client.Publish方法将消息发布到主题上。
下面是一个简单的MQTT订阅者的示例:
MQTT订阅者
# consumer.go
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"time"
)
var messageRecHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Clenit Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
// 其实consumer既可以收到消息,也可以发送消息
// 作为互联网硬件收集器,采集的环境信息数据(温度、湿度等)发送到broker
// 作为互联网硬件执行器,可以接受broker的消息(执行指令信息,如显示文字、声音等),并根据消息执行硬件行为
func consumerPoint() {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_consumer")
opts.SetUsername(userName1)
opts.SetPassword(passwd1)
opts.SetKeepAlive(8 * time.Second)
opts.SetDefaultPublishHandler(messageRecHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
sub(client, false)
time.Sleep(30 * time.Second)
client.Disconnect(250)
}
使用
package main
import "time"
func main() {
go consumerPoint()
go producerPoint()
time.Sleep(30 * time.Second)
}
以上代码创建了一个MQTT客户端,连接到本地代理服务器。然后,通过client.Subscribe方法订阅指定的主题,并在回调函数中处理接收到的消息。
结论:
MQTT是一种在物联网中广泛使用的消息传输协议,具有简单、轻量级和可靠性的特点。在Golang中,可以使用第三方库(如Eclipse Paho MQTT库)来实现MQTT功能。通过发布者和订阅者的示例代码,我们可以看到在Golang中实现MQTT功能是相对简单且易于理解的。
网友评论