美文网首页
GO-MqttClient

GO-MqttClient

作者: Bug2Coder | 来源:发表于2020-05-23 16:16 被阅读0次

    1、用到的包github.com/eclipse/paho.mqtt.golang

    2、简单客户端实现

    package main
    
    import (
        "encoding/json"
        "fmt"
        mqtt "github.com/eclipse/paho.mqtt.golang"
        "time"
    )
    
    //创建全局mqtt publish消息处理 handler
    var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
        fmt.Printf("Pub Client Topic : %s \n", msg.Topic())
        fmt.Printf("Pub Client msg : %s \n", msg.Payload())
    }
    
    //创建全局mqtt sub消息处理 handler
    var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
        fmt.Printf("Sub Client Topic : %s--", msg.Topic())
        fmt.Printf("Sub Client msg : %s \n", msg.Payload())
    }
    
    type message struct {
        topic   string
        payload interface{}
    }
    
    // 客户端管理器
    type MqttClientManger struct {
        client   mqtt.Client
        msgSend  chan message
        topicSub []string
    }
    
    func newMqttClient(options *mqtt.ClientOptions) mqtt.Client {
        client := mqtt.NewClient(options)
        return client
    }
    
    func NewMqttClient(ip, username, passd string, port int) *MqttClientManger {
        clinetOptions := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s:%d", ip, port)).SetUsername(fmt.Sprintf("%s", username)).SetPassword(fmt.Sprintf("%s", passd))
        //设置客户端ID
        clinetOptions.SetClientID(fmt.Sprintf("%d", time.Now().Unix()))
        //设置handler
        clinetOptions.SetDefaultPublishHandler(messagePubHandler)
        //设置连接超时
        clinetOptions.SetConnectTimeout(time.Duration(60) * time.Second)
        //设置自动重连
        clinetOptions.SetAutoReconnect(true)
        //创建客户端连接
        c := newMqttClient(clinetOptions)
        msg := make(chan message)
        return &MqttClientManger{client: c, msgSend: msg}
    }
    
    // 客户端连接
    func (mg *MqttClientManger) NewConnect() {
        if token := mg.client.Connect(); token.WaitTimeout(time.Duration(60)*time.Second) && token.Wait() && token.Error() != nil {
            fmt.Printf("[Pub] mqtt connect error, error: %s \n", token.Error())
            return
        }
    }
    
    // 发送消息
    func (mg *MqttClientManger) Publish() {
        for {
            msg, ok := <-mg.msgSend
            if ok {
                // 格式化数据,将信息转换为json
                payload, err := json.Marshal(msg.payload)
                if err != nil {
                    fmt.Println(err)
                }
                token := mg.client.Publish(msg.topic, 1, false, payload)
                token.Wait()
            }
    
        }
    
    }
    
    // 订阅消息
    func (mg *MqttClientManger) Subscribe() {
        for _, topic := range mg.topicSub {
            token := mg.client.Subscribe(topic, 1, messageSubHandler)
            token.Wait()
        }
    
    }
    
    // 启动服务
    func (mg *MqttClientManger) run() {
        go mg.Subscribe()
        go mg.Publish()
    }
    

    相关文章

      网友评论

          本文标题:GO-MqttClient

          本文链接:https://www.haomeiwen.com/subject/ygclahtx.html