部标监控平台核心模块之一指令操作,网上关于这方面的设计文章基本没有,今天带大家一起来开发,文章里的MQ采用了RabbitMQ。
我们先来看下整个系统的流程图,各模块之间的通信采用MQ交互,例如JT808网关接收终端的位置、图片等信息放到MQ,WEB后台订阅后进行入库,JT809网关订阅后上传给上级平台;WEB后台下发指令通过MQ到JT808网关再到终端,终端应答后再通过MQ返回到WEB后台;WEB后台也可以下发指令操作JT809网关。
image.png
那么如何设计通用的WEB后台与各网关程序的MQ传输协议呢?
- 指令分为两种,同步指令和异步指令。例如客户端下发文本信息给终端,需要等待终端返回成功与否的应答,这种指令属于同步指令,可以设置一个等待超时时间。客户设置了定时拍照的规则,平台按照指定时间自动给终端发送拍照指令,客户端不需要等待终端应答结果,这种指令属于异步指令。
-
终端协议不止部标协议,还有各个厂商的私有协议,组包发给终端的内容肯定不一样。我们看下部标协议,各种指令变化的是消息体,利用这点特性我们可以根据指令ID按协议去组好消息体发送给JT808网关,JT808网关再组成完整协议包发给终端。
image.png - 如何精准的把指令发送到对应的网关程序呢?每个网关程序都会命名不同的节点名称,终端上线时会把终端信息带上节点名称缓存到redis,平台发送指令时先去redis查询终端有无上线,如果未上线就不用发指令到MQ了。如果上线了根据节点名称组成的路由规则发送到MQ,网关程序根据节点名称订阅MQ就能收到指令消息了。
我们先定义前端的指令接口,泛型T是各种指令的消息体参数字段的类:
@ApiModel("指令信息")
@Data
public class CommandDTO<T> {
@ApiModelProperty(value = "终端ID", required = true, position = 1)
@NotNull(message = "终端ID不能为空")
private Long terminalId;
@ApiModelProperty(value = "协议类型", required = true, position = 2)
@NotNull(message = "协议类型不能为空")
private ProtocolEnum protocol = ProtocolEnum.JT808;
@ApiModelProperty(value = "下行指令", position = 3)
private String downCommandId;
@ApiModelProperty(value = "指令内容", position = 4)
private Map<String, Object> params;
@ApiModelProperty(value = "请求方式(同步/异步)", position = 5)
private CommandRequestTypeEnum requestType = CommandRequestTypeEnum.SYNC;
@ApiModelProperty(value = "超时时间(毫秒,请求方式为同步时有效)", position = 6)
private Integer responseTimeout;
@ApiModelProperty(value = "指令内容实体", position = 7)
@Valid
private T paramsEntity;
public Class getParamsEntityClass() {
return paramsEntity.getClass();
}
定义各种指令的参数类,每个参数类必须实现IDownCommandService接口,实现buildMessageBody方法,这个方法是根据参数和协议组成消息体内容,以部标文本下发指令为例:
image.png@ApiModel("0x8300文本信息下发参数")
@Data
@DownCommand(messageId = 0x8300, respMessageId = 0x0001, desc = "文本信息下发")
public class Command8300Param implements IDownCommandService {
@NotNull
@ApiModelProperty(value = "标志位", required = true, position = 1)
private List<Integer> flags;
@NotBlank
@ApiModelProperty(value = "文本信息", required = true, position = 2)
private String textMsg;
@Override
public byte[] buildMessageBody() throws Exception {
byte[] msgBodyArr = null;
ByteBuf msgBody = null;
try {
char[] chars = new char[8];
for (int i = 0; i < 8; i++) {
char value = flags.contains(i) ? '1' : '0';
chars[7 - i] = value;
}
int flag = Integer.parseInt(new String(chars), 2);
byte[] textMsgArr = textMsg.getBytes(CommonConstants.DEFAULT_CHARSET_NAME);
msgBody = Unpooled.buffer(textMsgArr.length + 1);
msgBody.writeByte(flag).writeBytes(textMsgArr);
msgBodyArr = msgBody.array();
} catch (Exception e) {
throw e;
} finally {
if (msgBody != null) {
ReferenceCountUtil.release(msgBody);
}
}
return msgBodyArr;
}
}
@DownCommand(messageId = 0x8300, respMessageId = 0x0001, desc = "文本信息下发"),这个注解表明了这个类对应的指令ID和应答指令ID。
定义JT808接口类Jt808CommandController:
@Api(tags = "JT808指令操作")
@RestController
@RequestMapping("/api/v1/monitor/commands/jt808")
@Slf4j
public class Jt808CommandController {
@Autowired
private CommandOperationService commandOperationService;
@ApiOperation("设置终端参数")
@PostMapping("/sendCommand8103")
public ResultDTO<CommandProto> sendCommand8103(@ApiParam("指令信息") @Valid @RequestBody CommandDTO<Command8103Param> commandDTO) {
return sendJt808Command(commandDTO);
}
@ApiOperation("查询终端参数")
@PostMapping("/sendCommand8104")
public ResultDTO<CommandProto> sendCommand8104(@ApiParam("指令信息") @Valid @RequestBody CommandDTO<Command8104Param> commandDTO) {
return sendJt808Command(commandDTO);
}
@ApiOperation("终端控制")
@PostMapping("/sendCommand8105")
public ResultDTO<CommandProto> sendCommand8105(@ApiParam("指令信息") @Valid @RequestBody CommandDTO<Command8105Param> commandDTO) {
return sendJt808Command(commandDTO);
}
@ApiOperation("文本信息下发参数")
@PostMapping("/sendCommand8300")
public ResultDTO<CommandProto> sendCommand8300(@ApiParam("指令信息") @Valid @RequestBody CommandDTO<Command8300Param> commandDTO) {
return sendJt808Command(commandDTO);
}
/**
* 发送JT808指令
* @param commandDTO
* @return
*/
private ResultDTO<CommandProto> sendJt808Command(CommandDTO<?> commandDTO) {
commandDTO.setProtocol(ProtocolEnum.JT808);
CommandProto result = commandOperationService.sendCommandEntity(commandDTO);
return new ResultDTO<>(result);
}
}
所有的指令都通过CommandOperationService处理,里面会判断终端是否上线、指令是否异步以及发送到MQ,如果有需要可以在这个服务里把指令操作入库。
我们看下同步指令的关键代码,利用JDK的CompletableFuture去等待结果,eventListener.register(commandEventKey, future)为什么需要注册事件呢?这是因为同一时间同一终端同一种指令有可能会有多个用户同时操作,WEB后台收到终端应答后就可以把结果返回给这些用户,这种技术是利用spring自带的事件监听和发布:
/**
* 同步方式需要等待终端应答
*
* @param commandDTO
* @param downCommandInfo
* @param paramsJson
* @return
*/
private CommandProto waitForResult(CommandDTO commandDTO, DownCommandInfo downCommandInfo, String paramsJson) {
//注册指令监听事件
CompletableFuture<CommandProto> future = new CompletableFuture<>();
long terminalId = commandDTO.getTerminalId();
int timeout = commandDTO.getResponseTimeout();
String downCommandId = commandDTO.getDownCommandId();
String commandEventKey = GnssUtils.buildCommandEventKey(terminalId, downCommandId);
eventListener.register(commandEventKey, future);
CommandSendResultEnum commandSendResultEnum = CommandSendResultEnum.FAILED;
try {
//等待应答结果
CommandProto result = future.get(timeout, TimeUnit.MILLISECONDS);
log.info("收到指令应答,终端ID:{},指令类型:{},指令参数:{},应答结果:{}", terminalId, downCommandId, paramsJson, result);
return result;
} catch (TimeoutException e) {
commandSendResultEnum = CommandSendResultEnum.TIMEOUT;
log.error("等待指令应答超时,终端ID:{},指令类型:{},指令参数:{},等待时间:{}", terminalId, downCommandId, paramsJson, timeout, e);
} catch (Exception e) {
commandSendResultEnum = CommandSendResultEnum.INTERNAL_SERVER_ERROR;
log.error("等待指令应答异常,终端ID:{},指令类型:{},指令参数:{}", terminalId, downCommandId, paramsJson, e);
}
//注销指令监听事件
eventListener.unregister(commandEventKey, future);
CommandProto result = buildCommandResponse(commandDTO, downCommandInfo, paramsJson, commandSendResultEnum);
return result;
}
spring监听事件代码:
@Component
@Slf4j
public class CommandEventListener {
private ConcurrentHashMap<String, CopyOnWriteArrayList<CompletableFuture<CommandProto>>> subscriberMap = new ConcurrentHashMap<>();
@EventListener
public void onApplicationEvent(CommandEvent event) {
String commandEventKey = event.getCommandEventKey();
CommandProto message = event.getMessage();
subscriberMap.computeIfPresent(commandEventKey, (k, v) -> {
v.forEach(future -> {
future.complete(message);
});
return null;
});
log.info("广播指令事件,commandEventKey:{},message:{}", commandEventKey, message);
}
/**
* 注册指令事件监听
*
* @param commandEventKey
* @param future
*/
public void register(String commandEventKey, CompletableFuture<CommandProto> future) {
subscriberMap.computeIfAbsent(commandEventKey, k -> new CopyOnWriteArrayList<>()).add(future);
log.info("注册指令事件监听,commandEventKey:{},size:{}", commandEventKey, subscriberMap.get(commandEventKey).size());
}
/**
* 注销指令事件监听
*
* @param commandEventKey
* @param future
*/
public void unregister(String commandEventKey, CompletableFuture<CommandProto> future) {
subscriberMap.computeIfPresent(commandEventKey, (k, v) -> {
v.remove(future);
if (v.isEmpty()) {
log.info("删除指令事件监听,commandEventKey:{}", commandEventKey);
return null;
}
log.info("注销指令事件监听,commandEventKey:{},size:{}", commandEventKey, v.size());
return v;
});
}
}
如何接收终端返回的应答消息呢?我们在MQ定义了上行指令通道,通过订阅接收消息,然后通过spring的事件把结果发布,CommandEventListener的onApplicationEvent方法会把结果广播给CompletableFuture,CommandProto result = future.get(timeout, TimeUnit.MILLISECONDS)就能得到结果了。
@Component
@RabbitListener(queues = RabbitConstants.UP_COMMAND_QUEUE)
@Slf4j
public class RabbitUpCommandReceiver {
@Autowired
private EventPublisher eventPublisher;
@RabbitHandler
public void handleUpCommand(CommandProto upCommand, Channel channel, Message message) throws Exception {
log.info("收到上行指令:{}", upCommand);
try {
//异步发送的响应结果
if (upCommand.getRequestType() == CommandRequestTypeEnum.ASYNC) {
return;
}
//同步发送的响应结果
long terminalId = upCommand.getTerminalId();
String downCommandId = upCommand.getDownCommandId();
String commandEventKey = GnssUtils.buildCommandEventKey(terminalId, downCommandId);
eventPublisher.publishCommandEvent(commandEventKey, upCommand);
} catch (Exception e) {
log.error("处理上行指令异常异常{}", upCommand, e);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
以上是整个流程的关键代码,那么MQ的指令传输协议是怎样的呢?CommandProto在这个类里面,我们采用protobuf序列化,性能和传输大小比json要强很多。这个公共包已经开源了,请自行参考https://github.com/gnss-pro/common-project
打开swagger接口文档测试接口:
image.png
总结:基础架构完成后,以后开发每条指令的接口只需要创建对应指令的参数类并实现组包的buildMessageBody方法,在controller添加接口,实现了几分钟就搞定一条指令接口。
项目地址:https://github.com/gnss-pro/gnss-web
官方网站:http://www.gps-pro.cn
image.png
开源地址:https://github.com/gnss-pro
微信:17158638841 或扫描下图
网友评论