美文网首页
2020-05-04 结合Netty和moquette模拟MQT

2020-05-04 结合Netty和moquette模拟MQT

作者: 中托马斯回旋踢 | 来源:发表于2020-05-04 11:46 被阅读0次

自己写的MQTT服务器【QoS目前只支持0和1】

上代码:
1.myMQTTServer

package com.MyMQTTServerAndClient.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Entry point of the demo MQTT broker.
 *
 * <p>Boots a Netty server whose child channels are configured by
 * {@link ServerChannelInitializer}, binds it to {@link #PORT} and blocks until
 * the listening channel is closed. Both event loop groups are shut down on exit.
 */
public class myMQTTServer {
    /** TCP port the broker listens on. */
    private static final int PORT = 7777;
    /** Value passed as SO_BACKLOG for the listening socket. */
    private static final int BACKLOG = 124;

    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker);
            serverBootstrap.channel(NioServerSocketChannel.class);
            // SO_BACKLOG is a property of the listening (parent) socket, so it has to be
            // set with option(); childOption() would try to apply it to accepted sockets.
            serverBootstrap.option(ChannelOption.SO_BACKLOG, BACKLOG);
            serverBootstrap.childHandler(new ServerChannelInitializer());
            ChannelFuture future = serverBootstrap.bind(PORT).sync();
            // Block the main thread until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this thread.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

2.ServerChannelInitializer

package com.MyMQTTServerAndClient.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.eclipse.moquette.parser.netty.MQTTDecoder;
import org.eclipse.moquette.parser.netty.MQTTEncoder;
/**
 * Configures the pipeline of every accepted client connection: an MQTT decoder,
 * an MQTT encoder and the shared {@link MQTTServerHandler}.
 */
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // The business handler is a singleton so that all clients share one instance.
        socketChannel.pipeline()
                .addLast("decoder", new MQTTDecoder())
                .addLast("encoder", new MQTTEncoder())
                .addLast("myHandler", MQTTServerHandler.getSingleton());
    }
}

3.MQTTServerHandler

package com.MyMQTTServerAndClient.server;
import io.netty.channel.*;
import io.netty.util.concurrent.GenericFutureListener;
import org.eclipse.moquette.proto.messages.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 *  MQTTServerHandler 使用单例模式
 */
//多个客户端和一个服务器连接必须是Sharable
@ChannelHandler.Sharable
public class MQTTServerHandler extends SimpleChannelInboundHandler<AbstractMessage> {

    private static final Logger logger = LoggerFactory.getLogger(MQTTServerHandler.class);

    /** User name a client must present in CONNECT. */
    private String userName="admin";

    /** Password a client must present in CONNECT. */
    private String passWord="password";
    /** Publish messages received from clients that still have to be forwarded to subscribers. */
    public static BlockingQueue<PublishMessage> needSendToSubClient;
    /** Index of queued publish messages: key is the message id, value the message. Cleared periodically. */
    ConcurrentHashMap<Integer,PublishMessage> needSendMap=new ConcurrentHashMap<>();
    /** Publish messages already written to a subscriber, keyed by message id. Cleared periodically. */
    ConcurrentHashMap<Integer,PublishMessage> alreadySendMap=new ConcurrentHashMap<>();
    /** Queue counterpart of {@link #alreadySendMap}. */
    public static BlockingQueue<PublishMessage> alreadySendToSubClient;
    /** Pool running the long-lived background loops started by the constructor. */
    ExecutorService executorService;
    /** Pool running the periodic jobs started by the constructor. */
    ScheduledExecutorService scheduledExecutorService;
    /** Subscribe messages received so far, grouped by client channel. */
    Map<Channel,List<SubscribeMessage>> subscribeClientMap;

    /** Background loops and periodic jobs only do work while this is true. */
    AtomicBoolean running=new AtomicBoolean(true);

    // This handler is a @Sharable singleton installed in every client pipeline, so the
    // two maps below are touched from several event loop threads: they must be concurrent.
    /** Keep-alive value announced by each client, as passed to submitKeepAliveTask. */
    Map<Channel,Long> clientKeepAliveTime=new ConcurrentHashMap<>();

    /** Pending keep-alive expiry checks, ordered by deadline. */
    DelayQueue<Delayed> clientKeepAliveQueue=new DelayQueue<>();

    /** Most recent keep-alive task per channel; kept so the task can be cancelled when data arrives. */
    Map<Channel,Future> canalTask=new ConcurrentHashMap<>();

    /** Lazily created shared instance, see getSingleton(). */
    private static MQTTServerHandler mQTTServerHandler;
    /**
     * Returns the shared handler instance, creating it on first use.
     *
     * <p>The null check and the construction happen under the same lock, so
     * concurrent first calls cannot each build their own handler (and their own
     * thread pools).
     *
     * @return the single {@code MQTTServerHandler} used by all client pipelines
     */
    public static MQTTServerHandler getSingleton(){
        synchronized (MQTTServerHandler.class){
            if(mQTTServerHandler==null){
                mQTTServerHandler=new MQTTServerHandler();
            }
            return mQTTServerHandler;
        }
    }
    //单例模式下只会初始化一次
    private MQTTServerHandler() {
        logger.info("===========start init MQTTServerHandler============");
        this.executorService =  Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        this.scheduledExecutorService=Executors.newScheduledThreadPool(4);
        this.subscribeClientMap=new ConcurrentHashMap<>();
        this.needSendToSubClient=new LinkedBlockingQueue<>();
        this.alreadySendToSubClient=new LinkedBlockingQueue<>();
        executorService.submit(()->{
            while (running.get()) {
                try {
                    ClientKeepAliveTask task = (ClientKeepAliveTask) clientKeepAliveQueue.poll(5, TimeUnit.SECONDS);
                    if (task != null) {
                        synchronized (task){
                            if(task.getCanal()==false){
                                //在给定keepAlive时间内没有收到数据或者心跳,关闭连接
                                task.getChannel().close();
                                task.setDone(true);
                                logger.error(System.currentTimeMillis()+" channel "+task.channel.remoteAddress()+" close");
                            }
                        }
                    }
                } catch (Exception e) {
                    logger.error("error message {}",e);
                }
            }
        });
        scheduledExecutorService.scheduleAtFixedRate(()-> {
            if (running.get()) {
                logger.info("heart beat queue size {} ", clientKeepAliveQueue.size());
            }
        },0,30,TimeUnit.SECONDS);
        //打印客户端发布的主题消息
        scheduledExecutorService.scheduleAtFixedRate(()-> {
            if (running.get()) {
                if(!needSendToSubClient.isEmpty()){
                    logger.info("needSendToSubClient queue size {}",needSendToSubClient.size());
                    needSendToSubClient.forEach((p)->{
                        logger.info(p.getTopicName()+" payload [ " +new String(p.getPayload().array())+ " ]");
                    });
                }
            }
        },2,4,TimeUnit.SECONDS);
        //发送给感兴趣的客户端
        executorService.submit(()->{
            while (running.get()) {
                while(!needSendToSubClient.isEmpty()){
                    PublishMessage publishMessage = null;
                    try {
                        publishMessage = needSendToSubClient.poll(100, TimeUnit.MILLISECONDS);
                        if(publishMessage!=null){
                            //发送给感兴趣的客户端
                            Set<Channel> channelSet = subscribeClientMap.keySet();
                            for(Channel channel:channelSet){
                                List<SubscribeMessage> subscribeMessageList = subscribeClientMap.get(channel);
                                for(SubscribeMessage sub:subscribeMessageList){
                                    if(sub.subscriptions().get(0).getTopicFilter().equals(publishMessage.getTopicName())){
                                        PublishMessage finalPublishMessage = publishMessage;
                                        channel.writeAndFlush(publishMessage,channel.newPromise().addListener(future -> {
                                            logger.info("PublishMessage send to client {} success topic {}",channel.remoteAddress(),finalPublishMessage.getTopicName());
                                            alreadySendMap.put(finalPublishMessage.getMessageID(),finalPublishMessage);
                                        }));
                                    }
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //定时清理缓存
        scheduledExecutorService.scheduleAtFixedRate(()-> {
            if (running.get()) {
                if(!alreadySendMap.isEmpty()){
                    alreadySendMap.clear();
                    logger.info("alreadySendMap map clear");
                }
                if(!needSendMap.isEmpty()){
                    needSendMap.clear();
                    logger.info("needSendMap map clear");
                }
            }
        },60,60,TimeUnit.SECONDS);
    }

    /**
     * Logs an exception raised in the pipeline, including its stack trace, and
     * closes the connection it occurred on.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("error message "+cause.getMessage(), cause);
        // After a failure the stream may be in an undefined state; do not keep it open.
        ctx.close();
    }

    /** Logs the remote address of a channel once it has been registered. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        final Object remote = ctx.channel().remoteAddress();
        logger.info("client "+remote+" Registered");
    }


    /**
     * Logs the disconnect and forgets everything stored for the channel: its
     * subscriptions, its keep-alive value and its pending keep-alive task.
     * Without this the maps would grow with every connection and publishes would
     * still be written to channels that are already gone.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        logger.error("channelInactive " +channel.remoteAddress());
        subscribeClientMap.remove(channel);
        clientKeepAliveTime.remove(channel);
        Future keepAliveTask = canalTask.remove(channel);
        if(keepAliveTask!=null){
            // Mark the task cancelled so the expiry loop skips it.
            keepAliveTask.cancel(true);
        }
    }

    /**
     * Dispatches an incoming MQTT message to the matching {@code process...} method.
     * Every message other than CONNECT first re-arms the keep-alive task of the channel.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, AbstractMessage msg) throws Exception {
        if(!(msg instanceof AbstractMessage)){
            return;
        }
        final byte type = msg.getMessageType();
        if(type==AbstractMessage.CONNECT){
            processConnectRequest(ctx,msg);
            return;
        }
        // Cancel the previous keep-alive task and submit a fresh one.
        resubmitKeepAliveTask(ctx);
        switch(type){
            case AbstractMessage.PUBLISH:
                processPublishRequest(ctx,msg);
                break;
            case AbstractMessage.PINGREQ:
                processPingRequest(ctx,msg);
                break;
            case AbstractMessage.SUBSCRIBE:
                processSubscribeRequest(ctx,msg);
                break;
            case AbstractMessage.UNSUBSCRIBE:
                processUnsubscribeRequest(ctx,msg);
                break;
            case AbstractMessage.PUBREL:
                // Nothing to do for PUBREL.
                break;
            default:
                logger.info("message type "+msg.getMessageType());
                break;
        }
    }
    // Not implemented yet: the body is empty, so an UNSUBSCRIBE message changes
    // nothing here and no reply is written by this method.
    private void processUnsubscribeRequest(ChannelHandlerContext ctx, Object msg) {
    }

    /**
     * Handles a SUBSCRIBE message: records the subscription for the sending
     * channel and, when the message carries an id, answers with a SUBACK.
     *
     * <p>The per-channel list is created atomically and is a
     * {@link CopyOnWriteArrayList}, because the dispatcher thread iterates over it
     * while event loop threads append to it.
     *
     * @param ctx context of the subscribing channel
     * @param msg the message, expected to be a {@code SubscribeMessage}
     */
    private void processSubscribeRequest(ChannelHandlerContext ctx, Object msg) {
        SubscribeMessage subscribeMessage = (SubscribeMessage) msg;
        Channel channel = ctx.channel();
        subscribeClientMap
                .computeIfAbsent(channel, key -> new CopyOnWriteArrayList<SubscribeMessage>())
                .add(subscribeMessage);

        Integer messageID = subscribeMessage.getMessageID();
        if(messageID==null){
            // No message id: there is nothing to acknowledge.
            return;
        }
        SubAckMessage subAckMessage = new SubAckMessage();
        subAckMessage.setMessageID(messageID);
        subAckMessage.addType(AbstractMessage.QOSType.MOST_ONE);
        channel.writeAndFlush(subAckMessage,ctx.newPromise().addListener(future -> {
            if(future.isSuccess()){
                logger.info("server success send subAck to client {}, id {}",channel.remoteAddress(),messageID);
            }else{
                logger.error("server failed to send subAck to client {}, id {}",channel.remoteAddress(),messageID,future.cause());
            }
        }));
    }

    /**
     * Answers a PINGREQ with a PINGRESP on the same channel.
     *
     * @param ctx context of the pinging channel
     * @param msg the ping request (not inspected)
     */
    private void processPingRequest(ChannelHandlerContext ctx, Object msg) {
        final Channel client = ctx.channel();
        logger.info("get heart beat from "+client.remoteAddress());
        final PingRespMessage pong = new PingRespMessage();
        // Only QoS 0 is supported for now.
        pong.setQos(AbstractMessage.QOSType.MOST_ONE);
        client.writeAndFlush(pong);
    }

    /**
     * Handles a PUBLISH message by queueing it for delivery to subscribers.
     *
     * <p>QoS 1 messages are de-duplicated by message id and acknowledged with a
     * PUBACK. A duplicate is acknowledged again but not queued a second time, so a
     * retransmitting client stops resending. QoS 0 messages are queued without any
     * reply. Other QoS levels are ignored.
     *
     * @param ctx context of the publishing channel
     * @param msg the message, expected to be a {@code PublishMessage}
     */
    private void processPublishRequest(ChannelHandlerContext ctx, Object msg) {
        PublishMessage publishMessage = (PublishMessage) msg;
        AbstractMessage.QOSType qos = publishMessage.getQos();
        if(qos==AbstractMessage.QOSType.LEAST_ONE){
            Integer messageID = publishMessage.getMessageID();
            if(messageID==null){
                // Cannot de-duplicate or acknowledge without an id (and the maps reject null keys).
                logger.error("QoS 1 PublishMessage without message id, topic {}",publishMessage.getTopicName());
                return;
            }
            // containsKey on both maps: ConcurrentHashMap.contains(x) would test the values instead.
            if(needSendMap.containsKey(messageID)||alreadySendMap.containsKey(messageID)){
                logger.info("server already add this PublishMessage or push to subscribe client {} {}",publishMessage.getTopicName(),messageID);
                sendPubAck(ctx,messageID);
            }else if(needSendToSubClient.offer(publishMessage)){
                needSendMap.put(messageID,publishMessage);
                logger.info("server add PublishMessage into queue success {}",publishMessage.getTopicName());
                sendPubAck(ctx,messageID);
            }
        }
        else if (qos==AbstractMessage.QOSType.MOST_ONE){
            if(needSendToSubClient.offer(publishMessage)){
                logger.info("server add PublishMessage into queue success {}",publishMessage.getTopicName());
            }
        }
    }

    /** Writes a PUBACK for the given message id to the channel. */
    private void sendPubAck(ChannelHandlerContext ctx, Integer messageID) {
        PubAckMessage pubAckMessage = new PubAckMessage();
        pubAckMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
        pubAckMessage.setMessageID(messageID);
        ctx.channel().writeAndFlush(pubAckMessage);
    }

    /**
     * Handles a CONNECT message: checks the credentials and replies with a CONNACK.
     *
     * <p>On success the reply carries return code 0 and the keep-alive task for
     * the channel is started. On failure (including a missing user name or
     * password) the reply carries return code 4 and the channel is closed once
     * that reply has been written.
     *
     * @param ctx context of the connecting channel
     * @param msg the message, expected to be a {@code ConnectMessage}
     */
    private void processConnectRequest(ChannelHandlerContext ctx, Object msg) {
        ConnectMessage msg1 = (ConnectMessage) msg;
        // Compare from the known non-null side so absent credentials simply fail the check.
        boolean authenticated = this.userName.equals(msg1.getUsername())
                && this.passWord.equals(msg1.getPassword());
        ConnAckMessage connAckMessage = new ConnAckMessage();
        if(authenticated){
            // Time the client may stay silent before the connection is dropped.
            int keepAlive = msg1.getKeepAlive();
            submitKeepAliveTask(ctx, Long.valueOf(keepAlive));
            connAckMessage.setReturnCode((byte)0);
            ctx.channel().writeAndFlush(connAckMessage);
            logger.info("send connectAck");
        }else{
            connAckMessage.setReturnCode((byte)4);
            // Close only after the refusal has been written.
            ctx.channel().writeAndFlush(connAckMessage).addListener(ChannelFutureListener.CLOSE);
            logger.error("channel {} username or password error ",ctx.channel().remoteAddress());
        }
    }
    /**
     * Passes the user event on unchanged via {@code ctx.fireUserEventTriggered};
     * this handler does not act on it.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
       ctx.fireUserEventTriggered(evt);
    }

    /**
     * Re-arms the keep-alive timer of a channel: cancels its current task and
     * submits a new one with the same keep-alive value.
     *
     * @param ctx context of the channel that just sent a message
     * @throws IllegalStateException if no keep-alive task is registered for the
     *         channel, i.e. no CONNECT has started one; the channel is closed
     *         before the exception is thrown
     */
    public void resubmitKeepAliveTask(ChannelHandlerContext ctx){
        Future future = canalTask.get(ctx.channel());
        Long keepAlive = clientKeepAliveTime.get(ctx.channel());
        if(future==null||keepAlive==null){
            // Fail with a clear reason instead of a NullPointerException further down.
            ctx.close();
            throw new IllegalStateException("no keep alive task registered for channel "+ctx.channel());
        }
        if(future.cancel(true)){
            logger.info("keep alive task resubmit success");
        }else{
            logger.error("channel already closed "+ctx.channel());
        }
        submitKeepAliveTask(ctx,keepAlive);
    }
    /**
     * Schedules a keep-alive expiry check for the channel and remembers both the
     * resulting task and the keep-alive value.
     *
     * @param ctx       context of the channel to watch
     * @param keepAlive keep-alive value; multiplied by 1000 to obtain the delay in milliseconds
     */
    public void submitKeepAliveTask(ChannelHandlerContext ctx,Long keepAlive){
        final long delayMillis = keepAlive * 1000;
        final Future pending = getClientKeepAliveTaskFuture(ctx, delayMillis);
        final Channel channel = ctx.channel();
        canalTask.put(channel, pending);
        clientKeepAliveTime.put(channel, keepAlive);
    }
    /**
     * Queues a keep-alive expiry task for the channel and returns a handle that
     * can cancel it.
     *
     * <p>Cancelling marks the task so that the expiry loop leaves the channel
     * open. It succeeds only while the task has not yet been marked done.
     *
     * @param ctx       context of the channel to watch
     * @param keepAlive delay in milliseconds after which the task expires
     * @return a future whose {@code cancel} disarms the task; both {@code get}
     *         variants return the task without blocking
     */
    public Future getClientKeepAliveTaskFuture(ChannelHandlerContext ctx,Long keepAlive){
        ClientKeepAliveTask clientKeepAliveTask = new ClientKeepAliveTask(ctx.channel(), keepAlive, System.currentTimeMillis());
        clientKeepAliveQueue.offer(clientKeepAliveTask);
        return new Future() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                synchronized (clientKeepAliveTask){
                    if(clientKeepAliveTask.getDone()){
                        return false;
                    }
                    // Always mark as cancelled: the flag says nothing about interruption,
                    // and returning true from cancel(false) without cancelling would be wrong.
                    clientKeepAliveTask.setCanal(true);
                    return true;
                }
            }
            @Override
            public boolean isCancelled() {
                return clientKeepAliveTask.getCanal();
            }
            @Override
            public boolean isDone() {
                return clientKeepAliveTask.getDone();
            }
            @Override
            public Object get() throws InterruptedException, ExecutionException {
                return clientKeepAliveTask;
            }
            @Override
            public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                // Same result as get(): the task is available immediately.
                return clientKeepAliveTask;
            }
        };
    }
    /**
     * Delayed element describing the keep-alive deadline of one client channel.
     * The deadline is {@code startTime + delayMillis}, in milliseconds since the epoch.
     */
    private static class ClientKeepAliveTask implements Delayed{
        private Channel channel;
        /**
         * true means data arrived in time: the client is alive and must not be disconnected.
         */
        private Boolean canal=false;
        /**
         * Delay in milliseconds.
         */
        private long delayMillis;
        /**
         * Start time in milliseconds.
         */
        private long startTime;
        /**
         * Whether the task has been completed.
         */
        private volatile Boolean done=false;

        public ClientKeepAliveTask(Channel channel, Boolean canal, long delayMillis, long startTime) {
            this.channel = channel;
            this.canal = canal;
            this.delayMillis = delayMillis;
            this.startTime = startTime;
        }
        public ClientKeepAliveTask(Channel channel, long delayMillis, long startTime) {
            this.channel = channel;
            this.delayMillis = delayMillis;
            this.startTime = startTime;
        }
        /**
         * Returns the time left until the deadline, converted to the requested unit.
         * Zero or negative once the deadline has passed.
         */
        @Override
        public long getDelay(TimeUnit unit) {
            long remainingMillis = (startTime+delayMillis)-System.currentTimeMillis();
            return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
        }
        /**
         * Orders tasks by deadline. Long.compare is used so that large differences
         * cannot overflow an int; foreign Delayed implementations are compared by
         * their remaining delay.
         */
        @Override
        public int compareTo(Delayed o) {
            if (o instanceof ClientKeepAliveTask) {
                ClientKeepAliveTask other = (ClientKeepAliveTask) o;
                return Long.compare(delayMillis+startTime, other.delayMillis+other.startTime);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
        public Channel getChannel() {
            return channel;
        }
        public void setChannel(Channel channel) {
            this.channel = channel;
        }
        public long getDelayMillis() {
            return delayMillis;
        }
        public void setDelayMillis(long delayMillis) {
            this.delayMillis = delayMillis;
        }
        public long getStartTime() {
            return startTime;
        }
        public void setStartTime(long startTime) {
            this.startTime = startTime;
        }
        public Boolean getCanal() {
            return canal;
        }
        public void setCanal(Boolean canal) {
            this.canal = canal;
        }
        public Boolean getDone() {
            return done;
        }
        public void setDone(Boolean done) {
            this.done = done;
        }
    }
}

下一篇:订阅客户端
https://www.jianshu.com/p/362d559d8ae9

相关文章

网友评论

      本文标题:2020-05-04 结合Netty和moquette模拟MQT

      本文链接:https://www.haomeiwen.com/subject/jypeghtx.html