美文网首页
MQTT的介绍及Java实现MQTT通讯

MQTT的介绍及Java实现MQTT通讯

作者: 在山的那边是海 | 来源:发表于2022-11-09 10:41 被阅读0次

    视频说明:https://www.bilibili.com/video/BV1qf4y1n7js/?p=3

    关于MQTT

    MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量简单开放易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。

    发布订阅

    image.png

    文档总结

    🟣 MQTT 发布订阅

    🟣 MQTT 协议

    🟣 Java实现MQTT通信

    🟣 基于 WebSocket 的 MQTT

    服务端

    服务端使用 mosquitto(版本2.0.14)

    下载页面:https://mosquitto.org/download/

    客户端

    MQTTX

    下载页面:https://mqttx.app/zh#download

    MQTT.fx

    下载链接:http://www.jensd.de/apps/mqttfx/1.7.1/mqttfx-1.7.1-windows-x64.exe
    MQTTfx官网:http://mqttfx.org
    太极创客下载地址:http://www.taichi-maker.com/homepage/download/#mqtt
    备份的蓝奏云下载链接:https://ioufev.lanzout.com/irlNC064nc4f

    paho

    https://github.com/eclipse/paho.mqtt.java

    paho是eclipse提供MQTT客户端开源库,Java代码集成这个客户端用来收发消息。

    springboot 集成 MQTT

    代码:https://gitee.com/ioufev/mqtt-springboot-demo

    依赖

    pom.xml

    <!-- MQTT -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    
    

    类MqttConfig

    spring中集成框架,有消息入站通道(用来接收消息)和出站通道(用来发送消息)

    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageProducer;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    
    @Configuration
    public class MqttConfig {
    
        // 消费消息
    
        /**
         * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。
         * @return factory
         */
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            MqttConnectOptions options = new MqttConnectOptions();
    
            // 设置代理端的URL地址,可以是多个
            options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"});
    
            factory.setConnectionOptions(options);
            return factory;
        }
    
        /**
         * 入站通道
         */
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }
    
        /**
         * 入站
         */
        @Bean
        public MessageProducer inbound() {
            // Paho客户端消息驱动通道适配器,主要用来订阅主题
            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient-paho",
                    mqttClientFactory(), "boat", "collector", "battery", "+/sensor");
            adapter.setCompletionTimeout(5000);
    
            // Paho消息转换器
            DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
            // 按字节接收消息
    //        defaultPahoMessageConverter.setPayloadAsBytes(true);
            adapter.setConverter(defaultPahoMessageConverter);
            adapter.setQos(1); // 设置QoS
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }
    
        @Bean
        // ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return message -> {
                String payload = message.getPayload().toString();
    
                // byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字节格式
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
    
                // 根据主题分别进行消息处理。
                if (topic.matches(".+/sensor")) { // 匹配:1/sensor
                    String sensorSn = topic.split("/")[0];
                    System.out.println("传感器" + sensorSn + ": 的消息: " + payload);
                } else if (topic.equals("collector")) {
                    System.out.println("采集器的消息:" + payload);
                } else {
                    System.out.println("丢弃消息:主题[" + topic  + "],负载:" + payload);
                }
    
            };
        }
    
        // 发送消息
    
        /**
         * 出站通道
         */
        @Bean
        public MessageChannel mqttOutboundChannel() {
            return new DirectChannel();
        }
    
        /**
         * 出站
         */
        @Bean
        @ServiceActivator(inputChannel = "mqttOutboundChannel")
        public MessageHandler outbound() {
    
            // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publishClient", mqttClientFactory());
            messageHandler.setAsync(true); // 如果设置成true,即异步,发送消息时将不会阻塞。
            messageHandler.setDefaultTopic("command");
            messageHandler.setDefaultQos(1); // 设置默认QoS
    
            // Paho消息转换器
            DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
    
            // defaultPahoMessageConverter.setPayloadAsBytes(true); // 发送默认按字节类型发送消息
            messageHandler.setConverter(defaultPahoMessageConverter);
            return messageHandler;
        }
    
    }
    
    

    接口MqttGateway

    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {
        // 定义重载方法,用于消息发送
        void sendToMqtt(String payload);
        // 指定topic进行消息发送
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
    }
    
    

    测试

    测试方式:使用接口工具,给接口发送消息,从而调用MQTT客户端发布消息

    类MqttController

    import com.ioufev.mqtt.domain.MyMessage;
    import com.ioufev.mqtt.mqtt.MqttGateway;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    @RestController
    public class MqttController {
    
        @Resource
        private MqttGateway mqttGateway;
    
        @PostMapping("/send")
        public String send(@RequestBody MyMessage myMessage) {
            // 发送消息到指定主题
            mqttGateway.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());
            return "send topic: " + myMessage.getTopic()+ ", message : " + myMessage.getContent();
        }
    
    }
    
    

    类MyMessage

    public class MyMessage {
        private String topic;
        private String content;
    
        public String getTopic() {
            return topic;
        }
    
        public void setTopic(String topic) {
            this.topic = topic;
        }
    
        public String getContent() {
            return content;
        }
    
        public void setContent(String content) {
            this.content = content;
        }
    }
    

    转载来源:https://www.cnblogs.com/ioufev/p/15293367.html

    相关文章

      网友评论

          本文标题:MQTT的介绍及Java实现MQTT通讯

          本文链接:https://www.haomeiwen.com/subject/eywotdtx.html