美文网首页
SpringBoot-MQTT

SpringBoot-MQTT

作者: 猫啸山林 | 来源:发表于2020-03-18 15:20 被阅读0次

参考文章:
https://blog.csdn.net/bugAndMe/article/details/89947373
https://www.jianshu.com/p/b9abcb6eba1f

1、添加项目依赖

implementation 'org.springframework.integration:spring-integration-mqtt:5.1.3.RELEASE'

2、在application.properties中添加mqtt的配置

# MQTT settings injected into MqttConfig through @Value placeholders.
# Broker URI; replace "hostip" with the broker's address.
mqtt.host=tcp://hostip:1883
# Client id for the subscribing side; MqttConfig appends "_inbound" to it.
mqtt.clientinid=subClient_${random.value}
# Client id intended for the publishing side.
# NOTE(review): ${random.value} is presumably resolved by Spring Boot to a random string - confirm.
mqtt.clientoutid=${random.value}
# Topic the inbound adapter subscribes to.
mqtt.topic=device_state
# MQTT quality-of-service level.
mqtt.qoslevel=1
# Broker credentials.
mqtt.username=admin
mqtt.password=pass
# Passed to the inbound adapter's setCompletionTimeout (presumably milliseconds - confirm).
mqtt.timeout=10000
# Keep-alive interval (presumably seconds - confirm).
mqtt.keepalive=20

3、新建MqttConfig类

/**
 * Spring Integration wiring for MQTT.
 *
 * <p>Declares the Paho connection options and client factory, an inbound adapter that
 * subscribes to {@code mqtt.topic} and forwards messages to {@code mqttInputChannel},
 * an outbound handler that publishes whatever arrives on {@code mqttOutboundChannel},
 * and the {@link MqttGateway} used by application code to send messages.
 *
 * <p>All tunables come from {@code mqtt.*} properties. {@code mqtt.keepalive},
 * {@code mqtt.clientoutid} and {@code mqtt.qoslevel} are optional; when absent they fall
 * back to the values this class previously hard-coded.
 */
@Configuration
@IntegrationComponentScan
public class MqttConfig {

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.host}")
    private String hostUrl;

    /** Base client id of the subscriber; {@code "_inbound"} is appended in {@link #inbound()}. */
    @Value("${mqtt.clientinid}")
    private String clientId;

    /** Client id of the publisher; falls back to the formerly hard-coded id. */
    @Value("${mqtt.clientoutid:publishClient}")
    private String publishClientId;

    @Value("${mqtt.topic}")
    private String defaultTopic;

    /** QoS used for the inbound subscription; falls back to the formerly hard-coded 1. */
    @Value("${mqtt.qoslevel:1}")
    private int qos;

    /** Completion timeout handed to the inbound adapter. */
    @Value("${mqtt.timeout}")
    private int completionTimeout;

    /** Keep-alive interval; falls back to 2, the value that was effectively applied before. */
    @Value("${mqtt.keepalive:2}")
    private int keepAliveInterval;

    /**
     * Builds the connection options shared by the inbound and outbound clients.
     *
     * @return options carrying the credentials, broker URI and keep-alive from configuration
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(10);
        // Set exactly once, from configuration. The keep-alive used to be set twice
        // (90, then 2), which left the first call dead and ignored mqtt.keepalive.
        mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        return mqttConnectOptions;
    }

    /**
     * Creates the Paho client factory configured with {@link #getMqttConnectOptions()}.
     *
     * @return the factory used by both the inbound adapter and the outbound handler
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Channel that receives messages arriving from the broker.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Configures the subscribing client and the topic it listens to.
     *
     * @return the adapter that forwards received messages to {@link #mqttInputChannel()}
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory(),
                        defaultTopic);
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        // Honour mqtt.qoslevel instead of a hard-coded QoS.
        adapter.setQos(qos);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Consumes messages from {@code mqttInputChannel} and prints topic and payload to stdout.
     *
     * @return the handler bound to the input channel
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("主题:{" + message.getHeaders().get("mqtt_receivedTopic") + "},消息接收到的数据:{" + message.getPayload() + "}");
            }
        };
    }

    /**
     * Channel that carries messages to be published to the broker.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Publishes messages arriving on {@code mqttOutboundChannel}.
     *
     * <p>The sending and the consuming side can share the same {@link MqttPahoClientFactory}.
     *
     * @return the outbound handler; messages without a topic header go to {@code "testTopic"}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler outbound() {
        // Use the configured publisher id rather than a fixed literal, so that several
        // application instances do not share one client id on the same broker.
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(publishClientId, mqttClientFactory());
        messageHandler.setAsync(true); // When true, sending a message does not block.
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    /**
     * Gateway through which application code publishes MQTT messages; every call is
     * routed to {@code mqttOutboundChannel}.
     */
    @Component
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {
        /**
         * Sends a payload without specifying a topic.
         *
         * @param payload message body
         */
        void sendToMqtt(String payload);

        /**
         * Sends a payload to the given topic.
         *
         * @param topic   destination topic, passed as the {@code MqttHeaders.TOPIC} header
         * @param payload message body
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

        /**
         * Sends a payload to the given topic with the given QoS.
         *
         * @param topic   destination topic, passed as the {@code MqttHeaders.TOPIC} header
         * @param qos     QoS level, passed as the {@code MqttHeaders.QOS} header
         * @param payload message body
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    }

}

4、新建MqttController测试消息发送

/**
 * REST endpoint for manually testing MQTT publishing through the gateway.
 */
@RestController
public class MqttController {

    @Autowired
    private MqttConfig.MqttGateway mqttGateway;

    /**
     * Publishes the message taken from the URL to the topic taken from the URL.
     *
     * @param topic   destination topic, from the {@code {topic}} path segment
     * @param message payload to publish, from the {@code {message}} path segment
     * @return a confirmation string echoing the message
     */
    @RequestMapping(value = "/send/{topic}/{message}")
    public String send(@PathVariable("topic") String topic, @PathVariable("message") String message) {
        mqttGateway.sendToMqtt(topic, message);
        final String reply = "send message : " + message;
        return reply;
    }
}

测试地址:http://localhost:8080/send/topic1/message1

相关文章

  • SpringBoot-MQTT

    参考文章:https://blog.csdn.net/bugAndMe/article/details/89947...

网友评论

      本文标题:SpringBoot-MQTT

      本文链接:https://www.haomeiwen.com/subject/bgttyhtx.html