美文网首页
使用Netty实现汉王考勤机云平台

使用Netty实现汉王考勤机云平台

作者: 删我丶 | 来源:发表于2019-08-25 15:37 被阅读0次

    开发场景

    由于公司需要使用汉王考勤机,并需要将考勤内容信息存储至数据库,并提供接口给其他系统平台使用。
    汉王官方提供的JavaApi为多线程+io流处理,考勤机数量多时会出现数据混乱现象,所以有此想法改为Netty实现。

    开发所用技术

    SpringBoot2.1.4+netty+mysql5.7+Quartz+Freemarker(代码生成器使用)

    开发实现

    由于汉王考勤机相当于客户端,所以后台只需要实现服务端即可。一切代码遵循汉王官网文档的协议内容及传输接收方式。

    UDP协议

    • UPPServer类
    package com.hanwang.config;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioDatagramChannel;
    import io.netty.util.concurrent.Future;
    
    import javax.annotation.PreDestroy;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
     
    @Component
    public class NettyUDPServer {
     
         
            private static final EventLoopGroup group = new NioEventLoopGroup(1);
         
            @Autowired
            UDPServerChannelInitializer serverChannelInitializer;
         
            @Value("${port}")
            private int port;
         
            //监听端口的通道,即server的处理通道
            private Channel channel;
         
            /**
             * 开启udp server服务
             *
             * @return
             * @throws InterruptedException 
             */
            public ChannelFuture start() throws InterruptedException{
                //启动类
                Bootstrap serverBootstrap = new Bootstrap();
                serverBootstrap.group(group)//组配置,初始化ServerBootstrap的线程组
                        .channel(NioDatagramChannel.class)//数据包通道,udp通道类型
                        //支持广播
                        .handler(serverChannelInitializer);//通道处理者
                //Future:异步任务的生命周期,可用来获取任务结果
                ChannelFuture channelFuture1 = serverBootstrap.bind(port).sync();//绑定端口,开启监听,同步等待
                if (channelFuture1 != null && channelFuture1.isSuccess()) {
                    System.out.println("[UDP] server start success, port = {}");
                    channel = channelFuture1.channel();//获取通道
                } else {
                    channelFuture1.cause().printStackTrace();
                }
                return channelFuture1;
            }
         
            /**
             * 停止udp server服务
             * 销毁前的拦截
             */
            @PreDestroy
            public void destroy() {
                try {
                    if (channel != null) {
                        ChannelFuture await = channel.close().await();
                        if (!await.isSuccess()) {
                        }
                    }
                    Future<?> future1 = group.shutdownGracefully().await();
                    if (!future1.isSuccess()) {
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    }
    
    • UPPServer启动类
    package com.hanwang.config;
    
    import io.netty.channel.ChannelFuture;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    
    import com.hanwang.service.KqptCommandService;
    import com.hanwang.service.KqptDevicesService;
    
    @Component
    public class NettyUDPServerRun implements ApplicationRunner{
        
        @Autowired
        NettyUDPServer nettyUdpServer;
        
        @Value("${port}")
        private int PORT;
        
        @Autowired
        private KqptDevicesService kqptDevicesService;
        @Autowired
        private KqptCommandService commandService;
        
        
        @Override
        public void run(ApplicationArguments args){
            try {
                //启动服务端
                ChannelFuture start = nettyUdpServer.start();
                start.channel();
                //服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程
                //start.channel().closeFuture().syncUninterruptibly();
                
            } catch (Exception e) {
                
            }
            
        }
    
        
    }
    
    • UPDServer通道处理类
    package com.hanwang.config;
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.socket.DatagramPacket;
    import io.netty.util.CharsetUtil;
    
    import java.nio.charset.Charset;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.hanwang.entity.KqptCommand;
    import com.hanwang.entity.KqptDevices;
    import com.hanwang.service.KqptCommandService;
    import com.hanwang.service.KqptDevicesService;
    import com.hanwang.splash.FaceId_Item;
    import com.hanwang.utils.DateUtil;
     
    /**
     * description: 通道数据输入的处理
     **/
    @Component
    @ChannelHandler.Sharable
    public class UDPServerChannelInboundHandler extends SimpleChannelInboundHandler<DatagramPacket> {
        
        @Autowired
        private KqptDevicesService kqptDevicesService;
        @Autowired
        private KqptCommandService commandService;
     
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
            System.out.println("[UDP] server 收到的消息:" + datagramPacket.content().toString(CharsetUtil.UTF_8));
            String message = datagramPacket.content().toString(CharsetUtil.UTF_8);
                //获取sbsn
                String sbsn = FaceId_Item.GetKeyValue(message, "sn");
                //从数据库kqpt_devices中根据sbsn获取设备监控信息
                KqptDevices kqptDevices = kqptDevicesService.selectBySbsn(sbsn);
                if(kqptDevices!=null){
                    if(kqptDevices.getStatus()==null || kqptDevices.getStatus()=="" || kqptDevices.getStatus()!="00" || (!kqptDevices.getStatus().equals("00"))){
                        kqptDevices.setStatus("00");
                    }
                    kqptDevices.setLastTime(DateUtil.getNow());
                    kqptDevicesService.update(kqptDevices);
                    //是否下发指令
                    Map<String, Object> map = new HashMap<String, Object>();
                    map.put("sbSn", sbsn);
                    map.put("nowTime", DateUtil.getNow());
                    List<KqptCommand> list = null;
                    list =  commandService.selectWzxByComidOrSbsn(map);
                    if(list.size()>0){
                            //向客户端下发指令-告知有未执行的命令
                            DatagramPacket datagramPacket1 = new DatagramPacket(Unpooled.copiedBuffer("PostRequest()", Charset.forName("GBK")), datagramPacket.sender());
                            channelHandlerContext.channel().writeAndFlush(datagramPacket1);
                    }
                    
                }
        }
     
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
        }
     
    }
    
    
    • UPDServer通道初始化,用于处理字符串编码
    package com.hanwang.config;
    
    
     
    import java.nio.charset.Charset;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.nio.NioDatagramChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
     
    /**
     * description: 通道初始化,主要用于设置各种Handler
     **/
    @Component
    public class UDPServerChannelInitializer extends ChannelInitializer<NioDatagramChannel> {
     
        @Autowired
        UDPServerChannelInboundHandler serverChannelHandler;
     
        @Override
        protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
            ChannelPipeline pipeline = nioDatagramChannel.pipeline();
            pipeline.addLast(new StringDecoder());
            //自定义的InboundHandler输入处理者
            //pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
            //字符串编解码器
            pipeline.addLast(
                    new StringDecoder(CharsetUtil.UTF_8),
                    new StringEncoder(Charset.forName("GBK"))
                    );
            pipeline.addLast("serverChannelHandler", serverChannelHandler);
        }
    }
    
    

    TCP协议

    • TCPServer类
    package com.hanwang.config;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.util.concurrent.Future;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    import javax.annotation.PreDestroy;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
     
    @Component
    public class NettyTCPServer {
     
        //boss事件轮询线程组
        //处理Accept连接事件的线程,这里线程数设置为1即可,netty处理链接事件默认为单线程,过度设置反而浪费cpu资源
        private EventLoopGroup boss = new NioEventLoopGroup(1);
        //worker事件轮询线程组
        //处理hadnler的工作线程,其实也就是处理IO读写 。线程数据默认为 CPU 核心数乘以2
        private EventLoopGroup worker = new NioEventLoopGroup();
     
        @Autowired
        TCPServerChannelInitializer serverChannelInitializer;
     
        @Value("${netty.tcp.server.port}")
        private Integer port;
     
        //与客户端建立连接后得到的通道对象
        private Channel channel;
     
        /**
         * 存储client的channel
         * key:ip,value:Channel
         */
        public static Map<String, Channel> map = new ConcurrentHashMap<String, Channel>();
     
        /**
         * 开启Netty tcp server服务
         *
         * @return
         */
        public ChannelFuture start() {
            //启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)//组配置,初始化ServerBootstrap的线程组
                    .channel(NioServerSocketChannel.class)///构造channel通道工厂//bossGroup的通道,只是负责连接
                    .childHandler(serverChannelInitializer)//设置通道处理者ChannelHandler////workerGroup的处理器
                    .option(ChannelOption.SO_BACKLOG, 1024)//socket参数,当服务器请求处理程全满时,用于临时存放已完成三次握手请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
                    .childOption(ChannelOption.SO_KEEPALIVE, true);//启用心跳保活机制,tcp,默认2小时发一次心跳
            //Future:异步任务的生命周期,可用来获取任务结果
            ChannelFuture channelFuture1 = serverBootstrap.bind(port).syncUninterruptibly();//绑定端口,开启监听,同步等待
            if (channelFuture1 != null && channelFuture1.isSuccess()) {
                channel = channelFuture1.channel();//获取通道
            } else {
            }
            return channelFuture1;
        }
     
        /**
         * 停止Netty tcp server服务
         */
        @PreDestroy
        public void destroy() {
            if (channel != null) {
                channel.close();
            }
            try {
                Future<?> future = worker.shutdownGracefully().await();
                if (!future.isSuccess()) {
                }
                Future<?> future1 = boss.shutdownGracefully().await();
                if (!future1.isSuccess()) {
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }
    
    
    • TCPServer启动类
    package com.hanwang.config;
    
    import io.netty.channel.ChannelFuture;
    
    import java.io.IOException;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    
    
    @Component
    public class NettyTCPServerRun implements ApplicationRunner {
    
        @Autowired
        NettyTCPServer nettyTcpServer;
    
        @Override
        public void run(ApplicationArguments args) throws IOException {
            try {
                 //启动服务端
                ChannelFuture start = nettyTcpServer.start();
                
                start.channel();
                //服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程
                //start.channel().closeFuture().syncUninterruptibly();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
    
    • TCPServer通道类
    package com.hanwang.config;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    
    import java.nio.charset.Charset;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.hanwang.entity.KqptCommand;
    import com.hanwang.entity.KqptDevices;
    import com.hanwang.entity.KqptEmployee;
    import com.hanwang.entity.KqptKqrecords;
    import com.hanwang.service.KqptCommandService;
    import com.hanwang.service.KqptDevicesService;
    import com.hanwang.service.KqptEmployeeService;
    import com.hanwang.service.KqptKqrecordsService;
    import com.hanwang.splash.FaceId_Item;
    import com.hanwang.utils.DateUtil;
    import com.hanwang.utils.StringUtils;
     
    
    @Component
    @ChannelHandler.Sharable
    public class TCPServerChannelHandler extends SimpleChannelInboundHandler<String> {
        
        // 设备号
        private static String serialNumber = null;
        // 设备号
        private static String GetEmployeeIDserialNumber = null;
        // 当前执行命令ID
        private static Integer commandId = 0;
        // 工号
        private static String empNo = null;
        // 姓名
        private static String name = null;
        
        
        @Autowired
        private KqptKqrecordsService kqptKqrecordsService; 
        @Autowired
        private KqptCommandService kqptCommandService;
        @Autowired
        private KqptEmployeeService kqptEmployeeService;
        @Autowired
        private KqptDevicesService kqptDevicesService;
     
        /**
         * 拿到传过来的msg数据,开始处理
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msgStr) throws Exception {
            
            System.out.println("Netty tcp server receive msg : " + msgStr);
            
            
            try{
            
            
            if(msgStr.startsWith("PostRecord"))
            {   
                // 答复已准备好接收考勤记录
                if (true)
                {
                    ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Return(result=\"success\" postphoto=\"false\")"
                            ).toString().toString(), Charset.forName("GBK"));
                    ctx.channel().writeAndFlush(buf);
                }
            }
            else if(msgStr.startsWith("Record"))
            {           
                // 服务器回应
                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Return(result=\"success\")"
                        ).toString().toString(), Charset.forName("GBK"));
                ctx.channel().writeAndFlush(buf);
            }
            else if(msgStr.startsWith("PostEmployee"))
            {   // 准备上传人员信息
    
                // 服务器回应
                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Return(result=\"success\")"
                        ).toString().toString(), Charset.forName("GBK"));
                ctx.channel().writeAndFlush(buf);
            }                
            else if(msgStr.startsWith("Employee"))
            {   // 读取人员信息
    
                // 服务器回应
                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Return(result=\"success\")"
                        ).toString().toString(), Charset.forName("GBK"));
                ctx.channel().writeAndFlush(buf);
            }
            //获取设备信息-GetDeviceInfo()
            else if(msgStr.startsWith("Return(result=\"success\"  time=")){
                KqptDevices kqptDevices = new KqptDevices();
                kqptDevices.setSbSn(FaceId_Item.GetKeyValue(msgStr, "dev_id"));
                kqptDevices.setMsg(msgStr);
                kqptDevices.setZcNum(Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "real_faceregist")));
                kqptDevices.setSbNum(Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "max_faceregist")));
                kqptDevicesService.update(kqptDevices);
            }
            // 下发命令
            else if (msgStr.startsWith("GetRequest"))
            {   
                //获取sbsn
                serialNumber = FaceId_Item.GetKeyValue(msgStr, "sn");
                
                Map<String, Object> map = new HashMap<String, Object>();
                map.put("sbSn", serialNumber);
                map.put("nowTime", DateUtil.getNow());
                List<KqptCommand> list =  kqptCommandService.selectWzxByComidOrSbsn(map);
                if(list.size()>0){
                    try {
                        commandId = list.get(0).getId();
                        if(commandId!=null && StringUtils.isNotEmpty(list.get(0).getComNr())){
                                if(list.get(0).getComNr().startsWith("AddNameTable")){
                                    
                                    empNo = StringUtils.subString(list.get(0).getComNr(), "(", "=");
                                    name = StringUtils.subString(list.get(0).getComNr(), "=\"", "\")");
                                    
                                    ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append(list.get(0).getComNr()
                                            ).toString().toString(), Charset.forName("GBK"));
                                    ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                    
                                    //添加ChannelFutureListener以便在写操作完成后接收通知
                                    cf.addListener(new ChannelFutureListener() {
                                        @Override
                                        public void operationComplete(ChannelFuture future) throws Exception {
                                            //写操作完成,并没有错误发生
                                            if(future.isSuccess()){
                                                //删除对应用户信息
                                                kqptEmployeeService.delBySbsn(serialNumber,empNo);
                                                //处理从考勤机获取到的AddNameTable
                                                KqptEmployee emp =  new KqptEmployee();
                                                emp.setEmpNo(empNo);
                                                emp.setSbSn(serialNumber);
                                                emp.setName(name);
                                                emp.setInTime(DateUtil.getNow());
                                                kqptEmployeeService.save(emp);
                                                //命令执行成功
                                                if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                    KqptCommand kqptCommand = new KqptCommand();
                                                    kqptCommand.setId(commandId);
                                                    kqptCommand.setStatus("2");
                                                    kqptCommand.setResult("success");
                                                    kqptCommand.setLastTime(DateUtil.getNow());
                                                    kqptCommandService.update(kqptCommand);
                                                    commandId = null;
                                                }
                                                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("GetDeviceInfo()"
                                                        ).toString().toString(), Charset.forName("GBK"));
                                                ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                            }else{
                                                //命令执行失败
                                                if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                    KqptCommand kqptCommand = new KqptCommand();
                                                    kqptCommand.setId(commandId);
                                                    kqptCommand.setStatus("3");
                                                    kqptCommand.setResult("fail");
                                                    kqptCommand.setLastTime(DateUtil.getNow());
                                                    kqptCommandService.update(kqptCommand);
                                                    commandId = null;
                                                }
                                            }
                                        }
                                    });
                                }else if(list.get(0).getComNr().startsWith("DeleteEmployee")){
                                
                                    empNo = FaceId_Item.GetKeyValue(list.get(0).getComNr(), "id");
                                    
    /*                              //获取链接实例
                                    Channel channel = ctx.channel();
                                    //创建一个持有数据的ByteBuf
                                    //获取设备信息  GetDeviceInfo() 并存入数据库
                                    ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append(list.get(0).getComNr()
                                            ).toString().toString(), Charset.forName("GBK"));*/
                                    //数据冲刷
                                    ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append(list.get(0).getComNr()
                                            ).toString().toString(), Charset.forName("GBK"));
                                    ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                    
                                    //添加ChannelFutureListener以便在写操作完成后接收通知
                                    cf.addListener(new ChannelFutureListener() {
                                        @Override
                                        public void operationComplete(ChannelFuture future) throws Exception {
                                            //写操作完成,并没有错误发生
                                            if(future.isSuccess()){
                                                
                                                kqptEmployeeService.delBySbsn(serialNumber,empNo);
                                                //命令执行成功
                                                if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                    KqptCommand kqptCommand = new KqptCommand();
                                                    kqptCommand.setId(commandId);
                                                    kqptCommand.setStatus("2");
                                                    kqptCommand.setResult("success");
                                                    kqptCommand.setLastTime(DateUtil.getNow());
                                                    kqptCommandService.update(kqptCommand);
                                                    commandId = null;
                                                }
                                                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("GetDeviceInfo()"
                                                        ).toString().toString(), Charset.forName("GBK"));
                                                ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                            }else{
                                                //命令执行失败
                                                if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                    KqptCommand kqptCommand = new KqptCommand();
                                                    kqptCommand.setId(commandId);
                                                    kqptCommand.setStatus("3");
                                                    kqptCommand.setResult("fail");
                                                    kqptCommand.setLastTime(DateUtil.getNow());
                                                    kqptCommandService.update(kqptCommand);
                                                    commandId = null;
                                                }
                                            }
                                        }
                                    });
                                }else if(list.get(0).getComNr().startsWith("GetEmployeeID")){
                                    GetEmployeeIDserialNumber = serialNumber;
                                    ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append(list.get(0).getComNr()
                                            ).toString().toString(), Charset.forName("GBK"));
                                    ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                  //添加ChannelFutureListener以便在写操作完成后接收通知
                                    cf.addListener(new ChannelFutureListener() {
                                        @Override
                                        public void operationComplete(ChannelFuture future) throws Exception {
                                            //写操作完成,并没有错误发生
                                            if(future.isSuccess()){
                                                //命令执行成功
                                                if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                    KqptCommand kqptCommand = new KqptCommand();
                                                    kqptCommand.setId(commandId);
                                                    kqptCommand.setStatus("2");
                                                    kqptCommand.setResult("success");
                                                    kqptCommand.setLastTime(DateUtil.getNow());
                                                    kqptCommandService.update(kqptCommand);
                                                    commandId = null;
                                                }
                                            }else{
                                                //命令执行失败
                                                if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                    KqptCommand kqptCommand = new KqptCommand();
                                                    kqptCommand.setId(commandId);
                                                    kqptCommand.setStatus("3");
                                                    kqptCommand.setResult("fail");
                                                    kqptCommand.setLastTime(DateUtil.getNow());
                                                    kqptCommandService.update(kqptCommand);
                                                    commandId = null;
                                                }
                                            }
                                        }
                                    });
                                }else{
                                
                                    ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append(list.get(0).getComNr()
                                            ).toString().toString(), Charset.forName("GBK"));
                                    ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                    //添加ChannelFutureListener以便在写操作完成后接收通知
                                    cf.addListener(new ChannelFutureListener() {
                                        @Override
                                        public void operationComplete(ChannelFuture future) throws Exception {
                                            //写操作完成,并没有错误发生
                                            if(future.isSuccess()){
                                                //命令执行成功
                                                if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                    KqptCommand kqptCommand = new KqptCommand();
                                                    kqptCommand.setId(commandId);
                                                    kqptCommand.setStatus("2");
                                                    kqptCommand.setResult("success");
                                                    kqptCommand.setLastTime(DateUtil.getNow());
                                                    kqptCommandService.update(kqptCommand);
                                                    commandId = null;
                                                }
                                            }else{
                                                //命令执行失败
                                                if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                    KqptCommand kqptCommand = new KqptCommand();
                                                    kqptCommand.setId(commandId);
                                                    kqptCommand.setStatus("3");
                                                    kqptCommand.setResult("fail");
                                                    kqptCommand.setLastTime(DateUtil.getNow());
                                                    kqptCommandService.update(kqptCommand);
                                                    commandId = null;
                                                }
                                            }
                                        }
                                    });
                                }
                        }else{
                            ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                                    ).toString().toString(), Charset.forName("GBK"));
                            //数据冲刷
                            ctx.channel().writeAndFlush(buf);
                        }
                    }catch (Exception e) {
                    }
                }else{
                    ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                            ).toString().toString(), Charset.forName("GBK"));
                    //数据冲刷
                    ctx.channel().writeAndFlush(buf);
                }
            }
            //保存考勤记录到数据库
            else if(msgStr.startsWith("Return(result=\"success\" dev_id=\"")){
                //暂存时间用于数据上传时间
                String nowTime = DateUtil.getNow();
                if(Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "total"))>0){
                    // 提取单条考勤记录
                    List<KqptKqrecords> kqptKqrecordsList = new LinkedList<KqptKqrecords>();
                    Pattern p = Pattern.compile("\\b(time=.+\\R(?:photo=\"[^\"]+\")*)");                
                    Matcher m = p.matcher(msgStr);
                    String devId = FaceId_Item.GetKeyValue(msgStr, "dev_id");   
                    while(m.find())
                    {                  
                        KqptKqrecords kqptKqrecords = new KqptKqrecords();
                        kqptKqrecords.setSbSn(devId);
                        kqptKqrecords.setEmpNo(FaceId_Item.GetKeyValue(m.group(1), "id"));
                        kqptKqrecords.setKqTime(FaceId_Item.GetKeyValue(m.group(1), "time"));
                        kqptKqrecords.setInTime(nowTime);
                        kqptKqrecords.setTbFlag("0");
                        kqptKqrecordsList.add(kqptKqrecords);
                    }  
                    kqptKqrecordsService.batchSave(kqptKqrecordsList);
                    System.out.println("保存考勤信息成功");
                }
                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                        ).toString().toString(), Charset.forName("GBK"));
                //数据冲刷
                ctx.channel().writeAndFlush(buf);
            }
            //获取设备发起GetEmployeeID()指令
            else if(msgStr.startsWith("Return(result=\"success\" total=\"")){
                if(Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "total"))>0){
                    FaceId_Item[] ItemCollection = FaceId_Item.GetAllItems(msgStr);
                    if (ItemCollection != null) 
                    {  
                        for (FaceId_Item item : ItemCollection) 
                        { 
                            if (item.name.equals("id"))
                            {   
                                KqptCommand command = new KqptCommand();
                                command.setSbSn(GetEmployeeIDserialNumber);
                                command.setComNr(new StringBuilder().append("GetEmployee(id=\"").append(item.value).append("\")").toString());
                                command.setStatus("0");
                                command.setCreateTime(DateUtil.getNow());
                                kqptCommandService.save(command);
                            }  
                        }
                    }
                }else{
                
                    ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                            ).toString().toString(), Charset.forName("GBK"));
                    //数据冲刷
                    ctx.channel().writeAndFlush(buf);
                }
            }
            else if(msgStr.startsWith("Return(result=\"success\" id=\"")){
                kqptEmployeeService.delBySbsn(serialNumber,FaceId_Item.GetKeyValue(msgStr, "id"));
                //处理从考勤机获取到的人员信息
                KqptEmployee employee =  new KqptEmployee();
                employee.setEmpNo(FaceId_Item.GetKeyValue(msgStr, "id"));
                employee.setSbSn(FaceId_Item.GetKeyValue(msgStr, "sn"));
                employee.setName(FaceId_Item.GetKeyValue(msgStr, "name"));
                employee.setAlgEdition(FaceId_Item.GetKeyValue(msgStr, "alg_edition"));
                employee.setCheckType(FaceId_Item.GetKeyValue(msgStr, "check_type"));
                employee.setFaceData(StringUtils.subString(msgStr, "face_data"));
                employee.setInTime(DateUtil.getNow());
                kqptEmployeeService.save(employee);
                System.out.println("人员信息保存成功"+FaceId_Item.GetKeyValue(msgStr, "name"));
                
                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                        ).toString().toString(), Charset.forName("GBK"));
                //数据冲刷
                ctx.channel().writeAndFlush(buf);
            }
            else if(msgStr.startsWith("Return(result=\"failed\"")){   
                //命令执行失败
                if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                    KqptCommand kqptCommand = new KqptCommand();
                    kqptCommand.setId(commandId);
                    kqptCommand.setStatus("3");
                    kqptCommand.setResult("fail");
                    kqptCommand.setLastTime(DateUtil.getNow());
                    kqptCommandService.update(kqptCommand);
                    commandId = null;
                }
                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                        ).toString().toString(), Charset.forName("GBK"));
                //数据冲刷
                ctx.channel().writeAndFlush(buf);
            }
            else if(msgStr.startsWith("Quit")){   // 结束会话
                
            }
        }
        catch (Exception ex)
        {
            //命令执行失败
            if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                KqptCommand kqptCommand = new KqptCommand();
                kqptCommand.setId(commandId);
                kqptCommand.setStatus("3");
                kqptCommand.setResult("fail");
                kqptCommand.setLastTime(DateUtil.getNow());
                commandId = null;
                kqptCommandService.update(kqptCommand);
            }
        }
        }
     
        /**
         * 活跃的、有效的通道
         * 第一次连接成功后进入的方法
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            System.out.println("tcp client " + getRemoteAddress(ctx) + " connect success");
            //往channel map中添加channel信息
            NettyTCPServer.map.put(getIPString(ctx), ctx.channel());
        }
     
        /**
         * 不活动的通道
         * 连接丢失后执行的方法(client端可据此实现断线重连)
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            //删除Channel Map中的失效Client
            NettyTCPServer.map.remove(getIPString(ctx));
            ctx.close();
        }
     
        /**
         * 异常处理
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
            //发生异常,关闭连接
            System.out.println("引擎 {} 的通道发生异常,即将断开连接");
            ctx.close();//再次建议close
        }
     
        /**
         * 心跳机制,超时处理
         *
         * @param ctx
         * @param evt
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            String socketString = ctx.channel().remoteAddress().toString();
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) {
                    System.out.println("Client: " + socketString + " READER_IDLE 读超时");
                    ctx.disconnect();//断开
                } else if (event.state() == IdleState.WRITER_IDLE) {
                    System.out.println("Client: " + socketString + " WRITER_IDLE 写超时");
                    ctx.disconnect();
                } else if (event.state() == IdleState.ALL_IDLE) {
                    System.out.println("Client: " + socketString + " ALL_IDLE 总超时");
                    ctx.disconnect();
                }
            }
        }
     
        /**
         * 获取client对象:ip+port
         *
         * @param ctx
         * @return
         */
        public String getRemoteAddress(ChannelHandlerContext ctx) {
            String socketString = "";
            socketString = ctx.channel().remoteAddress().toString();
            return socketString;
        }
     
        /**
         * 获取client的ip
         *
         * @param ctx
         * @return
         */
        public String getIPString(ChannelHandlerContext ctx) {
            String ipString = "";
            String socketString = ctx.channel().remoteAddress().toString();
            int colonAt = socketString.indexOf(":");
            ipString = socketString.substring(1, colonAt);
            return ipString;
        }
     
    }
    
    • TCPServer通道初始化类
    package com.hanwang.config;
    
    import java.nio.charset.Charset;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
     
    /**
     * description: 通道初始化,主要用于设置各种Handler
     **/
    @Component
    public class TCPServerChannelInitializer extends ChannelInitializer<SocketChannel> {
     
        @Autowired
        TCPServerChannelHandler serverChannelHandler;
     
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            ByteBuf delimiter = Unpooled.copiedBuffer(")".getBytes());
            pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,false,delimiter));
            pipeline.addLast(
                    new StringDecoder(Charset.forName("GBK")),
                    new StringEncoder(Charset.forName("GBK"))
                    );
            //字符串编解码器
            //自定义Handler
            pipeline.addLast("serverChannelHandler", serverChannelHandler);
        }
    }
    
    
    • application.properties
      #禁用jmx
    spring.jmx.enabled=false
    #DB
    spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
    spring.datasource.url=
    spring.datasource.username=
    spring.datasource.password=
    spring.datasource.type=com.zaxxer.hikari.HikariDataSource
    spring.datasource.hikari.minimum-idle=15
    spring.datasource.hikari.maximum-pool-size=100
    spring.datasource.hikari.auto-commit=true
    spring.datasource.hikari.idle-timeout=30000
    spring.datasource.hikari.pool-name=DatebookHikariCP
    spring.datasource.hikari.max-lifetime=18000000
    spring.datasource.hikari.connection-timeout=30000
    spring.datasource.hikari.connection-test-query=SELECT 1
    
    #mapper
    mybatis.mapper-locations: classpath:mapper/*.xml
    #sql_log
    logging.level.com.hanwang.mapper=debug
    
    # UDP服务端端口
    port=9924
    # TCP服务端端口
    netty.tcp.server.port=9922
    

    写在最后

    目前实现功能有,同步考勤,人脸信息,定时拉取考勤信息,定时拉取设备人员信息等。如有相同需求,欢迎一起探讨。项目目前停留版本时间为2019.04版本。

    相关文章

      网友评论

          本文标题:使用Netty实现汉王考勤机云平台

          本文链接:https://www.haomeiwen.com/subject/hnnhectx.html