美文网首页
SpringBoot 2 整合 Netty 实现基于 DTU 的

SpringBoot 2 整合 Netty 实现基于 DTU 的

作者: 程就人生 | 来源:发表于2020-08-06 22:16 被阅读0次

上一篇文章把客户端的代码贴出来了,还缺少一部分代码,缺少了这部分代码,客户端就运行不起来;废话少说,赶紧上缺失的代码吧,还有服务器的代码。服务端的代码,编解码与客户端是完全一致的,不然会收不到信息。服务端的代码还是一步一步的贴上来吧。

第一步,pom文件引入netty架包;

<!-- Spring Boot的核心启动器,包含了自动配置、日志和YAML -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- netty架包 -->
        <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
        </dependency>

编解码,请参考上一篇文章;

第二步,netty服务端的server类;
SocketServer类:

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.example.im.codec.ByteArrayDecoder;
import com.example.im.codec.ByteArrayEncoder;
import com.example.im.handler.ExceptionHandler;
import com.example.im.handler.HeartBeatServerHandler;
import com.example.im.handler.LoginRequestHandler;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * netty dtu服务器
 * @author 程就人生
 * @date 2020年8月6日
 * @Description 
 *
 */
@Component
public class SocketServer {
            
    private static Logger log = LoggerFactory.getLogger(SocketServer.class);
    
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    //开启两个线程池
    private final EventLoopGroup workGroup = new NioEventLoopGroup();
    
    //启动装饰类
    private final ServerBootstrap serverBootstrap = new ServerBootstrap();  
    
    //本机的ip地址
    private String ip;
    
    //启动端口获取
    @Value("${server.port}")
    private int port;
    
    @Autowired
    private LoginRequestHandler loginRequestHandler;
    
    @Autowired
    private ExceptionHandler exceptionHandler;
        
    /**
     * 启动服务
     * @param port
     */
    public void start(){
        
        try {
            //获取本机的ip地址
            ip = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e1) {
            e1.printStackTrace();
        }
        
        serverBootstrap.group(bossGroup,workGroup)
              //非阻塞
              .channel(NioServerSocketChannel.class)
              //连接缓冲池的大小
              .option(ChannelOption.SO_BACKLOG, 1024)
               //设置通道Channel的分配器
              .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
              .childHandler(new ChannelInitializer<SocketChannel>(){
                  
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {              
                ChannelPipeline pipeline = socketChannel.pipeline();
                //添加编解码和处理器(节点间通讯用)
                pipeline.addLast("bdeCoder", new ByteArrayDecoder());
                pipeline.addLast("benCoder", new ByteArrayEncoder());
                //管理后台登录
                pipeline.addLast("loginHandler", loginRequestHandler);
                //心跳检测
                pipeline.addLast("heartBeat", new HeartBeatServerHandler());      
                //异常处理
                pipeline.addLast("exception", exceptionHandler);

            }
        });
        
        ChannelFuture channelFuture = null;
        //启动成功标识
        boolean startFlag = false;
        //启动失败时,多次启动,直到启动成功为止
        while(!startFlag){
            try{
                channelFuture = serverBootstrap.bind(port).sync();
                startFlag = true;
            }catch(Exception e){
                log.info("端口号:" + port + "已被占用!");
                port++;
                log.info("尝试一个新的端口:" + port);
                //重新便规定端口号
                serverBootstrap.localAddress(new InetSocketAddress(port));  
            }           
        }
        
        //服务端启动监听事件
        channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                //启动成功后的处理
                if (future.isSuccess()) {       
                   log.info("netty dtu服务器启动成功,Started Successed:" + ip + ":" + port);                     
                } else {
                   log.info("netty dtu服务器启动失败,Started Failed:" + ip + ":" + port);
                }
            }
        }); 
        
        try {
            // 7 监听通道关闭事件
            // 应用程序会一直等待,直到channel关闭
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
            log.error("发生其他异常", e);
        } finally {
            // 8 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }    
}

netty服务端的启动类放在了入口文件里,随着入口文件一起启动;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

import com.example.im.SocketServer;

@SpringBootApplication
public class SpringbootNettyDtuServiceApplication {

    public static void main(String[] args) {
        //获取application的上下文
        ApplicationContext applicationContext = SpringApplication.run(SpringbootNettyDtuServiceApplication.class, args);
        
        /**
         * 启动netty dtu的服务端
         */
        SocketServer socketServer = applicationContext.getBean(SocketServer.class);
        socketServer.start();
    }

}

第三步,handler处理类,handler处理类和客户端的处理类是一一对应的,但是写法上有所不同;
异常处理类ExceptionHandler:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 服务端异常处理handler
 * @author 程就人生
 * @date 2019年11月12日
 * @Description 
 *
 */
@ChannelHandler.Sharable
@Service("exceptionHandler")
public class ExceptionHandler extends ChannelInboundHandlerAdapter {

    private static Logger log = LoggerFactory.getLogger(ExceptionHandler.class);

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof Exception) {
            //捕捉异常信息
            cause.printStackTrace();
            log.error(cause.getMessage());
            ctx.close();
        } else {
            //捕捉异常信息
            cause.printStackTrace();
            log.error(cause.getMessage());
            ctx.close();
        }
    }

    /**
     * 通道 Read 读取 Complete 完成
     * 做刷新操作 ctx.flush()
     */
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

心跳处理类HeartBeatServerHandler:

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * 心跳处理
 * @author 程就人生
 * @date 2020年8月6日
 * @Description 
 *
 */
public class HeartBeatServerHandler extends IdleStateHandler {

    private static Logger log = LoggerFactory.getLogger(HeartBeatServerHandler.class);
    
    private static final int READ_IDLE_GAP = 150;

    public HeartBeatServerHandler() {
        super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {       
        // 判断消息实例
        if (null == msg || !(msg instanceof byte[])) {
            super.channelRead(ctx, msg);
            return;
        }
        byte[] data = (byte[]) msg;
        int dataLength = data.length;
        ByteBuf buf = Unpooled.buffer(dataLength);
        buf.writeBytes(data);
        int type = CharacterConvert.byteToInt(buf.readByte());  
        //机器编号
        int deviceId = CharacterConvert.byteToInt(buf.readByte());
        //如果是管理后台登录操作时
        if(type == ProtoInstant.HEART_BEAT){    
            int verify = CharacterConvert.byteToInt(buf.readByte());
            int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, dataLength + ProtoInstant.FILED_LEN, type, deviceId);
            if(verify != CharacterConvert.getLow8(sum)){
                log.error("心跳包,校验位错误!机器编码:" + deviceId);
            }else{
                log.info("接收到心跳信息" + deviceId); 
                if (ctx.channel().isActive()) {
                    ctx.writeAndFlush(msg);
                }
            }       
        }else{
            super.channelRead(ctx, msg);
        }       
    }
    
    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        //log.info(READ_IDLE_GAP + "秒内未读到数据!");
    }}

登录处理类LoginRequestHandler:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 管理后台登录处理
 * @author 程就人生
 * @date 2020年8月3日
 * @Description 
 *
 */
@Service("loginRequestHandler")
@ChannelHandler.Sharable
public class LoginRequestHandler extends ChannelInboundHandlerAdapter {
    
    private static Logger log = LoggerFactory.getLogger(LoginRequestHandler.class);

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        if (null == msg || !(msg instanceof byte[])) {
            super.channelRead(ctx, msg);
            return;
        }

        byte[] data = (byte[]) msg;
        int dataLength = data.length;
        ByteBuf buf = Unpooled.buffer(dataLength);
        buf.writeBytes(data);
        int type = CharacterConvert.byteToInt(buf.readByte());
        //机器编号
        int deviceId = CharacterConvert.byteToInt(buf.readByte());
        //如果是管理后台登录操作时
        if(type == ProtoInstant.LOGIN){
            log.info("客户端登录了,机器编码:" + deviceId);
            int verify = CharacterConvert.byteToInt(buf.readByte());
            //加上字头,数据长度(数据长度包括字头和数据长度本身的位数)
            int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, dataLength + ProtoInstant.FILED_LEN, type, deviceId);
            if(verify != CharacterConvert.getLow8(sum)){
                log.error("客户端,校验位错误,机器编码:" + deviceId);
                //验证错误,关闭链接
                //ctx.close();
            }else{
                log.info("客户端登录成功处理");
                buf.clear();
                buf.writeByte(ProtoInstant.LOGIN);
                buf.writeByte(deviceId);
                buf.writeByte(verify);
                //回写消息
                ctx.channel().writeAndFlush(buf.array());
            }           
        }else{
            super.channelRead(ctx, msg);
        }
    }
}

第四步,工具类及常量;
字节转化的类CharacterConvert:

/**
 * 字节转换
 * @author 程就人生
 * @date 2020年8月5日
 * @Description 
 *
 */
public class CharacterConvert {
    
    /**
     * byte转换成int
     * @param number
     * @return
     */
    public static int byteToInt(byte number){
        return number & 0xff;
    }
    
    /**
     * short转成int
     * @param number
     * @return
     */
    public static int shortToInt(short number){
        return number & 0xff;
    }
    
    /**
     * int型数据,低八位获取
     * @param data
     * @return
     */
    public static int getLow8(int number){
        //转换成二进制的字符串形式
        //Integer.toBinaryString(number & 0xff)
        return number & 0xff;
    }
    
    /**
     * int型数据,取低四位
     * @param number
     * @return
     */
    public static int getLow4(int number){
        return number & 0x0f;
    }
    
    /**
     * bytes2HexString 
     * 字节数组转16进制字符串 
     * @param b 字节数组 
     * @return 16进制字符串 
     */ 
    public static String bytes2HexString(byte[] b) {
        StringBuffer hexString = new StringBuffer();  
        String hex;  
        for (int i = 0; i < b.length; i++) {  
            hex = Integer.toHexString(b[i] & 0xFF);  
            if (hex.length() == 1) {  
                hex = '0' + hex;  
            }  
            hexString.append(hex.toUpperCase());  
        }  
        return hexString.toString();
    }  
    
    /** 
     * string2HexString 
     * 字符串转16进制字符串 
     * @param strPart  字符串 
     * @return 16进制字符串 
     */  
    public static String string2HexString(String strPart) {  
        StringBuffer hexString = new StringBuffer();  
        for (int i = 0; i < strPart.length(); i++) {  
            int ch = (int) strPart.charAt(i);  
            String strHex = Integer.toHexString(ch);  
            hexString.append(strHex);  
        }  
        return hexString.toString();  
    }

    /** 
     * hexString2Bytes 
     * 16进制字符串转字节数组 
     * @param src  16进制字符串 
     * @return 字节数组 
     */ 
    public static byte[] hexString2Bytes(String src) {
        int len = src.length() / 2; 
        byte[] ret = new byte[len]; 
        for (int i = 0; i < len; i++) {
            ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue(); 
        }  
        return ret;  
    }


    /** 
     * hexString2String 
     * 16进制字符串转字符串 
     * @param src 
     * 16进制字符串 
     * @return 字节数组 
     * @throws 
     */ 
    public static String hexString2String(String src) {  
        StringBuffer str = new StringBuffer(); 
        for (int i = 0; i < src.length() / 2; i++) {
            str.append((char)Integer.valueOf(src.substring(i * 2, i * 2 + 2),16).byteValue());
        }  
        return str.toString();  
    }  
    
    /** 
     * char2Byte
     * 字符转成字节数据char-->integer-->byte
     * @param src 
     * @return 
     * @throws 
     */  
    public static Byte char2Byte(Character src) {
        return Integer.valueOf((int)src).byteValue(); 
    }  
     
    /** 
     * intToHexString 
     * 10进制数字转成16进制 
     * @param a 转化数据 
     * @param len 占用字节数 
     * @return 
     * @throws 
     */  
    public static String intToHexString(int a,int len){ 
        len<<=1;  
        String hexString = Integer.toHexString(a);  
        int b = len -hexString.length();  
        if(b>0){  
            for(int i=0;i<b;i++)  {  
                hexString = "0" + hexString;  
            }  
        }  
        return hexString;  
    }     

    /**
     * 将16进制的2个字符串进行异或运算
     * http://blog.csdn.net/acrambler/article/details/45743157  
     * @param strHex_X
     * @param strHex_Y
     * 注意:此方法是针对一个十六进制字符串一字节之间的异或运算,如对十五字节的十六进制字符串异或运算:1312f70f900168d900007df57b4884
        先进行拆分:13 12 f7 0f 90 01 68 d9 00 00 7d f5 7b 48 84
        13 xor 12-->1
        1 xor f7-->f6
        f6 xor 0f-->f9
        ....
        62 xor 84-->e6
        即,得到的一字节校验码为:e6
     * @return
     */
    public static String xor(String strHex_X,String strHex_Y){   

        //将x、y转成二进制形式
        String anotherBinary=Integer.toBinaryString(Integer.valueOf(strHex_X,16));  

        String thisBinary=Integer.toBinaryString(Integer.valueOf(strHex_Y,16));   

        String result = "";   

        //判断是否为8位二进制,否则左补零   
        if(anotherBinary.length() != 8){   
            for (int i = anotherBinary.length(); i <8; i++) {   
                anotherBinary = "0"+anotherBinary;   

            }   
        }   

        if(thisBinary.length() != 8){   
            for (int i = thisBinary.length(); i <8; i++) {
                thisBinary = "0"+thisBinary; 
            }  
        }   

        //异或运算
        for(int i=0;i<anotherBinary.length();i++){
            //如果相同位置数相同,则补0,否则补1
            if(thisBinary.charAt(i)==anotherBinary.charAt(i)){
                result+="0"; 
            }else{
                result+="1";
            }
        }
        return Integer.toHexString(Integer.parseInt(result, 2));
    }

    /**
     * byte转换成int型字符串
     * Convert byte[] to hex string
     * @param src byte[] data 
     * @return hex string   
     */
    public static String bytes2Str(byte[] src){ 
        StringBuilder stringBuilder = new StringBuilder();
        if (src == null || src.length <= 0) {
            return null; 
        }
        for (int i = 0; i < src.length; i++) { 
            int v = src[i] & 0xFF;   
            String hv = Integer.toHexString(v); 
            if (hv.length() < 2) {
                stringBuilder.append(0); 
            }   
            stringBuilder.append(hv);   
        }   
        return stringBuilder.toString(); 
    }

    /**
     * @param msg
     * @return 接收字节数据并转为16进制字符串
     */
    public static String receiveHexToString(byte[] by) {
        try {
            String str = bytes2Str(by);
            str = str.toLowerCase();
            return str;
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return null;
    }   

    /**
     * "7dd",4,'0'==>"07dd"
     * @param input 需要补位的字符串
     * @param size 补位后的最终长度
     * @param symbol 按symol补充 如'0'
     * @return
     * N_TimeCheck中用到了
     */
    public static String fill(String input, int size, char symbol) {
        while (input.length() < size) {
            input = symbol + input;
        }
        return input;
    }
    
    /**
     * integer数据求和
     * @param integers
     * @return
     */
    public static int sum(Integer... integers){
        Integer sum = 0;
        for(int i=0;i<integers.length;i++){
            sum = Integer.sum(sum, integers[i]);
        }       
        return sum;
    }
}

常量类ProtoInstant :

/**
 * 编解码常量配置
 * @author 程就人生
 * @date 2020年7月13日
 * @Description 
 *
 */
public class ProtoInstant {

    /**
     * 字头
     */
    public static final int FIELD_HEAD = 0xcc;
    
    /**
     * 字头长度(字头 + 数据长度)
     */
    public static final int FILED_LEN = 3;
    
    //心跳
    public static final int HEART_BEAT = 0;
    
    //登录
    public static final int LOGIN = 1;
    
}

最后,测试;
分别启动客户端、服务端,看看服务端和客户端是否能够正常通讯;

netty 服务端运行输出
netty模拟DTU端运行输出

总结
在此次编码过程中,主要遇到的技术难点:
第一个是对字节数组的处理,在使用netty怎么解析字节数组时,特别是跨语言的,低八位、高八位如何存放,踩了不少坑,走了不少弯路;
第二个还是对字节数组的处理,怎么获取一个数字的低八位,没想到以前学过的最不起眼的知识,总以为学了没用的知识起到了关键作用。

经验都是从踩坑填坑走过来的,编码不易,且写且思考,急躁不得。

相关文章

网友评论

      本文标题:SpringBoot 2 整合 Netty 实现基于 DTU 的

      本文链接:https://www.haomeiwen.com/subject/uvgtdktx.html