背景
刚接手新项目,该项目是高并发的游戏日志服务端存储,一个项目适配多个游戏,很多特殊需求要兼容,刚开始接手,需要修复很多管道的数据,存储管道有两个,分别是MySQL和HDFS,数据消费自Kafka,从Kafka拉数据后,备份到本地,线上数据异常,从备份恢复数据,这套流程已经被设计好,玩的很6。
修复一个时间跨度大的备份时,经常报解析数据异常,排查后发现有些数据写乱了。
备份按行分割,每行数据用json存储,有的行json被截取了,导致无法解析,数据错误概率大概为万分之二三。
埋点抓取异常数据
发现备份数据错乱后,捕获了异常,将错误json,写到一个单独的文件中。
捕获json解析失败的行数据,发现行数据如下:
516"}
"}
}
437484_1193"}
524"}
_5758"}
5_9103487_8283"}
}
6276"}
_2528"}
8_7289"}
2"}
4"}
075_2467"}
"}
"}
029591_4539"}
很明显,json无法解析这些行数据,进一步分析发现,这些数据是前些行没写完的数据。
数据写乱了,最直观的想法是,线程同步问题。
源代码分析
下面贴出多线程并发写文件的代码:
public static void writeFileLock(String content, String filePath) {
File file = new File(filePath);
RandomAccessFile fout = null;
FileChannel fcout = null;
try {
fout = new RandomAccessFile(file, "rw");
fcout = fout.getChannel();//打开文件通道
FileLock flout = null;
while (true) {
try {
flout = fcout.tryLock();//不断的请求锁,如果请求不到,等一秒再请求
break;
} catch (Exception e) {
log.debug("等待锁" + filePath, "数据:" + content);
Thread.sleep(30);
}
}
long filelength = fout.length();//获取文件的长度
fout.seek(filelength);//将文件的读写指针定位到文件的末尾
fout.write(content.getBytes());//将需要写入的内容写入文件
flout.release();
fcout.close();
fout.close();
} catch (IOException e) {
e.printStackTrace();
log.error("file no find ..." + filePath, "数据:" + content);
} catch (InterruptedException e1) {
log.error("写入异常文件:" + filePath, "数据:" + content);
e1.printStackTrace();
} finally {
if (fcout != null) {
try {
fcout.close();
} catch (IOException e) {
e.printStackTrace();
fcout = null;
}
}
if (fout != null) {
try {
fout.close();
} catch (IOException e) {
e.printStackTrace();
fout = null;
}
}
}
}
仔细研究tryLock的代码发现三个特性:
1、tryLock非阻塞,调用后立刻返回。
2、tryLock()无参方法,实际调用有参方法:tryLock(0L, Long.MAX_VALUE, false),前两个参数表示锁住整个文件,第三个参数表示独占所。
3、tryLock()方法调用后,有3中可能,第一、拿到文件索返回对象;第二、没有拿到文件索返回null;第三、抛出异常。
很显然,上述代码没有经过斟酌,忽略了文件锁可能返回null,导致劝告锁失效,多线程没有卡住,逃出while循环,这个在本地环境复现,测试中发现抛出异常的概率很大,因此做出以下两点优化。
优化
第一、文件所判空。
第二、巧用Thread.sleep(0),让没有拿到锁的线程重新回到可执行队列,等待cpu的时间分片切换。
修改后:
public static void writeFileLock(String content, String filePath) {
File file = new File(filePath);
RandomAccessFile fout = null;
FileChannel fcout = null;
FileLock flout = null;
try {
fout = new RandomAccessFile(file, "rw");
fcout = fout.getChannel();//打开文件通道
while (true) {
try {
flout = fcout.tryLock();
if (flout != null) {
break;
}
} catch (Exception e) {
Thread.sleep(0);
}
}
long filelength = fout.length();//获取文件的长度
fout.seek(filelength);//将文件的读写指针定位到文件的末尾
fout.write(content.getBytes());//将需要写入的内容写入文件
flout.release();
fcout.close();
fout.close();
} catch (Exception e) {
log.error("写文件异常", e);
log.error("写入文件路径:{}, 文件内容:{}", filePath, content);
} finally {
try {
if (flout != null && flout.isValid()) {
flout.release();
}
if (fcout != null) {
fcout.close();
}
if (fout != null) {
fout.close();
}
} catch (Exception e) {
log.error("关闭文件流异常", e);
}
}
}
优化后,测试数据不再错乱。
总结
网友评论