美文网首页
springboot整合RabbitMQ Mqtt

springboot整合RabbitMQ Mqtt

作者: 一叶知秋_038b | 来源:发表于2019-10-25 12:18 被阅读0次

    第一,pom配置,我们需要引入相关jar:

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    

    第二,配置MQTT服务器基本信息,在springBoot配置文件application.properties中配置,添加如下:

    #XML格式配置
    #MQTT配置信息
    #MQTT-用户名
    spring.mqtt.username=admin
    #MQTT-密码
    spring.mqtt.password=password
    #MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
    spring.mqtt.url=tcp://127.0.0.1:61613
    #MQTT-连接服务器默认客户端ID
    spring.mqtt.client.id=mqttId
    #MQTT-默认的消息推送主题,实际可在调用接口时指定
    spring.mqtt.default.topic=topic
    
    #YML格式配置
    mqtt:
        username: wisx
        password: wisx
        url: tcp://localhost:1883
        client:
          id: mqttId555
        default:
          topic: topic22
        completionTimeout: 10000
    

    第三,配置MQTT消息推送配置类,

    @Configuration
    @IntegrationComponentScan
    public class MqttSenderConfig {
     
        @Value("${spring.mqtt.username}")
        private String username;
     
        @Value("${spring.mqtt.password}")
        private String password;
     
        @Value("${spring.mqtt.url}")
        private String hostUrl;
     
        @Value("${spring.mqtt.client.id}")
        private String clientId;
     
        @Value("${spring.mqtt.default.topic}")
        private String defaultTopic;
     
        @Bean
        public MqttConnectOptions getMqttConnectOptions(){
            MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
            mqttConnectOptions.setUserName(username);
            mqttConnectOptions.setPassword(password.toCharArray());
            mqttConnectOptions.setServerURIs(new String[]{hostUrl});
            mqttConnectOptions.setKeepAliveInterval(2);
            return mqttConnectOptions;
        }
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getMqttConnectOptions());
            return factory;
        }
        @Bean
        @ServiceActivator(inputChannel = "mqttOutboundChannel")
        public MessageHandler mqttOutbound() {
            MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId, mqttClientFactory());
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic(defaultTopic);
            return messageHandler;
        }
        @Bean
        public MessageChannel mqttOutboundChannel() {
            return new DirectChannel();
        }
    }
    

    第四,配置MqttGateway消息推送接口类,在sendToMqtt(String data,@Header(MqttHeaders.TOPIC)String topic)接口中,data为发送的消息内容,topic为主题。指定topic,则我们的接口可以根据需要,向不同的主题发送消息,方便灵活应用。如果不指定,则使用默认配置的主题。

    @Component
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {
        void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
    }
    

    第五,接口类测试下功能,用Postman调用sendMqtt.do接口,往hello主题发送消息,用MQTTLens订阅hello主题,从下面截图,可以看出可以正常往MQTT服务发送消息了,而且可以订阅到。

    @RestController
    @RequestMapping("/test")
    public class TestController {
     
        @Autowired
        private MqttGateway mqttGateway;
     
        @RequestMapping("/sendMqtt.do")
        public String sendMqtt(String  sendData){
            mqttGateway.sendToMqtt(sendData,"hello");
            return "OK";
        }
    }
    
    image.png
    image.png
    参考转载自:https://blog.csdn.net/qq_41018959/article/details/80592444
    第六,接收配置类
    @Configuration
    @IntegrationComponentScan
    @Slf4j
    public class MqttReceiveConfig {
    
        @Value("${spring.mqtt.username}")
        private String username;
    
        @Value("${spring.mqtt.password}")
        private String password;
    
        @Value("${spring.mqtt.url}")
        private String hostUrl;
    
        @Value("${spring.mqtt.client.id}")
        private String clientId;
    
        @Value("${spring.mqtt.default.topic}")
        private String defaultTopic;
    
        @Value("${spring.mqtt.completionTimeout}")
        private int completionTimeout ;   //连接超时
    
        @Autowired
        AlarmService alarmService;
    
        @Autowired
        private MongoTemplate mongoTemplate;
    
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
    
        @Bean
        public MqttConnectOptions getMqttConnectOptions(){
            // MQTT的连接设置
            MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
            // 设置连接的用户名
            mqttConnectOptions.setUserName(username);
            // 设置连接的密码
            mqttConnectOptions.setPassword(password.toCharArray());
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
            // 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,
            // 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
            mqttConnectOptions.setCleanSession(false);
            // 设置发布端地址
            mqttConnectOptions.setServerURIs(new String[]{hostUrl});
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            mqttConnectOptions.setKeepAliveInterval(20);
            return mqttConnectOptions;
        }
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getMqttConnectOptions());
            return factory;
        }
    
        //接收通道
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }
    
        //配置client,监听的topic
        @Bean
        public MessageProducer inbound() {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),
                            "elevators/cms/#"//加上这个为了匹配node red的topic
                    );
            adapter.setCompletionTimeout(completionTimeout);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }
    
        private String getCollectionName(String messageType){
            if("report".equals(messageType)){
                return "alarm";
            }
            if("singleRide".equals(messageType)){
                return "singleride";
            }
            if("statisticsDaily".equals(messageType)
                    ||"statisticsSingleRide".equals(messageType)){
                return "statistics";
            }
            if("statisticsElevator".equals(messageType)){
                return "kpis";
            }
            return "othertype";
        }
    
        //通过通道获取数据
        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:springboot整合RabbitMQ Mqtt

          本文链接:https://www.haomeiwen.com/subject/cfjsvctx.html