1. Without further ado, let's start with the problem
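topic: With one project finished and the next set of requirements still being written, I used the downtime to read the source of Seata, the distributed transaction framework, at version 0.9.0. This post looks mainly at the Store module and at what changed in its code between releases. The first thing to understand is that the storage layer is critical to whether Seata is both high-performance and reliable.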
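- If the storage layer is implemented poorly, a crash loses the data for the distributed transactions the TC is processing, and anyone who adopts distributed transactions cannot tolerate that loss. If the storage layer is reliable but slow, RMs may roll back frequently, and the system cannot cope with high concurrency.
- Seata provides file-based storage by default; the alternative is DB-based. Below, the data being stored is called a Session: the global transaction data created by the TM is a GlobalSession, the branch transaction data created by an RM is a BranchSession, and one GlobalSession can own multiple BranchSessions. The goal is to persist all of these Sessions.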
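The one-to-many relationship between the two kinds of Session can be pictured with a minimal sketch. The class names GlobalSessionSketch and BranchSessionSketch and their fields are hypothetical stand-ins for illustration, not Seata's real classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for a branch transaction record (not Seata's API).
class BranchSessionSketch {
    final String xid;      // id of the owning global transaction
    final long branchId;   // id of this branch

    BranchSessionSketch(String xid, long branchId) {
        this.xid = xid;
        this.branchId = branchId;
    }
}

// Hypothetical, simplified stand-in for a global transaction record.
class GlobalSessionSketch {
    final String xid;
    private final List<BranchSessionSketch> branches = new ArrayList<>();

    GlobalSessionSketch(String xid) {
        this.xid = xid;
    }

    // Registers a branch under this global transaction and returns it.
    BranchSessionSketch addBranch(long branchId) {
        BranchSessionSketch branch = new BranchSessionSketch(xid, branchId);
        branches.add(branch);
        return branch;
    }

    // Read-only view of the branches, in registration order.
    List<BranchSessionSketch> getBranches() {
        return Collections.unmodifiableList(branches);
    }
}
```

Whatever the store does, it has to persist both levels: each global record and every branch record that hangs off it.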
File
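The file-based implementation is FileTransactionStoreManager. It can flush to disk synchronously or asynchronously, and it records the change every time a Session's state is updated. To keep the store file from growing without bound, once certain conditions are met it opens a fresh file and starts recording from the beginning, keeping the previous file as history. This scheme guarantees that no timed-out transaction is lost and that only completed transactions are purged, while keeping file size under control. The code below shows how Seata does it.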
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    // Lock to guarantee thread safety
    this.writeSessionLock.lock();
    long curFileTrxNum;
    label71: {
        boolean var5;
        try {
            // The actual write: put the encoded byte array into the FileChannel
            if (!this.writeDataFile((new TransactionWriteStore(session, logOperation)).encode())) {
                var5 = false;
                return var5;
            }
            this.lastModifiedTime = System.currentTimeMillis();
            curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
            // If the current store file has accumulated enough transactions and has been in use long enough,
            // save it as history and create a new file
            if (curFileTrxNum % (long)PER_FILE_BLOCK_SIZE != 0L || System.currentTimeMillis() - trxStartTimeMills <= MAX_TRX_TIMEOUT_MILLS) {
                break label71;
            }
            var5 = this.saveHistory();
        } catch (Exception var10) {
            LOGGER.error("writeSession error, {}", var10.getMessage(), var10);
            boolean var6 = false;
            return var6;
        } finally {
            this.writeSessionLock.unlock();
        }
        return var5;
    }
    this.flushDisk(curFileTrxNum, this.currFileChannel);
    return true;
}
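The decompiled condition in writeSession is easier to read when inverted: the file is rolled only when the transaction counter is a multiple of the block size and the file has been open longer than the maximum transaction timeout. Here is a minimal sketch of that predicate. RolloverPolicySketch and its parameter names are hypothetical, and the numbers in the usage note are illustrative rather than Seata's configured defaults.

```java
// Hypothetical helper that restates the rollover test from writeSession.
class RolloverPolicySketch {
    private final long perFileBlockSize;     // roll only on multiples of this count
    private final long maxTrxTimeoutMillis;  // roll only after the file is at least this old

    RolloverPolicySketch(long perFileBlockSize, long maxTrxTimeoutMillis) {
        this.perFileBlockSize = perFileBlockSize;
        this.maxTrxTimeoutMillis = maxTrxTimeoutMillis;
    }

    // True when both conditions hold; otherwise the write just proceeds to the flush step.
    boolean shouldRoll(long curFileTrxNum, long fileStartMillis, long nowMillis) {
        boolean countReached = curFileTrxNum % perFileBlockSize == 0L;
        boolean oldEnough = nowMillis - fileStartMillis > maxTrxTimeoutMillis;
        return countReached && oldEnough;
    }
}
```

For example, with a block size of 2000 and a 30-minute timeout, shouldRoll(2000, 0, 1_800_001) is true, while a file that is only one second old, or a counter of 1999, returns false.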
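writeSession relies on three key functions: writeDataFile, saveHistory, and flushDisk. We will go through them in turn. writeDataFile is simple: it writes synchronously to the FileChannel. saveHistory is the more important one, so let's look at it next.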
private boolean saveHistory() throws IOException {
    boolean result;
    try {
        result = this.findTimeoutAndSave();
        this.writeDataFileRunnable.putRequest(new FileTransactionStoreManager.CloseFileRequest(this.currFileChannel, this.currRaf));
        Files.move(this.currDataFile.toPath(), (new File(this.hisFullFileName)).toPath(), StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException var6) {
        LOGGER.error("save history data file error, {}", var6.getMessage(), var6);
        result = false;
    } finally {
        this.initFile(this.currFullFileName);
    }
    return result;
}
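The core of saveHistory is a rename that overwrites the previous history file, followed by creating a fresh current file. The sketch below isolates that step with java.nio.file. RollingFileSketch and the file names root.data and root.data.1 are assumptions made for the example, and it leaves out the channel handling and timeout scan that the real method performs.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical two-file store: one current file, at most one history file.
class RollingFileSketch {
    private final Path current;
    private final Path history;

    RollingFileSketch(Path dir) throws IOException {
        this.current = dir.resolve("root.data");    // assumed file name
        this.history = dir.resolve("root.data.1");  // assumed file name
        if (Files.notExists(current)) {
            Files.createFile(current);
        }
    }

    // Appends one record to the current file.
    void append(String record) throws IOException {
        Files.write(current, (record + "\n").getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
    }

    // Renames current to history, replacing any older history file, then starts a new current file.
    void roll() throws IOException {
        Files.move(current, history, StandardCopyOption.REPLACE_EXISTING);
        Files.createFile(current);
    }

    Path current() { return current; }

    Path history() { return history; }
}
```

After two calls to roll(), only the records written between the first and second roll remain in the history file, which is why anything that must survive has to be copied into the current file before the rename.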
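Now for findTimeoutAndSave, which saveHistory calls. Let's start with how earlier versions wrote it: for each global transaction it looks up the branch transactions with nested for loops, collects every encoded record into a list, and then writes them all through one directly allocated buffer. The 0.9.0 version, shown second, is decompiled, which is why its loops appear as explicit Iterators (a for-each loop compiles to the same Iterator calls, so that by itself changes nothing). The visible change is that each record is written with writeDataFrame as soon as it is encoded and the shared writeBuffer is flushed once at the end, so the intermediate list and the per-call direct buffer are gone, which should be somewhat more efficient. It is a pattern worth borrowing in everyday code.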
// Earlier version: find all timed-out Sessions and store them
private boolean findTimeoutAndSave() throws IOException {
    List<GlobalSession> globalSessionsOverMaxTimeout =
        sessionManager.findGlobalSessions(new SessionCondition(MAX_TRX_TIMEOUT_MILLS));
    if (CollectionUtils.isEmpty(globalSessionsOverMaxTimeout)) {
        return true;
    }
    List<byte[]> listBytes = new ArrayList<>();
    int totalSize = 0;
    // 1. find all data and merge
    for (GlobalSession globalSession : globalSessionsOverMaxTimeout) {
        TransactionWriteStore globalWriteStore = new TransactionWriteStore(globalSession, LogOperation.GLOBAL_ADD);
        byte[] data = globalWriteStore.encode();
        listBytes.add(data);
        totalSize += data.length + INT_BYTE_SIZE;
        List<BranchSession> branchSessIonsOverMaXTimeout = globalSession.getSortedBranches();
        if (null != branchSessIonsOverMaXTimeout) {
            for (BranchSession branchSession : branchSessIonsOverMaXTimeout) {
                TransactionWriteStore branchWriteStore =
                    new TransactionWriteStore(branchSession, LogOperation.BRANCH_ADD);
                data = branchWriteStore.encode();
                listBytes.add(data);
                totalSize += data.length + INT_BYTE_SIZE;
            }
        }
    }
    // 2. batch write
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(totalSize);
    for (byte[] bytes : listBytes) {
        byteBuffer.putInt(bytes.length);
        byteBuffer.put(bytes);
    }
    if (writeDataFileByBuffer(byteBuffer)) {
        currFileChannel.force(false);
        return true;
    }
    return false;
}
// 0.9.0 version (decompiled)
private boolean findTimeoutAndSave() throws IOException {
    List<GlobalSession> globalSessionsOverMaxTimeout = this.sessionManager.findGlobalSessions(new SessionCondition(MAX_TRX_TIMEOUT_MILLS));
    if (CollectionUtils.isEmpty(globalSessionsOverMaxTimeout)) {
        return true;
    } else {
        Iterator var2 = globalSessionsOverMaxTimeout.iterator();
        while(true) {
            byte[] data;
            ArrayList branchSessIonsOverMaXTimeout;
            do {
                if (!var2.hasNext()) {
                    if (this.flushWriteBuffer(this.writeBuffer)) {
                        this.currFileChannel.force(false);
                        return true;
                    }
                    return false;
                }
                GlobalSession globalSession = (GlobalSession)var2.next();
                TransactionWriteStore globalWriteStore = new TransactionWriteStore(globalSession, LogOperation.GLOBAL_ADD);
                data = globalWriteStore.encode();
                if (!this.writeDataFrame(data)) {
                    return false;
                }
                branchSessIonsOverMaXTimeout = globalSession.getSortedBranches();
            } while(null == branchSessIonsOverMaXTimeout);
            Iterator var7 = branchSessIonsOverMaXTimeout.iterator();
            while(var7.hasNext()) {
                BranchSession branchSession = (BranchSession)var7.next();
                TransactionWriteStore branchWriteStore = new TransactionWriteStore(branchSession, LogOperation.BRANCH_ADD);
                data = branchWriteStore.encode();
                if (!this.writeDataFrame(data)) {
                    return false;
                }
            }
        }
    }
}
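Both versions store each record as a length-prefixed frame: a 4-byte length followed by the encoded bytes, as the putInt and put pair in the earlier version shows. The sketch below illustrates that framing and how a reader could walk it back. FrameCodecSketch and its decode method are hypothetical and written for this example; it uses a heap buffer rather than a direct one, and Seata's actual reader may differ.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical codec for [int length][payload] frames.
class FrameCodecSketch {
    static final int INT_BYTE_SIZE = 4;

    // Packs every payload into one buffer, each preceded by its length.
    static ByteBuffer encode(List<byte[]> payloads) {
        int totalSize = 0;
        for (byte[] payload : payloads) {
            totalSize += INT_BYTE_SIZE + payload.length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(totalSize);
        for (byte[] payload : payloads) {
            buffer.putInt(payload.length);
            buffer.put(payload);
        }
        buffer.flip(); // switch from writing to reading
        return buffer;
    }

    // Reads frames until the buffer is exhausted or a frame is truncated.
    static List<byte[]> decode(ByteBuffer buffer) {
        List<byte[]> payloads = new ArrayList<>();
        while (buffer.remaining() >= INT_BYTE_SIZE) {
            int length = buffer.getInt();
            if (length < 0 || length > buffer.remaining()) {
                break; // incomplete tail, e.g. after a crash mid-write
            }
            byte[] payload = new byte[length];
            buffer.get(payload);
            payloads.add(payload);
        }
        return payloads;
    }
}
```

The length prefix is what lets a reader split a flat file back into records without any delimiter inside the payload.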
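We now know that Seata has at most two store files at any time: currentDataFile and historyFullFile. currentDataFile holds the newest data, while historyFullFile additionally holds all the earlier Sessions that had timed out. Whenever the TC crashes, on restart it only needs to read historyFullFile first and then currentDataFile to recover all data.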
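When historyFullFile is replaced, all timed-out Session information is first written into currentDataFile, and only then is currentDataFile renamed to historyFullFile, replacing the previous oldHistoryFullFile. That is how timed-out Sessions are carried forward. The Session timeout and the interval for creating a new currentDataFile are in fact the same, 30 minutes, so by the time historyFullFile is replaced again, the previous oldHistoryFullFile can only contain timed-out Sessions and completed Sessions. The timed-out Sessions have all been recorded in the new historyFullFile, and the completed Sessions are deleted along with oldHistoryFullFile during the replacement.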
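The recovery order described above, history file first and current file second, amounts to replaying two logs into one in-memory map so that later records win. A minimal sketch follows. RecoverySketch, its LogEntry type, and the three operations are hypothetical simplifications for illustration, and the status strings are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of two operation logs into the set of live sessions.
class RecoverySketch {
    enum Op { ADD, UPDATE, REMOVE }

    static final class LogEntry {
        final Op op;
        final String xid;
        final String status;

        LogEntry(Op op, String xid, String status) {
            this.op = op;
            this.xid = xid;
            this.status = status;
        }
    }

    // Replays the history log, then the current log; returns xid -> latest status.
    static Map<String, String> recover(List<LogEntry> history, List<LogEntry> current) {
        Map<String, String> live = new LinkedHashMap<>();
        replay(history, live);
        replay(current, live);
        return live;
    }

    private static void replay(List<LogEntry> log, Map<String, String> live) {
        for (LogEntry entry : log) {
            if (entry.op == Op.REMOVE) {
                live.remove(entry.xid);           // finished sessions drop out
            } else {
                live.put(entry.xid, entry.status); // ADD and UPDATE both overwrite
            }
        }
    }
}
```

Reading in the opposite order would let stale history records overwrite newer state, which is why the order matters.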
private void flushDisk(long curFileNum, FileChannel currFileChannel) {
    if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) {
        FileTransactionStoreManager.SyncFlushRequest syncFlushRequest = new FileTransactionStoreManager.SyncFlushRequest(curFileNum, currFileChannel);
        this.writeDataFileRunnable.putRequest(syncFlushRequest);
        syncFlushRequest.waitForFlush(2000L);
    } else {
        this.writeDataFileRunnable.putRequest(new FileTransactionStoreManager.AsyncFlushRequest(curFileNum, currFileChannel));
    }
}
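Finally, the flush process is also simple. Depending on configuration, a synchronous flush enqueues a SyncFlushRequest and blocks in waitForFlush (for up to 2000 ms in the listing above) until the flush thread calls wakeupCustomer; otherwise the flush is asynchronous. writeDataFileRunnable holds a blocking queue, and a single thread loops, taking tasks off it and executing them, which should not be hard to follow. The code follows: it is an inner class of FileTransactionStoreManager, run on its own thread.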
class WriteDataFileRunnable implements Runnable {
    private LinkedBlockingQueue<FileTransactionStoreManager.StoreRequest> storeRequests = new LinkedBlockingQueue();
    WriteDataFileRunnable() {
    }
    // This is the method callers use. The queue is a LinkedBlockingQueue, which is optionally bounded:
    // with no capacity passed to the constructor it defaults to Integer.MAX_VALUE, i.e. effectively unbounded.
    // If requests are added faster than they are removed, memory can run out. Personally I recommend a Disruptor queue.
    public void putRequest(FileTransactionStoreManager.StoreRequest request) {
        this.storeRequests.add(request);
    }
    public void run() {
        while(!FileTransactionStoreManager.this.stopping) {
            try {
                FileTransactionStoreManager.StoreRequest storeRequest = (FileTransactionStoreManager.StoreRequest)this.storeRequests.poll(2000L, TimeUnit.MILLISECONDS);
                this.handleStoreRequest(storeRequest);
            } catch (Exception var2) {
                FileTransactionStoreManager.LOGGER.error("write file error: {}", var2.getMessage(), var2);
            }
        }
        this.handleRestRequest();
    }
    private void handleRestRequest() {
        int remainNums = this.storeRequests.size();
        for(int i = 0; i < remainNums; ++i) {
            this.handleStoreRequest((FileTransactionStoreManager.StoreRequest)this.storeRequests.poll());
        }
    }
    private void handleStoreRequest(FileTransactionStoreManager.StoreRequest storeRequest) {
        if (storeRequest == null) {
            this.flushOnCondition(FileTransactionStoreManager.this.currFileChannel);
        }
        if (storeRequest instanceof FileTransactionStoreManager.SyncFlushRequest) {
            this.syncFlush((FileTransactionStoreManager.SyncFlushRequest)storeRequest);
        } else if (storeRequest instanceof FileTransactionStoreManager.AsyncFlushRequest) {
            this.async((FileTransactionStoreManager.AsyncFlushRequest)storeRequest);
        } else if (storeRequest instanceof FileTransactionStoreManager.CloseFileRequest) {
            this.closeAndFlush((FileTransactionStoreManager.CloseFileRequest)storeRequest);
        }
    }
    private void closeAndFlush(FileTransactionStoreManager.CloseFileRequest req) {
        long diff = FileTransactionStoreManager.FILE_TRX_NUM.get() - FileTransactionStoreManager.FILE_FLUSH_NUM.get();
        this.flush(req.getFileChannel());
        FileTransactionStoreManager.FILE_FLUSH_NUM.addAndGet(diff);
        FileTransactionStoreManager.this.closeFile(FileTransactionStoreManager.this.currRaf);
    }
    private void async(FileTransactionStoreManager.AsyncFlushRequest req) {
        if (req.getCurFileTrxNum() < FileTransactionStoreManager.FILE_FLUSH_NUM.get()) {
            this.flushOnCondition(req.getCurFileChannel());
        }
    }
    private void syncFlush(FileTransactionStoreManager.SyncFlushRequest req) {
        if (req.getCurFileTrxNum() < FileTransactionStoreManager.FILE_FLUSH_NUM.get()) {
            long diff = FileTransactionStoreManager.FILE_TRX_NUM.get() - FileTransactionStoreManager.FILE_FLUSH_NUM.get();
            this.flush(req.getCurFileChannel());
            FileTransactionStoreManager.FILE_FLUSH_NUM.addAndGet(diff);
        }
        req.wakeupCustomer();
    }
    private void flushOnCondition(FileChannel fileChannel) {
        if (FileTransactionStoreManager.FLUSH_DISK_MODE != FlushDiskMode.SYNC_MODEL) {
            long diff = FileTransactionStoreManager.FILE_TRX_NUM.get() - FileTransactionStoreManager.FILE_FLUSH_NUM.get();
            if (diff != 0L) {
                if (diff % 10L == 0L || System.currentTimeMillis() - FileTransactionStoreManager.this.lastModifiedTime > 2000L) {
                    this.flush(fileChannel);
                    FileTransactionStoreManager.FILE_FLUSH_NUM.addAndGet(diff);
                }
            }
        }
    }
    private void flush(FileChannel fileChannel) {
        try {
            fileChannel.force(false);
        } catch (IOException var3) {
            FileTransactionStoreManager.LOGGER.error("flush error:" + var3.getMessage());
        }
    }
}
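The handshake between the writing thread and the flush thread can be reduced to a small sketch: the caller enqueues a request and waits on it with a timeout, and the worker signals the request once the flush is done. FlushWorkerSketch and FlushRequest are hypothetical names. The CountDownLatch is one plausible way to implement the wait and is an assumption, not a statement about how SyncFlushRequest is written. The counter stands in for FileChannel.force, and the queue is given an explicit capacity of 1024, an arbitrary bound chosen to sidestep the unbounded-growth concern raised in the comment above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-threaded flush worker with a waitable request.
class FlushWorkerSketch implements Runnable {

    static final class FlushRequest {
        private final CountDownLatch done = new CountDownLatch(1);

        // Blocks until the worker signals completion or the timeout elapses.
        boolean waitForFlush(long timeoutMillis) {
            try {
                return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        void wakeup() {
            done.countDown();
        }
    }

    // Bounded on purpose: offer() fails instead of letting the queue grow without limit.
    private final BlockingQueue<FlushRequest> queue = new LinkedBlockingQueue<>(1024);
    private final AtomicInteger flushCount = new AtomicInteger();
    private volatile boolean stopping;

    boolean submit(FlushRequest request) {
        return queue.offer(request);
    }

    void stop() {
        stopping = true;
    }

    int flushCount() {
        return flushCount.get();
    }

    @Override
    public void run() {
        while (!stopping) {
            try {
                FlushRequest request = queue.poll(100L, TimeUnit.MILLISECONDS);
                if (request == null) {
                    continue; // nothing queued within the poll window
                }
                flushCount.incrementAndGet(); // stand-in for fileChannel.force(false)
                request.wakeup();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

A caller in synchronous mode would submit a request and then call waitForFlush(2000); a caller in asynchronous mode would submit and return immediately.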
DB
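Next, let's look at the DB-based implementation.
It is essentially a DAO-layer interface that maps to CRUD operations on the data. On restart, recovery simply queries the DB by condition, iterates over the matching rows, and rebuilds the Sessions.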
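To make the "DAO plus CRUD" description concrete, here is a minimal sketch of what such an interface could look like, with an in-memory map standing in for the database table. SessionDaoSketch, InMemorySessionDao, and the method names are hypothetical and are not Seata's actual DB store API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical CRUD contract for persisted sessions.
interface SessionDaoSketch {
    void insert(String xid, String status);
    boolean update(String xid, String status);
    void delete(String xid);
    List<String> findXidsByStatus(String status); // the "query by condition" used on recovery
}

// In-memory stand-in for a table keyed by xid; a real implementation would issue SQL.
class InMemorySessionDao implements SessionDaoSketch {
    private final Map<String, String> rows = new ConcurrentHashMap<>();

    @Override
    public void insert(String xid, String status) {
        rows.put(xid, status);
    }

    @Override
    public boolean update(String xid, String status) {
        // replace() only succeeds when the row already exists
        return rows.replace(xid, status) != null;
    }

    @Override
    public void delete(String xid) {
        rows.remove(xid);
    }

    @Override
    public List<String> findXidsByStatus(String status) {
        List<String> xids = new ArrayList<>();
        for (Map.Entry<String, String> row : rows.entrySet()) {
            if (row.getValue().equals(status)) {
                xids.add(row.getKey());
            }
        }
        Collections.sort(xids); // deterministic order for callers
        return xids;
    }
}
```

On restart, recovery in this model is just a call such as findXidsByStatus for each unfinished status, followed by rebuilding a Session per returned row.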
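To sum up the file store briefly: writes to the FileChannel are synchronous, and flushing to disk is handed off to a background thread.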