如果对你有帮助, 请点击💖喜欢💖鼓励一下我这个臭弟弟
需求描述:
由于项目部署环境网络不太稳定, 可能会导致和MongoDB的连接断开, 期间的呼叫记录会丢失. 需要想办法解决这个问题.
方案思路:
记录先写入文件中, 从文件中读取记录插入到数据库中. 如果数据库连接正常, 正常入库; 如果连接断开的话... 那就等它连接恢复了再插入, 这期间的记录全部写在文件里, 不会丢失.
方案选型:
- 文件选择log4j日志文件, 配置为按天生成日志文件, 如果不是当天的日志文件且已经读取入库完成, 则删除节省磁盘空间
- 读取文件的流选择RandomAccessFile, 可以随机读写文件, 能够从上次读到的地方继续读取文件内容
方案实施:
- 原来代码中直接对数据库的操作[查询, 插入, 更新], 都替换为在日志中写信息的操作. 此处定义了一个实体类转换为json写入到日志文件中, 实体类如下:
public class RecordLogObject {
public static final int SAVE = 0; // 插入, 更新
public static final int QUERY = 1; // 查询
private int type; // 操作类型
private BirdnestCallRecord callRecord; // 数据
public RecordLogObject(int type, BirdnestCallRecord callRecord) {
this.type = type;
this.callRecord = callRecord;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public BirdnestCallRecord getCallRecord() {
return callRecord;
}
public void setCallRecord(BirdnestCallRecord callRecord) {
this.callRecord = callRecord;
}
}
之前的数据库操作是
callRecordRepository.save(callRecord);
替换之后就是
Gson gson = new Gson();
logger.info(gson.toJson(new RecordLogObject(SAVE, callRecord)));
- 设置定时任务, 定时从日志中读取数据插入数据库; 读取的偏移量记录在单独的文件中, 这样即使项目挂掉, 再次启动时也能保证不会重复读不会少读.
定时任务代码如下:
// 每300ms读取呼叫记录日志文件进行入库
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
logReader.process();
} catch (IOException e) {
e.printStackTrace();
}
}
}, 0, 300, TimeUnit.MILLISECONDS);
日志文件的处理类, 代码如下:
public class CallLogReader {
// 日志文件目录
@Value("${callRecord.log.dir}")
private String logDirectory;
// 偏移文件名称
private String offsetFileName = "offset";
// 偏移文件路径
private String offsetFilePath;
// 日志文件前缀
private String logPrefix = "log";
// 非当日日志名称格式
private String fileNameReg = "^" + logPrefix + ".\\d{8}$";
@PostConstruct
private void initLogOffsetPath() {
offsetFilePath = logDirectory + offsetFileName;
}
private Logger logger = LoggerFactory.getLogger(CallLogReader.class);
@Autowired
private MongoTemplate mongoTemplate;
public void process() throws IOException {
String curName;
long curPoint;
// 查看日志目录下是否有日志文件
File[] logFiles = getFiles();
if (logFiles.length < 1) {
// 没有日志文件直接返回
return;
} else if (logFiles.length == 1){
// 直接读取偏移文件
curName = logFiles[0].getName();
File offsetFile = new File(offsetFilePath);
if (!offsetFile.exists()) {
curPoint = 0;
} else {
String[] offsetInfo = getOffset(offsetFilePath);
curPoint = Long.valueOf(offsetInfo[1]);
}
} else {
// 有多个日志文件, 此时一定有偏移文件, 先处理旧的, 再处理新的
File oldestFile = getOldestFile(logFiles);
String[] offset = getOffset(offsetFilePath);
curPoint = Long.valueOf(offset[1]);
if (curPoint < oldestFile.length()) {
// 继续读
curName = oldestFile.getName();
} else {
// 删除最老的日志文件, 选择次最老的日志文件从头读取
oldestFile.delete();
File newOldestFile = getOldestFile(getFiles());
curName = newOldestFile.getName();
curPoint = 0;
}
}
// 从偏移处开始读取记录入库
String curFilePath = logDirectory + curName;
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(curFilePath, "r");
} catch (FileNotFoundException e) {
logger.error("读取文件异常", e);
}
long fileLength = randomAccessFile.length();
if (curPoint < fileLength) {
randomAccessFile.seek(curPoint);
} else {
// 如果文件已读完且不是当日文件, 则删除
randomAccessFile.close();
if (Pattern.matches(fileNameReg, curName)) {
File file = new File(curFilePath);
file.delete();
logger.warn("删除" + curFilePath);
}
return;
}
Gson gson = new Gson();
//得到行
String result = randomAccessFile.readLine();
//如果内容不为空
if (StringUtils.isNotBlank(result)) {
RecordLogObject recordLog = gson.fromJson(result, RecordLogObject.class);
BirdnestCallRecord callRecord = recordLog.getCallRecord();
try {
// 逻辑处理
} catch (Exception e) {
logger.error("处理日志文件异常", e);
randomAccessFile.close();
return;
}
}
logger.info(curName + ", " + curPoint + "---操作成功");
// 更新偏移文件
updateOffset(offsetFilePath, curName, randomAccessFile.getFilePointer());
logger.info(curName + ", " + curPoint + "---更新偏移文件成功");
randomAccessFile.close();
logger.info(curName + ", " + curPoint + "---关闭读取文件流");
}
/**
* 获取日志目录下的所有文件
* @return
*/
private File[] getFiles() {
File dictFile = new File(logDirectory);
return dictFile.listFiles(new FileFilter() {
public boolean accept(File pathname) {
return pathname.isFile() && pathname.getName().startsWith(logPrefix);
}
});
}
/**
* 从File数组中查找出最早创建的文件, log永远是最新的
* @param files
* @return
*/
private File getOldestFile(File[] files){
if (files.length == 1) {
return files[0];
}
Arrays.sort(files, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
return o2.getName().compareTo(o1.getName());
}
});
return files[files.length -2];
}
/**
* 获取偏移信息
* @param offsetFilePath
* @return [文件名, 偏移量]
* @throws IOException
*/
private String[] getOffset(String offsetFilePath) throws IOException {
FileReader fileReader = new FileReader(offsetFilePath);
StringBuilder stringBuilder = new StringBuilder();
char[] bytesChar = new char[100];
fileReader.read(bytesChar);
fileReader.close();
for (char c : bytesChar) {
stringBuilder.append(c);
}
String offset = stringBuilder.toString().trim();
return offset.split(",");
}
/**
* 更新偏移文件
* @param offsetFilePath 偏移文件
* @param curName 当前读取的文件名
* @param filePointer 偏移量
* @throws IOException
*/
private void updateOffset(String offsetFilePath, String curName, long filePointer) throws IOException {
FileWriter fileWriter = new FileWriter(offsetFilePath);
fileWriter.write(curName + "," + filePointer);
fileWriter.flush();
fileWriter.close();
}
}
- 日志配置如下:
<RollingFile name="callLogAppener" fileName="callLog/log"
filePattern="callLog/log.%d{yyyyMMdd}"
append="true">
<!-- 输出格式 -->
<PatternLayout
pattern="%msg%n" />
<!-- 设置策略 -->
<Policies>
<TimeBasedTriggeringPolicy interval="1" modulate="true" />
</Policies>
</RollingFile>
<logger name="com.bjnangle.birdnest.engine.functions.CallRecordHandler">
<AppenderRef ref="callLogAppener"/>
</logger>
方案效果
数据先记录到日志中;
数据库连接断开后重连可以实现记录入库;
读取完的过时日志可以正常删除.
如果对你有帮助, 请点击💖喜欢💖鼓励一下我这个臭弟弟
网友评论