美文网首页
2020-05-04 结合Netty和moquette模拟MQTT发布客户端

2020-05-04 结合Netty和moquette模拟MQTT发布客户端

作者: 中托马斯回旋踢 | 来源:发表于2020-05-04 11:59 被阅读0次

    模拟MQTT发布客户端

    上代码:
    1.pubClient

    package com.MyMQTTServerAndClient.publishClient;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LoggingHandler;
    import org.eclipse.moquette.parser.netty.MQTTDecoder;
    import org.eclipse.moquette.parser.netty.MQTTEncoder;
    /**
     * Command-line entry point that connects a Netty client to an MQTT broker and
     * installs the moquette codec plus {@link PubMQTTClientHandler} on the channel.
     */
    public class pubClient {

        /** Broker host used by the no-argument {@link #startMqttClient()}. */
        private static final String DEFAULT_HOST = "127.0.0.1";

        /** Broker port used by the no-argument {@link #startMqttClient()}. */
        private static final int DEFAULT_PORT = 7777;

        public static void main(String[] args) {
            pubClient.startMqttClient();
        }

        /**
         * Connects to the default broker address (127.0.0.1:7777) and blocks until
         * the channel is closed.
         */
        public static void startMqttClient(){
            startMqttClient(DEFAULT_HOST, DEFAULT_PORT);
        }

        /**
         * Connects to the given broker and blocks until the channel is closed.
         * The event loop group is always shut down before this method returns,
         * whether the connection ended normally or with an error.
         *
         * @param host broker host name or IP address
         * @param port broker TCP port
         */
        public static void startMqttClient(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group);
                b.channel(NioSocketChannel.class);
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast("log", new LoggingHandler());
                        pipeline.addLast("decoder", new MQTTDecoder()); // decode inbound MQTT packets
                        pipeline.addLast("encoder", new MQTTEncoder()); // encode outbound MQTT packets
                        pipeline.addLast("handler", new PubMQTTClientHandler());
                    }
                });
                ChannelFuture ch = b.connect(host, port).sync();
                // Block the calling thread until the connection is closed.
                ch.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release the event loop threads on every exit path; the original
                // only did this on failure, so a normal close left them running.
                group.shutdownGracefully();
            }
        }
    }
    

    2.PubMQTTClientHandler

    package com.MyMQTTServerAndClient.publishClient;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    import io.netty.util.concurrent.ScheduledFuture;
    import org.eclipse.moquette.proto.messages.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.stream.IntStream;
    import static java.util.concurrent.TimeUnit.MILLISECONDS;
    import static java.util.concurrent.TimeUnit.SECONDS;
    /**
     * Netty inbound handler that acts as an MQTT publishing client.
     *
     * <p>Only QoS 0 and QoS 1 are supported. After the channel becomes active it
     * sends a CONNECT; on a successful CONNACK it starts a periodic PINGREQ, and
     * on the first PINGRESP it enqueues a fixed set of demo topics. Three worker
     * loops then run on {@link #executorService}: one sends QoS 0 messages once,
     * one (re)sends QoS 1 messages until they are acknowledged, and one matches
     * incoming PUBACKs against the pending messages.
     *
     * <p>Note from the original author: with QoS 0 the broker sees the message ID
     * as {@code null} after decoding (presumably dropped by the codec).
     */
    public class PubMQTTClientHandler extends SimpleChannelInboundHandler<AbstractMessage> {

        private static final Logger logger = LoggerFactory.getLogger(PubMQTTClientHandler.class);

        /** Number of long-running worker loops; the pool must hold all of them at once. */
        private static final int WORKER_COUNT = 3;

        /** Upper bound (exclusive) for generated message IDs; MQTT IDs are non-zero 16-bit values. */
        private static final int MESSAGE_ID_BOUND = 65536;

        private String userName="admin";

        private String passWord="password";

        /** Handle of the heartbeat task; {@code null} until a successful CONNACK arrives. */
        ScheduledFuture<?> scheduledFuture;

        /** True until the first PINGRESP has been handled by this handler instance. */
        private final AtomicBoolean firstPingResp=new AtomicBoolean(true);

        /** QoS 0 messages waiting to be sent. */
        LinkedBlockingDeque<PublishMessage> zeroQosPubQueue=new LinkedBlockingDeque<>();

        /** QoS 1 messages waiting to be sent or re-sent. */
        LinkedBlockingDeque<PublishMessage> oneQosPubQueue=new LinkedBlockingDeque<>();

        /** PUBACKs received for QoS 1 messages, waiting to be matched. */
        LinkedBlockingDeque<PubAckMessage> oneQosPubAckQueue=new LinkedBlockingDeque<>();

        /** QoS 1 messages already sent but not yet acknowledged, keyed by message ID. */
        ConcurrentHashMap<Integer,PublishMessage> alreadySendMap=new ConcurrentHashMap<>();

        ExecutorService executorService;

        /** Never counted down while running; released on shutdown to wake the QoS 0 loop. */
        CountDownLatch countDownLatch=new CountDownLatch(1);

        List<java.util.concurrent.Future<?>> FutureList=new ArrayList<>();

        /** Connected channel; written by the event loop and read by the worker threads. */
        private volatile Channel channel;

        /** Loop flag for the workers; volatile so that clearing it is seen by their threads. */
        private volatile boolean running=true;

        /**
         * Creates the handler and starts the three worker loops.
         */
        public PubMQTTClientHandler() {
            // Each loop occupies a thread for the handler's lifetime, so the pool is
            // sized to the number of loops rather than to the CPU count (a machine
            // with fewer than three cores would otherwise never start some loops).
            executorService= Executors.newFixedThreadPool(WORKER_COUNT);
            FutureList.add(executorService.submit(this::publishZeroQosLoop));
            FutureList.add(executorService.submit(this::publishOneQosLoop));
            FutureList.add(executorService.submit(this::processPubAckLoop));
        }

        /** Sends every queued QoS 0 message once, then idles for up to a second. */
        private void publishZeroQosLoop() {
            while (running) {
                PublishMessage publishMessage;
                while ((publishMessage = zeroQosPubQueue.pollFirst()) != null) {
                    if (!send(publishMessage)) {
                        // Not connected: keep the message and retry on the next pass.
                        zeroQosPubQueue.offerFirst(publishMessage);
                        break;
                    }
                }
                try {
                    // Throttle the loop; returns early once the latch is released on shutdown.
                    countDownLatch.await(1000, MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Sends one queued QoS 1 message per second and re-queues it so that it is
         * sent again until its PUBACK has been processed.
         */
        private void publishOneQosLoop() {
            while (running) {
                PublishMessage publishMessage = oneQosPubQueue.pollFirst();
                if (publishMessage != null) {
                    Integer messageID = publishMessage.getMessageID();
                    // Register as pending before sending so the ack can always be matched.
                    alreadySendMap.putIfAbsent(messageID, publishMessage);
                    send(publishMessage);
                    oneQosPubQueue.addLast(publishMessage);
                    // The ack may have been processed while the message was outside
                    // the queue; in that case undo the re-queue instead of resending.
                    if (!alreadySendMap.containsKey(messageID)) {
                        oneQosPubQueue.remove(publishMessage);
                    }
                }
                try {
                    // Throttle sending; also prevents a busy spin while the queue is empty.
                    SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Matches received PUBACKs against pending QoS 1 messages and retires them. */
        private void processPubAckLoop() {
            while (running) {
                PubAckMessage pubAckMessage;
                while ((pubAckMessage = oneQosPubAckQueue.pollFirst()) != null) {
                    PublishMessage acknowledged = alreadySendMap.remove(pubAckMessage.getMessageID());
                    if (acknowledged != null) {
                        // Acknowledged: stop re-sending this message.
                        oneQosPubQueue.remove(acknowledged);
                    } else {
                        logger.error("receive error PubAckMessage {}",
                                pubAckMessage.getMessageID());
                    }
                }
                try {
                    // Throttle the loop.
                    SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Writes a message to the current channel.
         *
         * @param publishMessage message to write
         * @return {@code false} if there is currently no channel to write to
         */
        private boolean send(PublishMessage publishMessage) {
            Channel target = channel; // read once; the event loop may clear the field concurrently
            if (target == null) {
                logger.warn("no active channel, publishMessage {} not sent", publishMessage.getTopicName());
                return false;
            }
            target.writeAndFlush(publishMessage);
            logger.info("publishMessage {} publish success {}", publishMessage.getTopicName(),publishMessage.getMessageID());
            return true;
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Pass the throwable as the trailing argument so SLF4J logs the stack trace.
            logger.error("error message", cause);
        }

        /**
         * Sends the MQTT CONNECT packet as soon as the TCP connection is established.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info("client "+ctx.channel().remoteAddress()+" active");
            ConnectMessage connectMessage = new ConnectMessage();
            connectMessage.setProcotolVersion((byte) 3);
            connectMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
            connectMessage.setCleanSession(true);
            connectMessage.setClientID(String.valueOf(UUID.randomUUID()));
            connectMessage.setKeepAlive(60);
            connectMessage.setWillFlag(false);
            connectMessage.setUserFlag(true);
            connectMessage.setUsername(this.userName);
            connectMessage.setPasswordFlag(true);
            connectMessage.setPassword(this.passWord);
            ctx.writeAndFlush(connectMessage,ctx.newPromise().addListener((future)->{
                if(future.cause()==null){
                   logger.info("send connectMessage success");
                }else{
                   logger.error("send connectMessage error "+future.cause());
                }
            }));
        }

        /**
         * Stops the heartbeat, closes the channel and shuts the worker loops down.
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            logger.error("channelInactive " +ctx.channel().remoteAddress());
            // The heartbeat only exists after a successful CONNACK.
            if (scheduledFuture != null) {
                boolean cancel = scheduledFuture.cancel(true);
                logger.info("{} client canal heart beat {}",ctx.channel().localAddress(),cancel);
            }
            ctx.close(ctx.newPromise().addListener((future) ->
                    logger.info("channel {} close ",ctx.channel().localAddress())));
            channel=null;
            running=false;
            // Wake the QoS 0 loop and interrupt the sleeping loops so the pool's
            // threads terminate instead of keeping the JVM alive.
            countDownLatch.countDown();
            executorService.shutdownNow();
        }

        /**
         * Dispatches decoded MQTT messages by type; unhandled types are only logged.
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, AbstractMessage msg) throws Exception {
            switch(msg.getMessageType()){
                case AbstractMessage.CONNACK:
                    processConnectAckResp(ctx,msg);
                    break;
                case AbstractMessage.PINGRESP:
                    processPingResp();
                    break;
                case AbstractMessage.PUBACK:
                    processPubAck(ctx,msg);
                    break;
                default:
                    logger.info("message type "+msg.getMessageType());
                    break;
            }
        }

        /**
         * Queues a PUBACK (received for a QoS 1 publish) for the ack worker loop.
         *
         * @param ctx channel context (unused)
         * @param msg the received message; must be a {@link PubAckMessage}
         */
        private void processPubAck(ChannelHandlerContext ctx, AbstractMessage msg) {
            PubAckMessage pubAckMessage = (PubAckMessage) msg;
            if(oneQosPubAckQueue.offerLast(pubAckMessage)){
                logger.info("receive PubAckMessage {} from MQTTServer,and insert queue success",pubAckMessage.getMessageID());
            }else{
                logger.error("failed insert PubAckMessage {} into queue",pubAckMessage.getMessageID());
            }
        }

        /**
         * Handles a PINGRESP. Topics are only enqueued on the first one received.
         */
        private void processPingResp() {
            // compareAndSet makes the "first response" check and update one atomic step.
            if(firstPingResp.compareAndSet(true,false)){
                publishTopic("mqttClient/topicOne1",AbstractMessage.QOSType.LEAST_ONE);
                publishTopic("mqttClient/topicOne2",AbstractMessage.QOSType.LEAST_ONE);
                publishTopic("mqttClient/topicTwo2",AbstractMessage.QOSType.LEAST_ONE);
                publishTopic("mqttClient/topicTwo3",AbstractMessage.QOSType.MOST_ONE);
                for(int i=0;i<5;i++){
                    publishTopic("mqttClient/topicTwo4",AbstractMessage.QOSType.MOST_ONE);
                }
                logger.debug("first receive ping response , start publish topic" );
            }else{
                logger.debug("ping response");
            }
        }

        /**
         * Builds a PUBLISH message and puts it on the queue for its QoS level.
         * Only QoS 0 and QoS 1 are supported; other levels are ignored.
         *
         * @param topic topic name to publish to
         * @param qos   requested quality of service
         */
        private void publishTopic(String topic,AbstractMessage.QOSType qos) {
            PublishMessage publishMessage = new PublishMessage();
            publishMessage.setTopicName(topic);
            publishMessage.setQos(qos);
            // Original author's note: the message ID must be set or the message
            // cannot be sent. IDs start at 1 because 0 is not a valid MQTT ID.
            int messageID = ThreadLocalRandom.current().nextInt(1, MESSAGE_ID_BOUND);
            publishMessage.setMessageID(messageID);
            publishMessage.setRetainFlag(false);
            publishMessage.setPayload(ByteBuffer.wrap(
                    "I am payload".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
            if(qos== AbstractMessage.QOSType.MOST_ONE){
                if(zeroQosPubQueue.offer(publishMessage)){
                    logger.info("publishMessage {} insert zeroQosPubQueue success ",publishMessage.getTopicName());
                }
            }else if(qos== AbstractMessage.QOSType.LEAST_ONE){
                if(oneQosPubQueue.offer(publishMessage)){
                    logger.info("publishMessage {} insert oneQosPubQueue success ",publishMessage.getTopicName());
                }
            }
        }

        /**
         * Handles the CONNACK: on return code 0 remembers the channel and starts the
         * heartbeat; on any other code logs the failure and closes the connection.
         *
         * @param ctx channel context
         * @param msg the received message; must be a {@link ConnAckMessage}
         */
        private void processConnectAckResp(ChannelHandlerContext ctx, Object msg) {
            ConnAckMessage ackMessage = (ConnAckMessage) msg;
            byte returnCode = ackMessage.getReturnCode();
            if(returnCode==(byte)0){
                channel=ctx.channel();
                logger.info("get connectAck success, start heart beat" );
                // Heartbeat: first PINGREQ after 2 seconds, then every 55 seconds
                // (the CONNECT packet announced a keep-alive of 60 seconds).
                scheduledFuture = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
                    PingReqMessage pingReqMessage = new PingReqMessage();
                    pingReqMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
                    ctx.writeAndFlush(pingReqMessage, ctx.newPromise().addListener((future) -> {
                        if (future.cause() == null) {
                            logger.info("send heartBeat success");
                        } else {
                            logger.error("send heartBeat error");
                        }
                    }));
                }, 2, 55, SECONDS);
            }else if(returnCode==(byte)4){
                logger.error("get connectAck failed, cannot heart beat,username or password" );
                ctx.close();
            }else{
                // Any other non-zero code is also a refused connection.
                logger.error("get connectAck failed, return code {}", returnCode);
                ctx.close();
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
           ctx.fireUserEventTriggered(evt);
        }
    }
    
    

    下一篇:测试
    https://www.jianshu.com/p/3f2d9ccfe310

    相关文章

      网友评论

          本文标题:2020-05-04 结合Netty和moquette模拟MQTT发布客户端

          本文链接:https://www.haomeiwen.com/subject/zgmeghtx.html