方式主要是设置同步锁。注意点：serverChannelHandlerAdapter.setUuid(dataSourceKey) 设置的 UUID 必须与通讯实体（TransferMessage）的 ID 是同一个 UUID，这样客户端响应返回时才能回调并进入 latch.await(...) 返回 true 的 if 分支。
// 查询该数据源该schema下所有的表
FtpPathFileSource dataSource = new FtpPathFileSource();
dataSource.setUserName(dataSourceDto.getUserName());
dataSource.setPassword(dataSourceDto.getPassword());
dataSource.setUrl(dataSourceDto.getDataSourceIp());
dataSource.setPath(dataConfigSearchForm.getSchema());
dataSource.setBusinessType(BusinessTypeEnum.FTP);
List<Client> usableClientList = datasourceClientRepository.findUsableClientList();
Random random = new Random();
//从可用的客户端列表中随机获取一个客户端
Client client = usableClientList.get(random.nextInt(usableClientList.size()));
//获取channel
Channel channel = DataBaseCache.CHANNEL_MAP.get(client.getChannelId());
//设置同步锁
CountDownLatch latch = new CountDownLatch(Constant.Field.IS_SYNCHRO);
ServerChannelHandlerAdapter serverChannelHandlerAdapter = (ServerChannelHandlerAdapter) channel.pipeline().get("ChannelHandlerAdapter");
serverChannelHandlerAdapter.resetSync(latch, true);
String dataSourceKey = UUID.randomUUID().toString();
serverChannelHandlerAdapter.setUuid(dataSourceKey);
//构造通讯实体
TransferMessage transferMessage = new TransferMessage();
transferMessage.setId(dataSourceKey);
transferMessage.setBusinessType(BusinessTypeEnum.FTP);
transferMessage.setCommandType(CommandTypeEnum.CONTENT);
transferMessage.setVersion(Constant.Field.CLIENT_VERSION);
transferMessage.setData(JsonUtil.objectToJson(dataSource));
channel.writeAndFlush(JsonUtil.objectToJson(transferMessage));
FtpFilePathDto dataResult = new FtpFilePathDto();
try {
//同步返回结果
if (latch.await(Constant.Field.SECONDS_TIMEOUT, TimeUnit.SECONDS)) {
transferMessage = serverChannelHandlerAdapter.getResult();
dataResult = JsonUtil.jsonToObject(transferMessage.getData(), FtpFilePathDto.class);
} else {
log.error("测试连接超时");
}
} catch (Exception e) {
log.error("show table data timeout", e);
}
网友评论