美文网首页
2020-05-04 结合Netty和moquette模拟MQTT

2020-05-04 结合Netty和moquette模拟MQTT

作者: 中托马斯回旋踢 | 来源:发表于2020-05-04 11:46 被阅读0次

    自己写的MQTT服务器【QoS暂时只支持0、1】

    上代码:
    1.myMQTTServer

    package com.MyMQTTServerAndClient.server;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    /**
     * Entry point of the demo MQTT broker.
     *
     * <p>Boots a Netty server on TCP port 7777 whose child channels are configured by
     * {@link ServerChannelInitializer}, then blocks until the listening channel is closed.
     */
    public class myMQTTServer {
        public static void main(String[] args) {
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(boss, worker);
                serverBootstrap.channel(NioServerSocketChannel.class);
                // SO_BACKLOG configures the listening (parent) socket, so it must be set with
                // option(); childOption() would try to apply it to every accepted connection,
                // where it is not a valid option and the backlog would stay at its default.
                serverBootstrap.option(ChannelOption.SO_BACKLOG, 124);
                serverBootstrap.childHandler(new ServerChannelInitializer());
                // Block until the port is bound, then until the server channel is closed.
                ChannelFuture future = serverBootstrap.bind(7777).sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    

    2.ServerChannelInitializer

    package com.MyMQTTServerAndClient.server;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import org.eclipse.moquette.parser.netty.MQTTDecoder;
    import org.eclipse.moquette.parser.netty.MQTTEncoder;
    /**
     * Builds the pipeline of every accepted connection: moquette's MQTT decoder and encoder
     * followed by the shared {@link MQTTServerHandler}.
     */
    public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline()
                    // inbound bytes -> MQTT message objects
                    .addLast("decoder", new MQTTDecoder())
                    // outbound MQTT message objects -> bytes
                    .addLast("encoder", new MQTTEncoder())
                    // every client channel gets the same handler instance
                    .addLast("myHandler", MQTTServerHandler.getSingleton());
        }
    }
    

    3.MQTTServerHandler

    package com.MyMQTTServerAndClient.server;
    import io.netty.channel.*;
    import io.netty.util.concurrent.GenericFutureListener;
    import org.eclipse.moquette.proto.messages.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    /**
     * Shared (singleton) inbound handler implementing a minimal MQTT broker on top of moquette's
     * message classes. It handles CONNECT, PUBLISH (QoS 0 and 1), PINGREQ and SUBSCRIBE.
     *
     * <p>One instance serves all client channels, hence {@code @Sharable}; all per-channel state
     * is therefore kept in concurrent collections keyed by {@link Channel}.
     *
     * <p>Two background loops run on {@link #executorService}: one closes channels whose
     * keep-alive timer expired, the other forwards queued PUBLISH messages to subscribers.
     */
    @ChannelHandler.Sharable
    public class MQTTServerHandler extends SimpleChannelInboundHandler<AbstractMessage> {

        private static final Logger logger = LoggerFactory.getLogger(MQTTServerHandler.class);

        /** CONNACK return code: connection accepted. */
        private static final byte CONNECTION_ACCEPTED = 0;
        /** CONNACK return code: bad user name or password. */
        private static final byte BAD_USERNAME_OR_PASSWORD = 4;
        /**
         * Milliseconds of silence tolerated per second of negotiated keep-alive. MQTT requires
         * the server to wait one and a half times the keep-alive period before disconnecting.
         */
        private static final long KEEP_ALIVE_TIMEOUT_MILLIS_PER_SECOND = 1500L;

        private String userName = "admin";

        private String passWord = "password";

        /** PUBLISH messages received from clients and not yet forwarded to subscribers. */
        public static BlockingQueue<PublishMessage> needSendToSubClient = new LinkedBlockingQueue<>();
        /** QoS 1 messages currently accepted for delivery, keyed by message id (cleared periodically). */
        ConcurrentHashMap<Integer, PublishMessage> needSendMap = new ConcurrentHashMap<>();
        /** Messages successfully written to at least one subscriber, keyed by message id (cleared periodically). */
        ConcurrentHashMap<Integer, PublishMessage> alreadySendMap = new ConcurrentHashMap<>();
        /** Queue reserved for already delivered messages; not used by this class itself. */
        public static BlockingQueue<PublishMessage> alreadySendToSubClient = new LinkedBlockingQueue<>();
        /** Runs the two long-lived background loops. */
        ExecutorService executorService;
        /** Runs the periodic logging and cache-cleaning jobs. */
        ScheduledExecutorService scheduledExecutorService;
        /** SUBSCRIBE messages received per client channel. */
        Map<Channel, List<SubscribeMessage>> subscribeClientMap;

        AtomicBoolean running = new AtomicBoolean(true);

        /** Keep-alive (seconds) announced by each connected client; presence means CONNECT was accepted. */
        Map<Channel, Long> clientKeepAliveTime = new ConcurrentHashMap<>();

        /** Pending keep-alive deadlines, ordered by expiry. */
        DelayQueue<Delayed> clientKeepAliveQueue = new DelayQueue<>();

        /** Handle used to cancel the most recent keep-alive deadline of each channel. */
        Map<Channel, Future> canalTask = new ConcurrentHashMap<>();

        /**
         * Lazy holder: the JVM initializes this class exactly once, on the first call to
         * {@link #getSingleton()}, so no explicit locking is needed and no second instance
         * (with its own thread pools) can ever be created.
         */
        private static final class SingletonHolder {
            private static final MQTTServerHandler INSTANCE = new MQTTServerHandler();
        }

        /**
         * @return the single handler instance shared by all client channels
         */
        public static MQTTServerHandler getSingleton() {
            return SingletonHolder.INSTANCE;
        }

        /** Creates the thread pools and starts the background loops and periodic jobs. */
        private MQTTServerHandler() {
            logger.info("===========start init MQTTServerHandler============");
            // Two loops below never return; with fewer than two threads the second would never start.
            int loopThreads = Math.max(2, Runtime.getRuntime().availableProcessors());
            this.executorService = Executors.newFixedThreadPool(loopThreads);
            this.scheduledExecutorService = Executors.newScheduledThreadPool(4);
            this.subscribeClientMap = new ConcurrentHashMap<>();

            executorService.execute(this::reapExpiredKeepAliveTasks);
            executorService.execute(this::forwardPublishedMessages);

            scheduledExecutorService.scheduleAtFixedRate(this::logKeepAliveQueueSize, 0, 30, TimeUnit.SECONDS);
            scheduledExecutorService.scheduleAtFixedRate(this::logPendingPublishes, 2, 4, TimeUnit.SECONDS);
            scheduledExecutorService.scheduleAtFixedRate(this::clearDeduplicationCaches, 60, 60, TimeUnit.SECONDS);
        }

        /** Background loop: closes every channel whose keep-alive deadline passed without being cancelled. */
        private void reapExpiredKeepAliveTasks() {
            while (running.get()) {
                try {
                    Delayed expired = clientKeepAliveQueue.poll(5, TimeUnit.SECONDS);
                    if (expired instanceof ClientKeepAliveTask) {
                        closeIfStillPending((ClientKeepAliveTask) expired);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    logger.error("keep-alive reaper failed", e);
                }
            }
        }

        /** Closes the task's channel unless traffic arrived (task cancelled) in the meantime. */
        private void closeIfStillPending(ClientKeepAliveTask task) {
            if (!task.tryComplete()) {
                return;
            }
            Channel channel = task.getChannel();
            // No data or heartbeat arrived within the keep-alive window: drop the connection.
            logger.error(System.currentTimeMillis() + " channel " + channel.remoteAddress() + " close");
            channel.close();
        }

        /** Background loop: takes queued PUBLISH messages and forwards them to matching subscribers. */
        private void forwardPublishedMessages() {
            while (running.get()) {
                try {
                    // The timed poll blocks while the queue is empty instead of spinning.
                    PublishMessage publishMessage = needSendToSubClient.poll(100, TimeUnit.MILLISECONDS);
                    if (publishMessage != null) {
                        dispatchToSubscribers(publishMessage);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    logger.error("failed to forward PublishMessage", e);
                }
            }
        }

        /** Writes the message once to every active channel holding a subscription equal to its topic. */
        private void dispatchToSubscribers(PublishMessage publishMessage) {
            String topic = publishMessage.getTopicName();
            for (Map.Entry<Channel, List<SubscribeMessage>> entry : subscribeClientMap.entrySet()) {
                Channel channel = entry.getKey();
                if (!channel.isActive() || !isSubscribed(entry.getValue(), topic)) {
                    continue;
                }
                channel.writeAndFlush(publishMessage).addListener((ChannelFutureListener) written -> {
                    if (written.isSuccess()) {
                        logger.info("PublishMessage send to client {} success topic {}", channel.remoteAddress(), topic);
                        Integer messageID = publishMessage.getMessageID();
                        // QoS 0 messages carry no id, and ConcurrentHashMap rejects null keys.
                        if (messageID != null) {
                            alreadySendMap.put(messageID, publishMessage);
                        }
                    } else {
                        logger.error("PublishMessage send to client {} failed topic {}", channel.remoteAddress(), topic, written.cause());
                    }
                });
            }
        }

        /** @return true if any topic filter of any SUBSCRIBE in the list equals the topic (exact match, no wildcards) */
        private static boolean isSubscribed(List<SubscribeMessage> subscribeMessages, String topic) {
            for (SubscribeMessage subscribeMessage : subscribeMessages) {
                if (subscribeMessage.subscriptions().stream().anyMatch(s -> topic.equals(s.getTopicFilter()))) {
                    return true;
                }
            }
            return false;
        }

        private void logKeepAliveQueueSize() {
            if (running.get()) {
                logger.info("heart beat queue size {} ", clientKeepAliveQueue.size());
            }
        }

        /** Periodically prints the PUBLISH messages still waiting to be forwarded. */
        private void logPendingPublishes() {
            if (!running.get() || needSendToSubClient.isEmpty()) {
                return;
            }
            try {
                logger.info("needSendToSubClient queue size {}", needSendToSubClient.size());
                for (PublishMessage pending : needSendToSubClient) {
                    String payload = new String(pending.getPayload().array(), java.nio.charset.StandardCharsets.UTF_8);
                    logger.info(pending.getTopicName() + " payload [ " + payload + " ]");
                }
            } catch (RuntimeException e) {
                // An exception escaping a fixed-rate task would silently cancel all future runs.
                logger.error("failed to log pending PublishMessages", e);
            }
        }

        /** Periodically empties the maps used for QoS 1 duplicate detection. */
        private void clearDeduplicationCaches() {
            if (!running.get()) {
                return;
            }
            if (!alreadySendMap.isEmpty()) {
                alreadySendMap.clear();
                logger.info("alreadySendMap map clear");
            }
            if (!needSendMap.isEmpty()) {
                needSendMap.clear();
                logger.info("needSendMap map clear");
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.error("error message " + cause.getMessage(), cause);
            // The connection state is unknown after a pipeline error; drop it.
            ctx.close();
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            logger.info("client " + ctx.channel().remoteAddress() + " Registered");
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            logger.error("channelInactive " + channel.remoteAddress());
            // Forget everything about the closed connection so it is neither leaked nor written to.
            subscribeClientMap.remove(channel);
            clientKeepAliveTime.remove(channel);
            Future pendingDeadline = canalTask.remove(channel);
            if (pendingDeadline != null) {
                pendingDeadline.cancel(true);
            }
        }

        /** Dispatches a decoded MQTT message to the matching {@code process*} method. */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, AbstractMessage msg) throws Exception {
            byte messageType = msg.getMessageType();
            if (messageType != AbstractMessage.CONNECT) {
                if (!clientKeepAliveTime.containsKey(ctx.channel())) {
                    // MQTT requires CONNECT to be the first packet on a connection.
                    logger.error("channel {} sent message type {} before CONNECT", ctx.channel().remoteAddress(), messageType);
                    ctx.close();
                    return;
                }
                // Any traffic proves the client is alive: restart its keep-alive deadline.
                resubmitKeepAliveTask(ctx);
            }
            switch (messageType) {
                case AbstractMessage.CONNECT:
                    processConnectRequest(ctx, msg);
                    break;
                case AbstractMessage.PUBLISH:
                    processPublishRequest(ctx, msg);
                    break;
                case AbstractMessage.PUBREL:
                    // QoS 2 is not supported; nothing to do.
                    break;
                case AbstractMessage.PINGREQ:
                    processPingRequest(ctx, msg);
                    break;
                case AbstractMessage.SUBSCRIBE:
                    processSubscribeRequest(ctx, msg);
                    break;
                case AbstractMessage.UNSUBSCRIBE:
                    processUnsubscribeRequest(ctx, msg);
                    break;
                default:
                    logger.info("message type " + msg.getMessageType());
                    break;
            }
        }

        /** Not implemented yet: UNSUBSCRIBE is accepted but has no effect. */
        private void processUnsubscribeRequest(ChannelHandlerContext ctx, Object msg) {
            // TODO: remove the matching subscriptions and answer with UNSUBACK.
            logger.warn("UNSUBSCRIBE from {} ignored: not implemented", ctx.channel().remoteAddress());
        }

        /**
         * Records a client's SUBSCRIBE and answers with a SUBACK granting QoS 0 for every
         * requested topic filter.
         *
         * @param ctx context of the subscribing client's channel
         * @param msg the decoded {@link SubscribeMessage}
         */
        private void processSubscribeRequest(ChannelHandlerContext ctx, Object msg) {
            SubscribeMessage subscribeMessage = (SubscribeMessage) msg;
            Channel channel = ctx.channel();
            // Copy-on-write list: the forwarding thread iterates it while event loops append.
            subscribeClientMap.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(subscribeMessage);

            Integer messageID = subscribeMessage.getMessageID();
            if (messageID == null) {
                // Without a packet id there is nothing to acknowledge.
                return;
            }
            SubAckMessage subAckMessage = new SubAckMessage();
            subAckMessage.setMessageID(messageID);
            // SUBACK must carry one return code per topic filter of the SUBSCRIBE.
            subscribeMessage.subscriptions().forEach(s -> subAckMessage.addType(AbstractMessage.QOSType.MOST_ONE));
            channel.writeAndFlush(subAckMessage).addListener((ChannelFutureListener) written -> {
                if (written.isSuccess()) {
                    logger.info("server success send subAck to client {}, id {}", channel.remoteAddress(), messageID);
                } else {
                    logger.error("server failed to send subAck to client {}, id {}", channel.remoteAddress(), messageID, written.cause());
                }
            });
        }

        /** Answers a PINGREQ heartbeat with PINGRESP. */
        private void processPingRequest(ChannelHandlerContext ctx, Object msg) {
            logger.info("get heart beat from " + ctx.channel().remoteAddress());
            PingRespMessage pingRespMessage = new PingRespMessage();
            pingRespMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
            ctx.channel().writeAndFlush(pingRespMessage);
        }

        /**
         * Queues a client's PUBLISH for forwarding. QoS 1 messages are de-duplicated by message
         * id and always acknowledged with PUBACK; QoS 0 messages are queued as-is.
         */
        private void processPublishRequest(ChannelHandlerContext ctx, Object msg) {
            PublishMessage publishMessage = (PublishMessage) msg;
            AbstractMessage.QOSType qos = publishMessage.getQos();
            if (qos == AbstractMessage.QOSType.LEAST_ONE) {
                Integer messageID = publishMessage.getMessageID();
                if (messageID == null) {
                    logger.error("QoS 1 PublishMessage without message id from {}", ctx.channel().remoteAddress());
                    return;
                }
                // NOTE(review): message ids are chosen per client, so two clients can collide
                // here; keying by (channel, id) would be more accurate - confirm before changing.
                boolean duplicate = alreadySendMap.containsKey(messageID)
                        || needSendMap.putIfAbsent(messageID, publishMessage) != null;
                if (duplicate) {
                    logger.info("server already add this PublishMessage or push to subscribe client {} {}", publishMessage.getTopicName(), messageID);
                } else if (needSendToSubClient.offer(publishMessage)) {
                    logger.info("server add PublishMessage into queue success {}", publishMessage.getTopicName());
                } else {
                    // Not stored, so do not acknowledge: the client will retransmit.
                    needSendMap.remove(messageID);
                    return;
                }
                // Acknowledge duplicates too, otherwise the client keeps retransmitting.
                PubAckMessage pubAckMessage = new PubAckMessage();
                pubAckMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
                pubAckMessage.setMessageID(messageID);
                ctx.channel().writeAndFlush(pubAckMessage);
            } else if (qos == AbstractMessage.QOSType.MOST_ONE) {
                if (needSendToSubClient.offer(publishMessage)) {
                    logger.info("server add PublishMessage into queue success {}", publishMessage.getTopicName());
                }
            } else {
                logger.warn("unsupported qos {} from {}", qos, ctx.channel().remoteAddress());
            }
        }

        /**
         * Authenticates a CONNECT. On success the keep-alive deadline is started and CONNACK(0)
         * is sent; otherwise CONNACK(4) is sent and the channel is closed once it is written.
         */
        private void processConnectRequest(ChannelHandlerContext ctx, Object msg) {
            ConnectMessage connectMessage = (ConnectMessage) msg;
            // Constant-first equals: a CONNECT without credentials must be rejected, not crash.
            boolean authenticated = this.userName.equals(connectMessage.getUsername())
                    && this.passWord.equals(connectMessage.getPassword());
            ConnAckMessage connAckMessage = new ConnAckMessage();
            if (authenticated) {
                // Seconds the client may stay silent before it is considered dead.
                submitKeepAliveTask(ctx, Long.valueOf(connectMessage.getKeepAlive()));
                connAckMessage.setReturnCode(CONNECTION_ACCEPTED);
                ctx.channel().writeAndFlush(connAckMessage);
                logger.info("send connectAck");
            } else {
                connAckMessage.setReturnCode(BAD_USERNAME_OR_PASSWORD);
                logger.error("channel {} username or password error ", ctx.channel().remoteAddress());
                ctx.channel().writeAndFlush(connAckMessage).addListener(ChannelFutureListener.CLOSE);
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            ctx.fireUserEventTriggered(evt);
        }

        /**
         * Restarts the keep-alive deadline of a channel after traffic was received from it.
         * Does nothing for channels that never completed CONNECT or were already timed out.
         */
        public void resubmitKeepAliveTask(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            Long keepAlive = clientKeepAliveTime.get(channel);
            if (keepAlive == null) {
                logger.error("no keep alive registered for channel " + channel);
                return;
            }
            Future previous = canalTask.get(channel);
            if (previous != null) {
                if (previous.cancel(true)) {
                    logger.info("keep alive task resubmit success");
                } else {
                    // The deadline already fired and the channel is being closed.
                    logger.error("channel already closed " + channel);
                    return;
                }
            }
            submitKeepAliveTask(ctx, keepAlive);
        }

        /**
         * Starts a keep-alive deadline for the channel and remembers its keep-alive value.
         *
         * @param ctx       context of the client's channel
         * @param keepAlive keep-alive in seconds as sent in CONNECT; 0 disables the timeout
         */
        public void submitKeepAliveTask(ChannelHandlerContext ctx, Long keepAlive) {
            Channel channel = ctx.channel();
            clientKeepAliveTime.put(channel, keepAlive);
            Future previous;
            if (keepAlive <= 0) {
                // Keep-alive 0 means the client asked for no timeout at all.
                previous = canalTask.remove(channel);
            } else {
                Future deadline = this.getClientKeepAliveTaskFuture(ctx, Long.valueOf(keepAlive * KEEP_ALIVE_TIMEOUT_MILLIS_PER_SECOND));
                previous = canalTask.put(channel, deadline);
            }
            if (previous != null) {
                // Never leave an older deadline armed for the same channel.
                previous.cancel(true);
            }
        }

        /**
         * Enqueues a keep-alive deadline and returns a handle to cancel it.
         *
         * @param ctx       context of the client's channel
         * @param keepAlive delay in milliseconds after which the channel is closed
         * @return a future whose {@code cancel} disarms the deadline; {@code get} returns the task
         */
        public Future getClientKeepAliveTaskFuture(ChannelHandlerContext ctx, Long keepAlive) {
            final ClientKeepAliveTask task = new ClientKeepAliveTask(ctx.channel(), keepAlive, System.currentTimeMillis());
            clientKeepAliveQueue.offer(task);
            return new Future<Object>() {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    if (!task.tryCancel()) {
                        return false;
                    }
                    // Drop the disarmed deadline so the queue holds at most one task per channel.
                    clientKeepAliveQueue.remove(task);
                    return true;
                }

                @Override
                public boolean isCancelled() {
                    return task.isCancelled();
                }

                @Override
                public boolean isDone() {
                    return task.isDone();
                }

                @Override
                public Object get() throws InterruptedException, ExecutionException {
                    return task;
                }

                @Override
                public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                    return task;
                }
            };
        }

        /**
         * A keep-alive deadline for one channel. It ends in exactly one of two states:
         * cancelled (traffic arrived in time) or done (deadline fired, channel closed).
         */
        private static class ClientKeepAliveTask implements Delayed {
            private final Channel channel;
            /** Delay in milliseconds, counted from {@link #startTime}. */
            private final long delayMillis;
            /** Creation time in epoch milliseconds. */
            private final long startTime;
            /** True once traffic arrived in time; the channel must then stay open. */
            private volatile boolean cancelled;
            /** True once the deadline fired and the channel was closed. */
            private volatile boolean done;

            public ClientKeepAliveTask(Channel channel, long delayMillis, long startTime) {
                this.channel = channel;
                this.delayMillis = delayMillis;
                this.startTime = startTime;
            }

            /** Marks the task cancelled; fails if the deadline already fired. */
            synchronized boolean tryCancel() {
                if (done) {
                    return false;
                }
                cancelled = true;
                return true;
            }

            /** Marks the task done; fails if it was cancelled first. */
            synchronized boolean tryComplete() {
                if (cancelled) {
                    return false;
                }
                done = true;
                return true;
            }

            private long deadlineMillis() {
                return startTime + delayMillis;
            }

            @Override
            public long getDelay(TimeUnit unit) {
                // DelayQueue asks in nanoseconds; the remaining time must be converted to 'unit'.
                return unit.convert(deadlineMillis() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }

            @Override
            public int compareTo(Delayed o) {
                if (o instanceof ClientKeepAliveTask) {
                    return Long.compare(deadlineMillis(), ((ClientKeepAliveTask) o).deadlineMillis());
                }
                return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
            }

            public Channel getChannel() {
                return channel;
            }

            public long getDelayMillis() {
                return delayMillis;
            }

            public long getStartTime() {
                return startTime;
            }

            public boolean isCancelled() {
                return cancelled;
            }

            public boolean isDone() {
                return done;
            }
        }
    }
    
    

    下一篇:订阅客户端
    https://www.jianshu.com/p/362d559d8ae9

    相关文章

      网友评论

          本文标题:2020-05-04 结合Netty和moquette模拟MQTT

          本文链接:https://www.haomeiwen.com/subject/jypeghtx.html