美文网首页
mqtt消息推送

mqtt消息推送

作者: Raral | 来源:发表于2024-08-11 13:56 被阅读0次

mqtt是什么?

MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可在严重受限的硬件设备和低带宽、高延迟的网络上实现稳定传输。

后端集成mqtt客户端-生产者

  • pom
<!--        mqtt-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

        <!--mqtt相关依赖-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
  • 发布者

    public class PublishSample {
    
    
    
        public static void main(String[] args) {
            //为了让前端js 接受到消息只能先websocket 协议
            String broker = "ws://broker.emqx.io:8083/mqtt";
    //        String broker = "tcp://broker.emqx.io:1883";
            String topic = "topic-123";
            String clientid = "publish_clien";
            String content = "各位好!";
            int qos = 0;
    
            try {
                MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
                // 连接参数
                MqttConnectOptions options = new MqttConnectOptions();
                options.setConnectionTimeout(60);
                options.setKeepAliveInterval(60);
                // 连接
                client.connect(options);
                // 创建消息并设置 QoS
    //            MqttMessage message = new MqttMessage(content.getBytes());
    //            message.setQos(qos);
                // 发布消息
    //            client.publish(topic, message);
    //            System.out.println("Message published");
    //            System.out.println("topic: " + topic);
    //            System.out.println("message content: " + content);
    
                //模拟产生日志
                for (int i = 0; i < 50; i++) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(100L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    String msg = "发送【" + i + "】条日志信息-" + UUID.randomUUID().toString();
                    MqttMessage message = new MqttMessage(msg.getBytes());
                    message.setQos(qos);
                    client.publish(topic, message);
                    System.out.println("Message published");
                    System.out.println("topic: " + topic);
                    System.out.println("message content: " + content);
                }
    
    
                // 关闭连接
                client.disconnect();
                // 关闭客户端
                client.close();
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
        }
    }
    

前端集成mqtt客户端-消费者

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>mqtt</title>
    <style>
        #txt {
            border: 1px solid;
            width: 800px;
            height: 500px;
            overflow-y: scroll;

        }

    </style>
</head>
<body>
    <h5>日志实时展示</h5>
    <div id="txt">

    </div>

</body>
<script src="https://cdn.bootcdn.net/ajax/libs/mqtt/4.1.0/mqtt.min.js"></script>
<script>



    // Broker: broker.emqx.io
    // TCP Port: 1883
    // Websocket Port: 8083
    const connectUrl = `ws://broker.emqx.io:8083/mqtt`;
    // const connectUrl = `ws://broker.emqx.io/mqtt`;

    client = mqtt.connect(connectUrl, {
        clean: true,
        connectTimeout: 4000,
        reconnectPeriod: 1000,
        clientId: 'emqx_test',
        username: 'emqx_test',
        password: 'emqx_test'
    })

    // 需要订阅的主题
    const topic = 'topic-123';

    //成功连接后触发的回调
    client.on('connect', () => {
        console.log('已经连接成功');
        // 这里可以订阅多个主题
        client.subscribe([topic], () => {
            console.log(`订阅了主题 ${topic}`)
        })
    });

    // 当客户端收到一个发布过来的消息时触发回调
    client.on('message', function (topic, message, packet) {
        // 这里有可能拿到的数据格式是Uint8Array格式,所以可以直接用toString转成字符串
        // let data = JSON.parse(message.toString);
        // var s = JSON.stringify(message.toString());
        console.log("返回的数据:", message.toString())
        // console.log("返回的数据2:", s)


        //将字节数组转换 成 普通 字符串 utf-8编码
        var blob = new Blob([message]);
        var fileReader = new FileReader();
        fileReader.onload = function (event) {
            var result = event.target.result;
            console.log("解析收到消息:" + result)

            //渲染到页面上
            var txtDiv = document.querySelector("#txt");
            var p = document.createElement("p");
            p.textContent = `${result}`;
            txtDiv.appendChild(p);

        }
        fileReader.readAsText(blob)

    });

    // 连接断开后触发的回调
    client.on("close", function () {
        console.log("已断开连接")
    });



</script>

</html>

运行效果

image.png

相关文章

  • ios MQTT协议及通讯的解决方案

    做消息推送有一段时间了,想和大家分享一下通过MQTT消息推送的解决方案。 一、MQTT简介 MQTT(Messag...

  • MQTT实现消息推送

    MQTT实现消息推送 后台:websocket (后台自己架构) (as导包:):https://github.c...

  • Android消息推送-Mqtt

    Android及时通讯和消息推送 这里先简单说下两者区别 A → B 消息推送(在APP端,消息推送可以作为即时通...

  • 使用 Arduino 通过 MQTT 协议连接 HomeAssi

    前言 如果对于 MQTT 协议有一点了解的话,应该知道设备既可以推送消息给 MQTT 代理,也可以从 MQTT 代...

  • esp8266接入小爱同学,通过mqtt

    原理:esp8266连接mqtt服务,并订阅mqtt的主题,当通过小爱语音发出指令,相当于mqtt的消息推送,由于...

  • Spring集成MQTT推送消息

    (mqtt java客户端的使用参看这篇文章) Spring Integration基于Eclipse Paho ...

  • 消息推送标准协议:MQTT

    随着物联网(Internet of Things,IoT)的兴起,机器之间(Machine-to-Machine,...

  • 基于MQTT的消息推送

    背景 因为前段时间公司做了一个基于路由器的项目,在APP客户端可以动态的显示连接到路由器的设备列表以及每台设备所消...

  • Android消息推送MQTT实战

    1 前言 年初做了一款Android TV 应用,用到了MQTT。主要实现的是类似一些景区利用大屏幕实时显示景点人...

  • MQTT在Android上的二次封装

    MQTT在物联网上的运用十分的广泛,目前移动端的消息推送服务,数据直连服务等都是基于MQTT消息协议进行。阅读了很...

网友评论

      本文标题:mqtt消息推送

      本文链接:https://www.haomeiwen.com/subject/ahxrkjtx.html