开发场景
由于公司需要使用汉王考勤机,并需要将考勤内容信息存储至数据库,并提供接口给其他系统平台使用。
汉王官方提供的JavaApi为多线程+io流处理,考勤机数量多时会出现数据混乱现象,所以有此想法改为Netty实现。
开发所用技术
SpringBoot2.1.4+netty+mysql5.7+Quartz+Freemarker(代码生成器使用)
开发实现
由于汉王考勤机相当于客户端,所以后台只需要实现服务端即可。一切代码遵循汉王官网文档的协议内容及传输接收方式。
UDP协议
- UDPServer类
package com.hanwang.config;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.concurrent.Future;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Spring-managed Netty UDP server.
 *
 * <p>Binds a {@link NioDatagramChannel} on the port configured by the {@code port} property and
 * hands every channel to the injected {@link UDPServerChannelInitializer}.
 */
@Component
public class NettyUDPServer {
    // One event-loop thread, shared by every instance because the field is static.
    private static final EventLoopGroup group = new NioEventLoopGroup(1);

    @Autowired
    UDPServerChannelInitializer serverChannelInitializer;

    @Value("${port}")
    private int port;

    // The bound server channel; stays null until start() succeeds.
    private Channel channel;

    /**
     * Binds the UDP port and waits for the bind to complete.
     *
     * @return the completed bind future
     * @throws InterruptedException if the calling thread is interrupted while waiting for the bind
     */
    public ChannelFuture start() throws InterruptedException {
        Bootstrap serverBootstrap = new Bootstrap();
        serverBootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .handler(serverChannelInitializer);

        ChannelFuture bindFuture = serverBootstrap.bind(port).sync();
        if (bindFuture.isSuccess()) {
            // The original message printed a literal "{}" placeholder; print the real port instead.
            System.out.println("[UDP] server start success, port = " + port);
            channel = bindFuture.channel();
        } else {
            System.err.println("[UDP] server failed to bind port " + port);
            // cause() may be null for a non-failed future, so guard before dereferencing it.
            if (bindFuture.cause() != null) {
                bindFuture.cause().printStackTrace();
            }
        }
        return bindFuture;
    }

    /**
     * Closes the server channel and shuts the event loop down before the bean is destroyed.
     *
     * <p>The event-loop shutdown is always requested, even when the thread is interrupted while
     * waiting for the channel to close; the interrupt flag is restored before returning.
     */
    @PreDestroy
    public void destroy() {
        boolean interrupted = false;
        try {
            if (channel != null) {
                ChannelFuture closeFuture = channel.close().await();
                if (!closeFuture.isSuccess()) {
                    System.err.println("[UDP] failed to close server channel: " + closeFuture.cause());
                }
            }
        } catch (InterruptedException e) {
            interrupted = true;
        }

        Future<?> shutdownFuture = group.shutdownGracefully();
        if (!interrupted) {
            try {
                shutdownFuture.await();
                if (!shutdownFuture.isSuccess()) {
                    System.err.println("[UDP] event loop shutdown failed: " + shutdownFuture.cause());
                }
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }

        if (interrupted) {
            // Preserve the interrupt for whoever is driving the container shutdown.
            Thread.currentThread().interrupt();
        }
    }
}
- UDPServer启动类
package com.hanwang.config;
import io.netty.channel.ChannelFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import com.hanwang.service.KqptCommandService;
import com.hanwang.service.KqptDevicesService;
/**
 * Starts the UDP server once the Spring Boot application has finished starting.
 */
@Component
public class NettyUDPServerRun implements ApplicationRunner {
    @Autowired
    NettyUDPServer nettyUdpServer;

    // Only used for diagnostics in this class.
    @Value("${port}")
    private int PORT;

    // Injected but not used by this class.
    @Autowired
    private KqptDevicesService kqptDevicesService;

    // Injected but not used by this class.
    @Autowired
    private KqptCommandService commandService;

    /**
     * Starts the UDP server and returns immediately.
     *
     * <p>The method deliberately does not block on the channel's close future, so the runner
     * returns as soon as the port is bound. A startup failure is reported instead of being
     * silently swallowed; the application itself keeps running, as before.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        try {
            nettyUdpServer.start();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("[UDP] server startup was interrupted, port = " + PORT);
        } catch (Exception e) {
            System.err.println("[UDP] server failed to start, port = " + PORT);
            e.printStackTrace();
        }
    }
}
- UDPServer通道处理类
package com.hanwang.config;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.hanwang.entity.KqptCommand;
import com.hanwang.entity.KqptDevices;
import com.hanwang.service.KqptCommandService;
import com.hanwang.service.KqptDevicesService;
import com.hanwang.splash.FaceId_Item;
import com.hanwang.utils.DateUtil;
/**
* description: 通道数据输入的处理
**/
/**
 * Handles inbound UDP datagrams sent by the attendance devices.
 *
 * <p>For every datagram whose {@code sn} value matches a known device, the device is marked with
 * status {@code "00"} and its last-seen time is refreshed. If commands are pending for that
 * device, {@code PostRequest()} is sent back to the sender.
 *
 * <p>NOTE(review): the service calls look like synchronous database access executed on the
 * event loop thread - confirm, and consider offloading if they can be slow.
 */
@Component
@ChannelHandler.Sharable
public class UDPServerChannelInboundHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    // Charset used for the reply sent to the device.
    private static final Charset GBK = Charset.forName("GBK");
    // Status value written for a device that has just been heard from.
    private static final String STATUS_SEEN = "00";

    @Autowired
    private KqptDevicesService kqptDevicesService;
    @Autowired
    private KqptCommandService commandService;

    /**
     * Processes one datagram.
     *
     * @param channelHandlerContext context used to send the reply
     * @param datagramPacket the received datagram; its content is decoded as UTF-8
     * @throws Exception if a service call fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
        // Decode once and reuse for both logging and parsing.
        String message = datagramPacket.content().toString(CharsetUtil.UTF_8);
        System.out.println("[UDP] server 收到的消息:" + message);

        String sbsn = FaceId_Item.GetKeyValue(message, "sn");
        KqptDevices kqptDevices = kqptDevicesService.selectBySbsn(sbsn);
        if (kqptDevices == null) {
            // Unknown device: nothing to update and nothing to send.
            return;
        }

        String now = DateUtil.getNow();
        // Compare by value; the original used ==/!= on strings, which compares references.
        if (!STATUS_SEEN.equals(kqptDevices.getStatus())) {
            kqptDevices.setStatus(STATUS_SEEN);
        }
        kqptDevices.setLastTime(now);
        kqptDevicesService.update(kqptDevices);

        Map<String, Object> query = new HashMap<String, Object>();
        query.put("sbSn", sbsn);
        query.put("nowTime", now);
        List<KqptCommand> pending = commandService.selectWzxByComidOrSbsn(query);
        if (pending != null && !pending.isEmpty()) {
            // Tell the device that commands are waiting for it.
            DatagramPacket reply = new DatagramPacket(
                    Unpooled.copiedBuffer("PostRequest()", GBK), datagramPacket.sender());
            channelHandlerContext.channel().writeAndFlush(reply);
        }
    }

    /**
     * Delegates to the superclass implementation.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
}
- UDPServer通道初始化,用于处理字符串编码
package com.hanwang.config;
import java.nio.charset.Charset;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* description: 通道初始化,主要用于设置各种Handler
**/
/**
 * Builds the handler pipeline for the UDP server channel.
 *
 * <p>Handlers are appended in this order: a default-charset string decoder, a UTF-8 string
 * decoder, a GBK string encoder, and finally the injected inbound handler registered under the
 * name {@code serverChannelHandler}.
 *
 * <p>NOTE(review): the inbound handler consumes {@code DatagramPacket} objects directly, so it is
 * unclear whether the string codecs ever see a message on this channel - confirm before removing.
 */
@Component
public class UDPServerChannelInitializer extends ChannelInitializer<NioDatagramChannel> {
    @Autowired
    UDPServerChannelInboundHandler serverChannelHandler;

    /**
     * Appends the codecs and the business handler to the new channel's pipeline.
     *
     * @param nioDatagramChannel the channel being initialised
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
        nioDatagramChannel.pipeline()
                .addLast(new StringDecoder())
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new StringEncoder(Charset.forName("GBK")))
                .addLast("serverChannelHandler", serverChannelHandler);
    }
}
TCP协议
- TCPServer类
package com.hanwang.config;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Spring-managed Netty TCP server that accepts connections from the attendance devices.
 */
@Component
public class NettyTCPServer {
    // Accept loop: a single thread handles incoming connections.
    private EventLoopGroup boss = new NioEventLoopGroup(1);
    // I/O loop for accepted connections; uses Netty's default thread count.
    private EventLoopGroup worker = new NioEventLoopGroup();

    @Autowired
    TCPServerChannelInitializer serverChannelInitializer;

    @Value("${netty.tcp.server.port}")
    private Integer port;

    // The bound server (listening) channel; stays null until start() succeeds.
    private Channel channel;

    /**
     * Client channels, filled and cleaned by the channel handler.
     * key: client IP, value: the client's channel.
     */
    public static Map<String, Channel> map = new ConcurrentHashMap<String, Channel>();

    /**
     * Binds the TCP port and waits, uninterruptibly, for the bind to complete.
     *
     * @return the completed bind future
     */
    public ChannelFuture start() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(serverChannelInitializer)
                // Backlog of pending connections on the listening socket.
                .option(ChannelOption.SO_BACKLOG, 1024)
                // TCP keep-alive on every accepted connection.
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture bindFuture = serverBootstrap.bind(port).syncUninterruptibly();
        if (bindFuture.isSuccess()) {
            channel = bindFuture.channel();
            System.out.println("[TCP] server start success, port = " + port);
        } else {
            // The original left this branch empty, hiding a failed bind.
            System.err.println("[TCP] server failed to bind port " + port + ": " + bindFuture.cause());
        }
        return bindFuture;
    }

    /**
     * Closes the listening channel and shuts both event loops down before the bean is destroyed.
     *
     * <p>Both shutdowns are always requested, even if the thread is interrupted while waiting
     * for the first one; the interrupt flag is restored before returning.
     */
    @PreDestroy
    public void destroy() {
        if (channel != null) {
            channel.close();
        }

        // Request both shutdowns up front so an interrupt cannot leave one loop running.
        Future<?> workerShutdown = worker.shutdownGracefully();
        Future<?> bossShutdown = boss.shutdownGracefully();
        try {
            workerShutdown.await();
            if (!workerShutdown.isSuccess()) {
                System.err.println("[TCP] worker event loop shutdown failed: " + workerShutdown.cause());
            }
            bossShutdown.await();
            if (!bossShutdown.isSuccess()) {
                System.err.println("[TCP] boss event loop shutdown failed: " + bossShutdown.cause());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("[TCP] interrupted while waiting for event loops to shut down");
        }
    }
}
- TCPServer启动类
package com.hanwang.config;
import io.netty.channel.ChannelFuture;
import java.io.IOException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
 * Launches the TCP server after the Spring Boot application has started.
 */
@Component
public class NettyTCPServerRun implements ApplicationRunner {
    @Autowired
    NettyTCPServer nettyTcpServer;

    /**
     * Starts the TCP server and returns without waiting for the channel to close.
     * Any exception raised during startup is printed and not rethrown.
     *
     * @param args application arguments (unused)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    @Override
    public void run(ApplicationArguments args) throws IOException {
        try {
            ChannelFuture bindResult = nettyTcpServer.start();
            bindResult.channel();
        } catch (Exception startupFailure) {
            startupFailure.printStackTrace();
        }
    }
}
- TCPServer通道类
package com.hanwang.config;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.hanwang.entity.KqptCommand;
import com.hanwang.entity.KqptDevices;
import com.hanwang.entity.KqptEmployee;
import com.hanwang.entity.KqptKqrecords;
import com.hanwang.service.KqptCommandService;
import com.hanwang.service.KqptDevicesService;
import com.hanwang.service.KqptEmployeeService;
import com.hanwang.service.KqptKqrecordsService;
import com.hanwang.splash.FaceId_Item;
import com.hanwang.utils.DateUtil;
import com.hanwang.utils.StringUtils;
/**
 * Handles the text protocol spoken by the attendance devices over TCP.
 *
 * <p>The handler is a shared singleton ({@code @Sharable}), so it must not keep per-device state
 * in its own fields. State that has to survive from one message to the next on the same
 * connection (device serial number, the id of the command in flight) is kept in a
 * {@link Session} object attached to the channel. Values needed by write-completion listeners
 * are captured as final locals at the time the command is sent.
 *
 * <p>NOTE(review): this assumes a command and its reply travel over the same TCP connection -
 * confirm against the device protocol documentation.
 *
 * <p>NOTE(review): the service calls look like synchronous database access executed on the
 * event loop thread - confirm, and consider offloading if they can be slow.
 */
@Component
@ChannelHandler.Sharable
public class TCPServerChannelHandler extends SimpleChannelInboundHandler<String> {
    // Charset of every message written to the device.
    private static final Charset GBK = Charset.forName("GBK");
    // Extracts one attendance record (a "time=..." line plus optional photo lines) per match.
    private static final Pattern RECORD_PATTERN = Pattern.compile("\\b(time=.+\\R(?:photo=\"[^\"]+\")*)");
    // Channel attribute under which the per-connection session is stored.
    private static final AttributeKey<Session> SESSION = AttributeKey.valueOf("TCPServerChannelHandler.session");

    private static final String REPLY_SUCCESS = "Return(result=\"success\")";
    private static final String REPLY_QUIT = "Quit()";
    private static final String REQUEST_DEVICE_INFO = "GetDeviceInfo()";

    // Command status values written to the command table.
    private static final String STATUS_SUCCESS = "2";
    private static final String STATUS_FAILED = "3";

    @Autowired
    private KqptKqrecordsService kqptKqrecordsService;
    @Autowired
    private KqptCommandService kqptCommandService;
    @Autowired
    private KqptEmployeeService kqptEmployeeService;
    @Autowired
    private KqptDevicesService kqptDevicesService;

    /**
     * Per-connection protocol state. Only touched from handler callbacks and write listeners of
     * the owning channel.
     */
    private static final class Session {
        // Serial number taken from the last GetRequest on this connection.
        String serialNumber;
        // Serial number recorded when a GetEmployeeID command was sent on this connection.
        String employeeIdSerialNumber;
        // Id of the command currently in flight, or null when there is none.
        Integer commandId;
    }

    /**
     * Handles one decoded message from a device.
     *
     * <p>If processing throws, the failure is reported and the command in flight on this
     * connection (if any) is marked as failed.
     *
     * @param ctx channel context of the sending device
     * @param msgStr the decoded message
     * @throws Exception if marking the command as failed itself fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msgStr) throws Exception {
        System.out.println("Netty tcp server receive msg : " + msgStr);
        Session session = sessionOf(ctx);
        try {
            dispatch(ctx, session, msgStr);
        } catch (Exception ex) {
            System.err.println("Netty tcp server failed to process msg from " + getRemoteAddress(ctx));
            ex.printStackTrace();
            completeCommand(session, session.commandId, STATUS_FAILED, "fail");
        }
    }

    /** Routes a message to the matching handler based on its prefix. */
    private void dispatch(ChannelHandlerContext ctx, Session session, String msgStr) {
        if (msgStr.startsWith("PostRecord")) {
            // Ready to receive attendance records, without photos.
            send(ctx, "Return(result=\"success\" postphoto=\"false\")");
        } else if (msgStr.startsWith("Record")
                || msgStr.startsWith("PostEmployee")
                || msgStr.startsWith("Employee")) {
            send(ctx, REPLY_SUCCESS);
        } else if (msgStr.startsWith("Return(result=\"success\" time=")) {
            saveDeviceInfo(msgStr);
        } else if (msgStr.startsWith("GetRequest")) {
            handleGetRequest(ctx, session, msgStr);
        } else if (msgStr.startsWith("Return(result=\"success\" dev_id=\"")) {
            saveAttendanceRecords(ctx, msgStr);
        } else if (msgStr.startsWith("Return(result=\"success\" total=\"")) {
            queueEmployeeFetches(ctx, session, msgStr);
        } else if (msgStr.startsWith("Return(result=\"success\" id=\"")) {
            saveEmployee(ctx, session, msgStr);
        } else if (msgStr.startsWith("Return(result=\"failed\"")) {
            completeCommand(session, session.commandId, STATUS_FAILED, "fail");
            send(ctx, REPLY_QUIT);
        }
        // "Quit" (end of session) and unrecognised messages need no reply.
    }

    /** Stores the device information carried by a reply to GetDeviceInfo(). */
    private void saveDeviceInfo(String msgStr) {
        KqptDevices kqptDevices = new KqptDevices();
        kqptDevices.setSbSn(FaceId_Item.GetKeyValue(msgStr, "dev_id"));
        kqptDevices.setMsg(msgStr);
        kqptDevices.setZcNum(Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "real_faceregist")));
        kqptDevices.setSbNum(Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "max_faceregist")));
        kqptDevicesService.update(kqptDevices);
    }

    /** Looks up the pending commands for the requesting device and sends the first one, or Quit(). */
    private void handleGetRequest(ChannelHandlerContext ctx, Session session, String msgStr) {
        session.serialNumber = FaceId_Item.GetKeyValue(msgStr, "sn");
        Map<String, Object> query = new HashMap<String, Object>();
        query.put("sbSn", session.serialNumber);
        query.put("nowTime", DateUtil.getNow());
        List<KqptCommand> pending = kqptCommandService.selectWzxByComidOrSbsn(query);
        if (pending == null || pending.isEmpty()) {
            send(ctx, REPLY_QUIT);
            return;
        }
        try {
            dispatchCommand(ctx, session, pending.get(0));
        } catch (Exception e) {
            // Best effort, as before: the command is left untouched, but the failure is now visible.
            System.err.println("Netty tcp server failed to send command to " + session.serialNumber);
            e.printStackTrace();
        }
    }

    /** Sends one pending command to the device and registers the matching completion handling. */
    private void dispatchCommand(final ChannelHandlerContext ctx, final Session session, KqptCommand command) {
        final Integer commandId = command.getId();
        final String comNr = command.getComNr();
        session.commandId = commandId;
        if (commandId == null || !StringUtils.isNotEmpty(comNr)) {
            send(ctx, REPLY_QUIT);
            return;
        }

        // Captured now so the listener cannot observe values from a later message.
        final String serialNumber = session.serialNumber;

        if (comNr.startsWith("AddNameTable")) {
            final String empNo = StringUtils.subString(comNr, "(", "=");
            final String name = StringUtils.subString(comNr, "=\"", "\")");
            sendCommand(ctx, session, commandId, comNr, new Runnable() {
                @Override
                public void run() {
                    // Replace any existing row for this employee on this device.
                    kqptEmployeeService.delBySbsn(serialNumber, empNo);
                    KqptEmployee emp = new KqptEmployee();
                    emp.setEmpNo(empNo);
                    emp.setSbSn(serialNumber);
                    emp.setName(name);
                    emp.setInTime(DateUtil.getNow());
                    kqptEmployeeService.save(emp);
                }
            }, true);
        } else if (comNr.startsWith("DeleteEmployee")) {
            final String empNo = FaceId_Item.GetKeyValue(comNr, "id");
            sendCommand(ctx, session, commandId, comNr, new Runnable() {
                @Override
                public void run() {
                    kqptEmployeeService.delBySbsn(serialNumber, empNo);
                }
            }, true);
        } else {
            if (comNr.startsWith("GetEmployeeID")) {
                // Remembered for the "total=" reply that lists the employee ids.
                session.employeeIdSerialNumber = serialNumber;
            }
            sendCommand(ctx, session, commandId, comNr, null, false);
        }
    }

    /**
     * Writes a command and records its outcome once the write completes.
     *
     * <p>On a successful write: runs {@code persistOnSuccess} (if any), marks the command as
     * successful, then optionally asks the device for its information. On a failed write the
     * command is marked as failed.
     */
    private void sendCommand(final ChannelHandlerContext ctx, final Session session, final Integer commandId,
            String comNr, final Runnable persistOnSuccess, final boolean requestDeviceInfo) {
        send(ctx, comNr).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    completeCommand(session, commandId, STATUS_FAILED, "fail");
                    return;
                }
                if (persistOnSuccess != null) {
                    persistOnSuccess.run();
                }
                completeCommand(session, commandId, STATUS_SUCCESS, "success");
                if (requestDeviceInfo) {
                    send(ctx, REQUEST_DEVICE_INFO);
                }
            }
        });
    }

    /** Parses the attendance records in the message, stores them, and ends the session. */
    private void saveAttendanceRecords(ChannelHandlerContext ctx, String msgStr) {
        // One timestamp for the whole upload.
        String nowTime = DateUtil.getNow();
        if (Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "total")) > 0) {
            String devId = FaceId_Item.GetKeyValue(msgStr, "dev_id");
            List<KqptKqrecords> records = new LinkedList<KqptKqrecords>();
            Matcher m = RECORD_PATTERN.matcher(msgStr);
            while (m.find()) {
                String record = m.group(1);
                KqptKqrecords kqptKqrecords = new KqptKqrecords();
                kqptKqrecords.setSbSn(devId);
                kqptKqrecords.setEmpNo(FaceId_Item.GetKeyValue(record, "id"));
                kqptKqrecords.setKqTime(FaceId_Item.GetKeyValue(record, "time"));
                kqptKqrecords.setInTime(nowTime);
                kqptKqrecords.setTbFlag("0");
                records.add(kqptKqrecords);
            }
            // Skip the batch insert when nothing matched instead of passing an empty list.
            if (!records.isEmpty()) {
                kqptKqrecordsService.batchSave(records);
                System.out.println("保存考勤信息成功");
            }
        }
        send(ctx, REPLY_QUIT);
    }

    /** Queues one GetEmployee command per employee id listed in a GetEmployeeID() reply. */
    private void queueEmployeeFetches(ChannelHandlerContext ctx, Session session, String msgStr) {
        if (Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "total")) <= 0) {
            send(ctx, REPLY_QUIT);
            return;
        }
        FaceId_Item[] items = FaceId_Item.GetAllItems(msgStr);
        if (items == null) {
            return;
        }
        for (FaceId_Item item : items) {
            if ("id".equals(item.name)) {
                KqptCommand command = new KqptCommand();
                command.setSbSn(session.employeeIdSerialNumber);
                command.setComNr(new StringBuilder().append("GetEmployee(id=\"").append(item.value).append("\")").toString());
                command.setStatus("0");
                command.setCreateTime(DateUtil.getNow());
                kqptCommandService.save(command);
            }
        }
    }

    /** Stores the employee carried by a GetEmployee reply and ends the session. */
    private void saveEmployee(ChannelHandlerContext ctx, Session session, String msgStr) {
        String empNo = FaceId_Item.GetKeyValue(msgStr, "id");
        String empName = FaceId_Item.GetKeyValue(msgStr, "name");
        // Replace any existing row for this employee on this device.
        kqptEmployeeService.delBySbsn(session.serialNumber, empNo);
        KqptEmployee employee = new KqptEmployee();
        employee.setEmpNo(empNo);
        employee.setSbSn(FaceId_Item.GetKeyValue(msgStr, "sn"));
        employee.setName(empName);
        employee.setAlgEdition(FaceId_Item.GetKeyValue(msgStr, "alg_edition"));
        employee.setCheckType(FaceId_Item.GetKeyValue(msgStr, "check_type"));
        employee.setFaceData(StringUtils.subString(msgStr, "face_data"));
        employee.setInTime(DateUtil.getNow());
        kqptEmployeeService.save(employee);
        System.out.println("人员信息保存成功" + empName);
        send(ctx, REPLY_QUIT);
    }

    /**
     * Writes the final status of a command. Does nothing for a null or zero id. The session's
     * in-flight id is cleared first, so a failing update cannot leave a stale id behind.
     */
    private void completeCommand(Session session, Integer id, String status, String result) {
        if (id == null || id == 0) {
            return;
        }
        if (id.equals(session.commandId)) {
            session.commandId = null;
        }
        KqptCommand kqptCommand = new KqptCommand();
        kqptCommand.setId(id);
        kqptCommand.setStatus(status);
        kqptCommand.setResult(result);
        kqptCommand.setLastTime(DateUtil.getNow());
        kqptCommandService.update(kqptCommand);
    }

    /** Encodes the text as GBK and writes it to the device. */
    private static ChannelFuture send(ChannelHandlerContext ctx, String text) {
        ByteBuf buf = Unpooled.copiedBuffer(text, GBK);
        return ctx.channel().writeAndFlush(buf);
    }

    /** Returns the session attached to the channel, creating it on first use. */
    private static Session sessionOf(ChannelHandlerContext ctx) {
        Session session = ctx.channel().attr(SESSION).get();
        if (session == null) {
            session = new Session();
            ctx.channel().attr(SESSION).set(session);
        }
        return session;
    }

    /**
     * Called when a connection becomes active; registers the channel in the server's client map.
     *
     * @param ctx channel context of the new connection
     * @throws Exception if the superclass implementation fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("tcp client " + getRemoteAddress(ctx) + " connect success");
        NettyTCPServer.map.put(getIPString(ctx), ctx.channel());
    }

    /**
     * Called when a connection is lost; removes the channel from the server's client map.
     *
     * @param ctx channel context of the lost connection
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Remove only if the entry still points at this channel, so a newer connection from the
        // same IP is not evicted by the old connection going away.
        NettyTCPServer.map.remove(getIPString(ctx), ctx.channel());
        ctx.close();
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx channel context on which the exception occurred
     * @param cause the exception
     * @throws Exception if the superclass implementation fails
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        // The original message printed a literal "{}" placeholder; print the peer address instead.
        System.out.println("引擎 " + getRemoteAddress(ctx) + " 的通道发生异常,即将断开连接: " + cause);
        ctx.close();
    }

    /**
     * Disconnects the client on any idle-state event; other events are passed on.
     *
     * @param ctx channel context
     * @param evt the user event
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        String socketString = getRemoteAddress(ctx);
        IdleState state = ((IdleStateEvent) evt).state();
        if (state == IdleState.READER_IDLE) {
            System.out.println("Client: " + socketString + " READER_IDLE 读超时");
            ctx.disconnect();
        } else if (state == IdleState.WRITER_IDLE) {
            System.out.println("Client: " + socketString + " WRITER_IDLE 写超时");
            ctx.disconnect();
        } else if (state == IdleState.ALL_IDLE) {
            System.out.println("Client: " + socketString + " ALL_IDLE 总超时");
            ctx.disconnect();
        }
    }

    /**
     * Returns the textual form of the client's remote address (IP and port).
     *
     * @param ctx channel context
     * @return the remote address as text; {@code "null"} if the channel has no remote address
     */
    public String getRemoteAddress(ChannelHandlerContext ctx) {
        return String.valueOf(ctx.channel().remoteAddress());
    }

    /**
     * Returns the client's IP address.
     *
     * <p>Uses the address object rather than cutting the textual form at the first colon, so
     * IPv6 peers and addresses that carry a host name prefix are handled as well.
     *
     * @param ctx channel context
     * @return the IP address text, or an empty string if the channel has no remote address
     */
    public String getIPString(ChannelHandlerContext ctx) {
        SocketAddress remote = ctx.channel().remoteAddress();
        if (remote == null) {
            return "";
        }
        if (remote instanceof InetSocketAddress) {
            InetSocketAddress inet = (InetSocketAddress) remote;
            // getAddress() is null for an unresolved address; fall back to the host string.
            return inet.getAddress() != null ? inet.getAddress().getHostAddress() : inet.getHostString();
        }
        // Non-IP transports: strip an optional "host/" prefix and a trailing ":port".
        String socketString = remote.toString();
        int start = socketString.indexOf('/') + 1;
        int colonAt = socketString.lastIndexOf(':');
        return colonAt > start ? socketString.substring(start, colonAt) : socketString.substring(start);
    }
}
- TCPServer通道初始化类
package com.hanwang.config;
import java.nio.charset.Charset;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* description: 通道初始化,主要用于设置各种Handler
**/
@Component
public class TCPServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
TCPServerChannelHandler serverChannelHandler;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
ByteBuf delimiter = Unpooled.copiedBuffer(")".getBytes());
pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,false,delimiter));
pipeline.addLast(
new StringDecoder(Charset.forName("GBK")),
new StringEncoder(Charset.forName("GBK"))
);
//字符串编解码器
//自定义Handler
pipeline.addLast("serverChannelHandler", serverChannelHandler);
}
}
- application.properties
# Disable JMX
spring.jmx.enabled=false
# Database connection (url, username and password are intentionally left blank here)
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.url=
spring.datasource.username=
spring.datasource.password=
# HikariCP connection pool settings
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=15
spring.datasource.hikari.maximum-pool-size=100
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=DatebookHikariCP
spring.datasource.hikari.max-lifetime=18000000
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.connection-test-query=SELECT 1
# MyBatis mapper XML locations
mybatis.mapper-locations: classpath:mapper/*.xml
# SQL logging for the mapper package
logging.level.com.hanwang.mapper=debug
# UDP server port (read by NettyUDPServer via ${port})
port=9924
# TCP server port (read by NettyTCPServer via ${netty.tcp.server.port})
netty.tcp.server.port=9922
写在最后
目前实现功能有,同步考勤,人脸信息,定时拉取考勤信息,定时拉取设备人员信息等。如有相同需求,欢迎一起探讨。项目目前停留版本时间为2019.04版本。
网友评论