美文网首页
实现一个Mqtt网关(一)启用NettyServer.md

实现一个Mqtt网关(一)启用NettyServer.md

作者: ShootHzj | 来源:发表于2020-12-18 18:34 被阅读0次

物联网是现在比较热门的软件领域,众多云厂商都有自己的物联网平台,而物联网平台其中一个核心的模块就是Mqtt网关。

使用Netty搭建高性能服务器是一个常见的选择,Netty自带Mqtt的编解码,我们很快就可以在Netty服务器中插入Mqtt的编解码handler,由netty已经编写好的模块帮助我们做mqtt的编解码,我们仅需自己处理mqtt协议业务的处理,如登录,订阅分发等。

NettyServer使用MqttHandler编解码

package com.github.shoothzj.demo.iot.mqtt.broker;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @author hezhangjian
 */
@Slf4j
public class MqttServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(1883).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

客户端采用eclipse mqtt client

package com.github.shoothzj.demo.mqtt.client;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * @author hezhangjian
 */
@Slf4j
public class MqttClientEx {

    public static void main(String[] args) throws Exception {
        String topic = "MQTT Examples";
        String content = "Message from MqttPublishSample";
        int qos = 2;
        String broker = "tcp://127.0.0.1:1883";
        String clientId = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker: " + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: " + content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");
            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

}

然后我们先运行MqttServer,再运行MqttClient,发现MqttClient卡住了

image.png

这是为什么呢,我们通过抓包发现仅仅只有客户端发送了Mqtt connect信息,服务端并没有响应

image.png

但是根据mqtt标准协议,发送MqttConnect,必须有CONNAck

image.png

所以我们要在mqttConn后,业务上返回ConnAck消息,下一节我们在这个基础上自己实现Handler返回Connack消息

参考

相关文章

网友评论

      本文标题:实现一个Mqtt网关(一)启用NettyServer.md

      本文链接:https://www.haomeiwen.com/subject/mfjfnktx.html