自己写的MQTT服务器【Qos暂时支持0、1】
上代码:
1.myMQTTServer
package com.MyMQTTServerAndClient.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/*
* Description TODO
* On 4/27/2020 3:44 PM
*/
public class myMQTTServer {
public static void main(String[] args) {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();//启动类
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childOption(ChannelOption.SO_BACKLOG,124);
serverBootstrap.childHandler(new ServerChannelInitializer());
ChannelFuture future = serverBootstrap.bind(7777).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally{
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
2.ServerChannelInitializer
package com.MyMQTTServerAndClient.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.eclipse.moquette.parser.netty.MQTTDecoder;
import org.eclipse.moquette.parser.netty.MQTTEncoder;
/*
* Description TODO
* On 4/28/2020 5:19 PM
*/
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new MQTTDecoder());//解码
pipeline.addLast("encoder", new MQTTEncoder());//编码
//多个客户端下使用单例模式,保证是同一个handler
pipeline.addLast("myHandler",MQTTServerHandler.getSingleton());
}
}
3.MQTTServerHandler
package com.MyMQTTServerAndClient.server;
import io.netty.channel.*;
import io.netty.util.concurrent.GenericFutureListener;
import org.eclipse.moquette.proto.messages.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* MQTTServerHandler 使用单例模式
*/
//多个客户端和一个服务器连接必须是Sharable
@ChannelHandler.Sharable
public class MQTTServerHandler extends SimpleChannelInboundHandler<AbstractMessage> {
private static Logger logger = LoggerFactory.getLogger(MQTTServerHandler.class);
private String userName="admin";
private String passWord="password";
//存放客户端发布的话题
public static BlockingQueue<PublishMessage> needSendToSubClient;
//为了获取needSendToSubClient中的PublishMessage 【 key为messageId value为该PublishMessage】
ConcurrentHashMap<Integer,PublishMessage> needSendMap=new ConcurrentHashMap<>();//定时清理
//存放已经发送成功发送给订阅的客户端消息的map
ConcurrentHashMap<Integer,PublishMessage> alreadySendMap=new ConcurrentHashMap<>();//定时清理
//存放已经发送成功发送给订阅的客户端消息 queue
public static BlockingQueue<PublishMessage> alreadySendToSubClient;
//
ExecutorService executorService;
//定时任务执行线程池
ScheduledExecutorService scheduledExecutorService;
//客户端订阅的消息
Map<Channel,List<SubscribeMessage>> subscribeClientMap;
AtomicBoolean running=new AtomicBoolean(true);
Map<Channel,Long> clientKeepAliveTime=new HashMap<>();
//客户端存活判断队列
DelayQueue<Delayed> clientKeepAliveQueue=new DelayQueue<>();
//取消任务用[channel对应的最近的客户端是否超时没有数据任务]
Map<Channel,Future> canalTask=new HashMap<>();
private static MQTTServerHandler mQTTServerHandler;
//多个客户端下使用单例
public static MQTTServerHandler getSingleton(){
if(mQTTServerHandler==null){
synchronized (MQTTServerHandler.class){
mQTTServerHandler=new MQTTServerHandler();
}
}
return mQTTServerHandler;
}
//单例模式下只会初始化一次
private MQTTServerHandler() {
logger.info("===========start init MQTTServerHandler============");
this.executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
this.scheduledExecutorService=Executors.newScheduledThreadPool(4);
this.subscribeClientMap=new ConcurrentHashMap<>();
this.needSendToSubClient=new LinkedBlockingQueue<>();
this.alreadySendToSubClient=new LinkedBlockingQueue<>();
executorService.submit(()->{
while (running.get()) {
try {
ClientKeepAliveTask task = (ClientKeepAliveTask) clientKeepAliveQueue.poll(5, TimeUnit.SECONDS);
if (task != null) {
synchronized (task){
if(task.getCanal()==false){
//在给定keepAlive时间内没有收到数据或者心跳,关闭连接
task.getChannel().close();
task.setDone(true);
logger.error(System.currentTimeMillis()+" channel "+task.channel.remoteAddress()+" close");
}
}
}
} catch (Exception e) {
logger.error("error message {}",e);
}
}
});
scheduledExecutorService.scheduleAtFixedRate(()-> {
if (running.get()) {
logger.info("heart beat queue size {} ", clientKeepAliveQueue.size());
}
},0,30,TimeUnit.SECONDS);
//打印客户端发布的主题消息
scheduledExecutorService.scheduleAtFixedRate(()-> {
if (running.get()) {
if(!needSendToSubClient.isEmpty()){
logger.info("needSendToSubClient queue size {}",needSendToSubClient.size());
needSendToSubClient.forEach((p)->{
logger.info(p.getTopicName()+" payload [ " +new String(p.getPayload().array())+ " ]");
});
}
}
},2,4,TimeUnit.SECONDS);
//发送给感兴趣的客户端
executorService.submit(()->{
while (running.get()) {
while(!needSendToSubClient.isEmpty()){
PublishMessage publishMessage = null;
try {
publishMessage = needSendToSubClient.poll(100, TimeUnit.MILLISECONDS);
if(publishMessage!=null){
//发送给感兴趣的客户端
Set<Channel> channelSet = subscribeClientMap.keySet();
for(Channel channel:channelSet){
List<SubscribeMessage> subscribeMessageList = subscribeClientMap.get(channel);
for(SubscribeMessage sub:subscribeMessageList){
if(sub.subscriptions().get(0).getTopicFilter().equals(publishMessage.getTopicName())){
PublishMessage finalPublishMessage = publishMessage;
channel.writeAndFlush(publishMessage,channel.newPromise().addListener(future -> {
logger.info("PublishMessage send to client {} success topic {}",channel.remoteAddress(),finalPublishMessage.getTopicName());
alreadySendMap.put(finalPublishMessage.getMessageID(),finalPublishMessage);
}));
}
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
//定时清理缓存
scheduledExecutorService.scheduleAtFixedRate(()-> {
if (running.get()) {
if(!alreadySendMap.isEmpty()){
alreadySendMap.clear();
logger.info("alreadySendMap map clear");
}
if(!needSendMap.isEmpty()){
needSendMap.clear();
logger.info("needSendMap map clear");
}
}
},60,60,TimeUnit.SECONDS);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("error message "+cause.getMessage());
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
logger.info("client "+ctx.channel().remoteAddress()+" Registered");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.error("channelInactive " +ctx.channel().remoteAddress());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, AbstractMessage msg) throws Exception {
if(msg instanceof AbstractMessage){
byte messageType = msg.getMessageType();
if(messageType!=AbstractMessage.CONNECT){
//取消原来的任务,重新提交客户端心跳保持任务
resubmitKeepAliveTask(ctx);
}
switch(messageType){
case AbstractMessage.CONNECT:
processConnectRequest(ctx,msg);
break;
case AbstractMessage.PUBLISH:
processPublishRequest(ctx,msg);
break;
case AbstractMessage.PUBREL:
break;
case AbstractMessage.PINGREQ:
processPingRequest(ctx,msg);
break;
case AbstractMessage.SUBSCRIBE:
processSubscribeRequest(ctx,msg);
break;
case AbstractMessage.UNSUBSCRIBE:
processUnsubscribeRequest(ctx,msg);
break;
default:
logger.info("message type "+((AbstractMessage) msg).getMessageType());
break;
}
}
}
//为完成
private void processUnsubscribeRequest(ChannelHandlerContext ctx, Object msg) {
}
/**
* 客户端向服务端订阅
* @param ctx
* @param msg
*/
private void processSubscribeRequest(ChannelHandlerContext ctx, Object msg) {
SubscribeMessage subscribeMessage = (SubscribeMessage) msg;
Integer messageID = subscribeMessage.getMessageID();
//该客户端曾经订阅过话题
if(subscribeClientMap.containsKey(ctx.channel())){
SubAckMessage subAckMessage = new SubAckMessage();
subAckMessage.setMessageID(messageID);
subAckMessage.addType(AbstractMessage.QOSType.MOST_ONE);
if(subscribeMessage.getMessageID()!=null){
ctx.channel().writeAndFlush(subAckMessage,ctx.newPromise().addListener(future -> {
List<SubscribeMessage> subscribeMessagesList = subscribeClientMap.get(ctx.channel());
subscribeMessagesList.add(subscribeMessage);
subscribeClientMap.put(ctx.channel(),subscribeMessagesList);
logger.info("server success send subAck to client {}, topic {} id {}",ctx.channel().remoteAddress(),subscribeMessage.subscriptions().get(0).getTopicFilter(),subscribeMessage.getMessageID());
}));
}
//Qos=0,不用回复ack
else{
List<SubscribeMessage> subscribeMessagesList = subscribeClientMap.get(ctx.channel());
subscribeMessagesList.add(subscribeMessage);
subscribeClientMap.put(ctx.channel(),subscribeMessagesList);
}
}
//该客户端未曾订阅过话题
else{
SubAckMessage subAckMessage = new SubAckMessage();
subAckMessage.setMessageID(messageID);
subAckMessage.addType(AbstractMessage.QOSType.MOST_ONE);
if(subscribeMessage.getMessageID()!=null){
ctx.channel().writeAndFlush(subAckMessage, ctx.newPromise().addListener((future)->{
List<SubscribeMessage> subscribeMessageList = new ArrayList<>();
subscribeMessageList.add(subscribeMessage);
subscribeClientMap.put(ctx.channel(), subscribeMessageList);
logger.info("server success send subAck to client {}, id {}", ctx.channel().remoteAddress(), subAckMessage.getMessageID());
}));
}
//Qos=0,不用回复ack
else{
List<SubscribeMessage> subscribeMessageList = new ArrayList<>();
subscribeMessageList.add(subscribeMessage);
subscribeClientMap.put(ctx.channel(),subscribeMessageList);
}
}
}
private void processPingRequest(ChannelHandlerContext ctx, Object msg) {
logger.info("get heart beat from "+ctx.channel().remoteAddress());
PingRespMessage pingRespMessage = new PingRespMessage();
pingRespMessage.setQos(AbstractMessage.QOSType.MOST_ONE);//暂时只支持0
ctx.channel().writeAndFlush(pingRespMessage);
}
private void processPublishRequest(ChannelHandlerContext ctx, Object msg) {
PublishMessage publishMessage = (PublishMessage) msg;
Integer messageID = publishMessage.getMessageID();
//服务端已经保存了该客户端发布的话题消息
//qos=1
if(publishMessage.getQos()== AbstractMessage.QOSType.LEAST_ONE){
if(needSendMap.containsKey(messageID)||alreadySendMap.contains(messageID)){
logger.info("server already add this PublishMessage or push to subscribe client {} {}",publishMessage.getTopicName(),publishMessage.getMessageID());
}else{
boolean offer = needSendToSubClient.offer(publishMessage);
if(offer){
needSendMap.put(messageID,publishMessage);
logger.info("server add PublishMessage into queue success {}",publishMessage.getTopicName());
PubAckMessage pubAckMessage = new PubAckMessage();
pubAckMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
pubAckMessage.setMessageID(publishMessage.getMessageID());
ctx.channel().writeAndFlush(pubAckMessage);
}
}
}
//qos=0
else if (publishMessage.getQos()== AbstractMessage.QOSType.MOST_ONE){
boolean offer = needSendToSubClient.offer(publishMessage);
if(offer){
logger.info("server add PublishMessage into queue success {}",publishMessage.getTopicName());
}
}
}
private void processConnectRequest(ChannelHandlerContext ctx, Object msg) {
ConnectMessage msg1 = (ConnectMessage) msg;
int keepAlive = msg1.getKeepAlive();//在没有接收到数据情况下客户端存活时间
submitKeepAliveTask(ctx, Long.valueOf(keepAlive));
if(msg1.getUsername().equals(this.userName)&&msg1.getPassword().equals(this.passWord)){
ConnAckMessage connAckMessage = new ConnAckMessage();
connAckMessage.setReturnCode((byte)0);
ctx.channel().writeAndFlush(connAckMessage);
logger.info("send connectAck");
}else{
ConnAckMessage connAckMessage = new ConnAckMessage();
connAckMessage.setReturnCode((byte)4);
ctx.channel().writeAndFlush(connAckMessage);
logger.error("channel {} username or password error ",ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
public void resubmitKeepAliveTask(ChannelHandlerContext ctx){
Future future = canalTask.get(ctx.channel());
Long keepAlive = clientKeepAliveTime.get(ctx.channel());
boolean cancel = future.cancel(true);
if(cancel==true){
logger.info("keep alive task resubmit success");
}else{
logger.error("channel already closed "+ctx.channel());
}
submitKeepAliveTask(ctx,keepAlive);
}
//提交心跳超时判断任务
public void submitKeepAliveTask(ChannelHandlerContext ctx,Long keepAlive){
Future clientKeepAliveTaskFuture = this.getClientKeepAliveTaskFuture(ctx, Long.valueOf(keepAlive * 1000));
canalTask.put(ctx.channel(),clientKeepAliveTaskFuture);
clientKeepAliveTime.put(ctx.channel(),keepAlive);
}
//取消任务用
public Future getClientKeepAliveTaskFuture(ChannelHandlerContext ctx,Long keepAlive){
ClientKeepAliveTask clientKeepAliveTask = new ClientKeepAliveTask(ctx.channel(), keepAlive, System.currentTimeMillis());
clientKeepAliveQueue.offer(clientKeepAliveTask);
return new Future() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
synchronized (clientKeepAliveTask){
if(clientKeepAliveTask.getDone()==false){
clientKeepAliveTask.setCanal(mayInterruptIfRunning);
return true;
}
return false;
}
}
@Override
public boolean isCancelled() {
return clientKeepAliveTask.getCanal();
}
@Override
public boolean isDone() {
return clientKeepAliveTask.getDone();
}
@Override
public Object get() throws InterruptedException, ExecutionException {
return clientKeepAliveTask;
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
};
};
/**
* 判断客户端的保持存活
*/
private static class ClientKeepAliveTask implements Delayed{
private Channel channel;
/**
* true表示在规定时间内收到数据,客户端保持存活,不断开连接
*/
private Boolean canal=false;
/**
* 延迟毫秒数
*/
private long delayMillis;
/**
* 开始时间
*/
private long startTime;
/**
* 任务是否完成
*/
private volatile Boolean Done=false;
public ClientKeepAliveTask(Channel channel, Boolean canal, long delayMillis, long startTime) {
this.channel = channel;
this.canal = canal;
this.delayMillis = delayMillis;
this.startTime = startTime;
}
public ClientKeepAliveTask(Channel channel, long delayMillis, long startTime) {
this.channel = channel;
this.delayMillis = delayMillis;
this.startTime = startTime;
}
@Override
public long getDelay(TimeUnit unit) {
return (startTime+delayMillis)-System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
ClientKeepAliveTask o1 = (ClientKeepAliveTask) o;
return (int) ((delayMillis+startTime)-(o1.delayMillis+o1.startTime));
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public long getDelayMillis() {
return delayMillis;
}
public void setDelayMillis(long delayMillis) {
this.delayMillis = delayMillis;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public Boolean getCanal() {
return canal;
}
public void setCanal(Boolean canal) {
this.canal = canal;
}
public Boolean getDone() {
return Done;
}
public void setDone(Boolean done) {
Done = done;
}
}
}
下一篇:订阅客户端
https://www.jianshu.com/p/362d559d8ae9
网友评论