美文网首页
GO-MqttClient

GO-MqttClient

作者: Bug2Coder | 来源:发表于2020-05-23 16:16 被阅读0次

1、用到的包github.com/eclipse/paho.mqtt.golang

2、简单客户端实现

package main

import (
    "encoding/json"
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "time"
)

//创建全局mqtt publish消息处理 handler
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Pub Client Topic : %s \n", msg.Topic())
    fmt.Printf("Pub Client msg : %s \n", msg.Payload())
}

//创建全局mqtt sub消息处理 handler
var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Sub Client Topic : %s--", msg.Topic())
    fmt.Printf("Sub Client msg : %s \n", msg.Payload())
}

type message struct {
    topic   string
    payload interface{}
}

// 客户端管理器
type MqttClientManger struct {
    client   mqtt.Client
    msgSend  chan message
    topicSub []string
}

func newMqttClient(options *mqtt.ClientOptions) mqtt.Client {
    client := mqtt.NewClient(options)
    return client
}

func NewMqttClient(ip, username, passd string, port int) *MqttClientManger {
    clinetOptions := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s:%d", ip, port)).SetUsername(fmt.Sprintf("%s", username)).SetPassword(fmt.Sprintf("%s", passd))
    //设置客户端ID
    clinetOptions.SetClientID(fmt.Sprintf("%d", time.Now().Unix()))
    //设置handler
    clinetOptions.SetDefaultPublishHandler(messagePubHandler)
    //设置连接超时
    clinetOptions.SetConnectTimeout(time.Duration(60) * time.Second)
    //设置自动重连
    clinetOptions.SetAutoReconnect(true)
    //创建客户端连接
    c := newMqttClient(clinetOptions)
    msg := make(chan message)
    return &MqttClientManger{client: c, msgSend: msg}
}

// 客户端连接
func (mg *MqttClientManger) NewConnect() {
    if token := mg.client.Connect(); token.WaitTimeout(time.Duration(60)*time.Second) && token.Wait() && token.Error() != nil {
        fmt.Printf("[Pub] mqtt connect error, error: %s \n", token.Error())
        return
    }
}

// 发送消息
func (mg *MqttClientManger) Publish() {
    for {
        msg, ok := <-mg.msgSend
        if ok {
            // 格式化数据,将信息转换为json
            payload, err := json.Marshal(msg.payload)
            if err != nil {
                fmt.Println(err)
            }
            token := mg.client.Publish(msg.topic, 1, false, payload)
            token.Wait()
        }

    }

}

// 订阅消息
func (mg *MqttClientManger) Subscribe() {
    for _, topic := range mg.topicSub {
        token := mg.client.Subscribe(topic, 1, messageSubHandler)
        token.Wait()
    }

}

// 启动服务
func (mg *MqttClientManger) run() {
    go mg.Subscribe()
    go mg.Publish()
}

相关文章

  • GO-MqttClient

    1、用到的包github.com/eclipse/paho.mqtt.golang 2、简单客户端实现

网友评论

      本文标题:GO-MqttClient

      本文链接:https://www.haomeiwen.com/subject/ygclahtx.html