本系列主要关注安卓数据库的线程行为,分为四个部分:
(1)SQLiteOpenHelper的getReadableDatabase和getWritableDatabase
(2)SQLiteDatabase的实现以及多线程行为
(3)连接缓存池SQLiteConnectionPool
(4)SQLiteDatabase多线程实践
本篇主要关注SQLiteDatabase
的线程同步实现与架构实现。
1 SQLiteClosable的acquireReference与releaseReference方法
SQLiteClosable
是SQLiteDatabase
的父类,也同时是数据库下其他几个类的父类。其中实现了引用计数逻辑来控制资源释放的时机。
private int mReferenceCount = 1;
public void acquireReference() {
synchronized(this) {
if (mReferenceCount <= 0) {
throw new IllegalStateException(
"attempt to re-open an already-closed object: " + this);
}
mReferenceCount++;
}
}
public void releaseReference() {
boolean refCountIsZero = false;
synchronized(this) {
refCountIsZero = --mReferenceCount == 0;
}
if (refCountIsZero) {
onAllReferencesReleased();
}
}
可以看到这里用mReferenceCount
简单地实现了一个引用计数。而引用计数的初始值是1。SQLiteDatabase
会在每次操作前调用一次acquireReference
,而在结束后调用一次releaseReference
。为了方便,下文中把这样的被acquireReference
和releaseReference
包裹的过程称为一次“操作”。
那么如果这两个方法保持成对调用的话,是不是就不可能触发onAllReferenceReleased
方法?事实上,SQLiteClosable
还有一个方法close
调用了releaseReference
。由于锁的存在,只要不在其它“操作”中调用close
,调用close
之后mReferenceCount
的值可以断定是0。
到这里为止,感觉上是可以用一个boolean值来标记引用状态的。因为由于锁的存在,只要各个“操作”是序列进行的(没有一个“操作”调用了另一个“操作”的情况),mReferenceCount
只可能是0和1。推测引用计数就是为了应付“操作”之间存在调用这种情况。这就像同一个线程里的嵌套锁需要进行计数一样。
2 SQLiteDatabase的打开与关闭
2.1 关闭
上文中提到的onAllReferenceReleased
是一个抽象方法。其在SQLiteDatabase
中的实现为
@Override
protected void onAllReferencesReleased() {
dispose(false);
}
在finalize中同样调用了dispose
方法
protected void finalize() throws Throwable {
try {
dispose(true);
} finally {
super.finalize();
}
}
而dispose
的实现为
private void dispose(boolean finalized) {
final SQLiteConnectionPool pool;
synchronized (mLock) {
if (mCloseGuardLocked != null) {
if (finalized) {
//CloseGuard是一个监测是否及时调用close方法的类,一般来说除了输出日志并不会做别的什么
//这里事实上就是在finalize的时候如果没有close过,就输出一条日志
mCloseGuardLocked.warnIfOpen();
}
mCloseGuardLocked.close();
}
pool = mConnectionPoolLocked;//这个mConnectionPool是连接池。此方法里将其置空并关闭。后文详细讨论其作用。
mConnectionPoolLocked = null;
}
if (!finalized) {
//sActiveDatabases是一个静态的WeakHashMap,用key来放置所有活动数据库,而value并没有作用。dispose的时候自然要移除this。
//跟踪代码分析下来,用这个map只是为了bug report
synchronized (sActiveDatabases) {
sActiveDatabases.remove(this);
}
if (pool != null) {
pool.close();
}
}
}
2.2 打开
在本系列第一篇中我们曾看到过,最终的打开数据库的是一个静态方法,SQLiteDatabase.openDatabase(String path, CursorFactory factory, int flags, DatabaseErrorHandler errorHandler)
。
public static SQLiteDatabase openDatabase(String path, CursorFactory factory, int flags,
DatabaseErrorHandler errorHandler) {
SQLiteDatabase db = new SQLiteDatabase(path, flags, factory, errorHandler);
db.open();
return db;
}
这里很简单,就是新建一个对象,然后调用open
。构造器里只有一些初始化,略过。着重看open
方法:
private void open() {
try {
try {
openInner();//尝试一次
} catch (SQLiteDatabaseCorruptException ex) {
onCorruption();//失败了,再次尝试前调用另一个方法。
openInner();
}
} catch (SQLiteException ex) {
Log.e(TAG, "Failed to open database '" + getLabel() + "'.", ex);
close();
throw ex;
}
}
private void openInner() {
synchronized (mLock) {
assert mConnectionPoolLocked == null;
mConnectionPoolLocked = SQLiteConnectionPool.open(mConfigurationLocked);
mCloseGuardLocked.open("close");
}
synchronized (sActiveDatabases) {//这是之前那个WeakHashMap
sActiveDatabases.put(this, null);
}
}
void onCorruption() {
EventLog.writeEvent(EVENT_DB_CORRUPT, getLabel());
mErrorHandler.onCorruption(this);
}
open
中会尝试调用openInner
。如果失败一次,则调用onCorruption
,随后再尝试一次。mErrorHandler
是构造器传入的,构造器参数由静态方法openDatabase
传入,而这个参数又最终从SQLiteOpenHelper
传入。
openInner
中做的事情,从命名上看,是开启一个SQLiteConnectionPool
即数据库连接池。简单地说,数据库连接池维持了对数据库的多个连接。数据库连接的类是SQLiteConnection
。
3 线程内单例的SQLiteSession
private final ThreadLocal<SQLiteSession> mThreadSession = new ThreadLocal<SQLiteSession>() {
@Override
protected SQLiteSession initialValue() {
return createSession();
}
};
SQLiteSession getThreadSession() {
return mThreadSession.get(); // initialValue() throws if database closed
}
SQLiteSession createSession() {
final SQLiteConnectionPool pool;
synchronized (mLock) {
throwIfNotOpenLocked();
pool = mConnectionPoolLocked;
}
return new SQLiteSession(pool);
}
ThreadLocal
会在每个线程内维护一个对象,而在线程结束时解除对对象的引用。initialValue
方法会在线程中不存在已有对象时创建一个,不Override的话会给出一个null。除此之外也可以通过ThreadLocal.set
来给本线程配置一个对象。
可以看到mThreadSession
是一个ThreadLocal
。调用getThreadSession
会获取一个线程内单例的SQLiteSession
对象。
SQLiteSession
是提供数据库操作能力(增删改查以及事务)的一个单元。它会从SQLiteConnectionPool
即连接池中获取连接,最终对数据库进行操作。
到这儿类已经有点多了。整理一下逻辑:
(1)SQLiteDatabase
持有一个ThreadLocal
,用于对每个线程生成一个SQLiteSession
;
(2)SQLiteSession
持有SQLiteConnectionPool
(虽然SQLiteDatabase
也持有连接池对象,但它只用来传递给SQLiteSession
),但是同一个SQLiteDatabase
下的SQLiteSession
是共用一个SQLiteConnectionPool
的;
(3)SQLiteConnectionPool
管理SQLiteConnection
并适时向SQLiteSession
提供之;
(4)SQLiteConnection
直接对底层数据库进行操作(这个类里面才有大量的native方法)。
接下来分析一下SQLiteSession
。
获取与释放连接,还是一个引用计数实现:
private final SQLiteConnectionPool mConnectionPool;//构造器中初始化,值从SQLiteDatabase对象中传入
private SQLiteConnection mConnection;
private int mConnectionFlags;
private int mConnectionUseCount;//无处不在的引用计数
private void acquireConnection(String sql, int connectionFlags,
CancellationSignal cancellationSignal) {
if (mConnection == null) {
assert mConnectionUseCount == 0;
mConnection = mConnectionPool.acquireConnection(sql, connectionFlags,
cancellationSignal); // might throw
mConnectionFlags = connectionFlags;
}
mConnectionUseCount += 1;
}
private void releaseConnection() {
assert mConnection != null;
assert mConnectionUseCount > 0;
if (--mConnectionUseCount == 0) {
try {
mConnectionPool.releaseConnection(mConnection); // might throw
} finally {
mConnection = null;
}
}
}
具体的数据库操作有很多executeXXX形式的方法,逻辑大同小异。挑一个看看:
public int executeForChangedRowCount(String sql, Object[] bindArgs, int connectionFlags,
CancellationSignal cancellationSignal) {
if (sql == null) {
throw new IllegalArgumentException("sql must not be null.");
}
if (executeSpecial(sql, bindArgs, connectionFlags, cancellationSignal)) {//排除特殊操作
return 0;
}
//获取连接
acquireConnection(sql, connectionFlags, cancellationSignal); // might throw
try {
//底层数据库操作。本文不关心。
return mConnection.executeForChangedRowCount(sql, bindArgs,
cancellationSignal); // might throw
} finally {
//释放连接
releaseConnection(); // might throw
}
}
//用来支持'BEGIN','COMMIT','ROLLBACK'的操作。就是与Transaction相关的操作。
private boolean executeSpecial(String sql, Object[] bindArgs, int connectionFlags,
CancellationSignal cancellationSignal) {
if (cancellationSignal != null) {
cancellationSignal.throwIfCanceled();
}
final int type = DatabaseUtils.getSqlStatementType(sql);
switch (type) {
case DatabaseUtils.STATEMENT_BEGIN:
beginTransaction(TRANSACTION_MODE_EXCLUSIVE, null, connectionFlags,
cancellationSignal);
return true;
case DatabaseUtils.STATEMENT_COMMIT:
setTransactionSuccessful();
endTransaction(cancellationSignal);
return true;
case DatabaseUtils.STATEMENT_ABORT:
endTransaction(cancellationSignal);
return true;
}
return false;
}
4 单次完整的SQLite操作
4.1 SQLiteStatement
以最简单的delete
方法为例。其它方法的流程均大同小异。
public int delete(String table, String whereClause, String[] whereArgs) {
acquireReference();
try {
SQLiteStatement statement = new SQLiteStatement(this, "DELETE FROM " + table +
(!TextUtils.isEmpty(whereClause) ? " WHERE " + whereClause : ""), whereArgs);
try {
return statement.executeUpdateDelete();
} finally {
statement.close();
}
} finally {
releaseReference();
}
}
先用SQLiteStatement
做一些sql转义和拼接,然后调用statement.executeUpdateDelete()
。
具体看一下executeUpdateDelete
:
//以下来自SQLiteStatement
public int executeUpdateDelete() {
acquireReference();//注意这里是SQLiteStatement内的引用计数,不是SQLiteDatabase了。
try {
return getSession().executeForChangedRowCount(
getSql(), getBindArgs(), getConnectionFlags(), null);//上一节分析过了,执行SQL。
} catch (SQLiteDatabaseCorruptException ex) {
onCorruption();
throw ex;
} finally {
releaseReference();
}
}
//这个方法在父类SQLiteProgram中。又回到了上一小节的getThreadSession。获取线程内的单例。
protected final SQLiteSession getSession() {
return mDatabase.getThreadSession();
}
4.2 SQLiteDirectCursorDriver与SQLiteQuery
与4.1不同的是,在进行query操作时,最终没有使用SQLiteStatement
类,而是通过SQLiteDirectCursorDriver
间接使用了SQLiteQuery
。而SQLiteQuery
与SQLiteStatement
同为SQLiteProgram
的子类,完成类似的功能。
所有的query操作最终均调用这样一个方法:
public Cursor rawQueryWithFactory(
CursorFactory cursorFactory, String sql, String[] selectionArgs,
String editTable, CancellationSignal cancellationSignal) {
acquireReference();
try {
SQLiteCursorDriver driver = new SQLiteDirectCursorDriver(this, sql, editTable,
cancellationSignal);
return driver.query(cursorFactory != null ? cursorFactory : mCursorFactory,
selectionArgs);
} finally {
releaseReference();
}
}
而SQLiteDirectCursorDriver
的query方法如下:
public Cursor query(CursorFactory factory, String[] selectionArgs) {
final SQLiteQuery query = new SQLiteQuery(mDatabase, mSql, mCancellationSignal);
final Cursor cursor;
try {
query.bindAllArgsAsStrings(selectionArgs);
if (factory == null) {
cursor = new SQLiteCursor(this, mEditTable, query);
} else {
cursor = factory.newCursor(mDatabase, this, mEditTable, query);
}
} catch (RuntimeException ex) {
query.close();
throw ex;
}
mQuery = query;
return cursor;
}
其中新建了一个SQLiteQuery
,并绑定参数。随后新建一个Cursor
,这就是最终返回的Cursor对象。接下来考察无CursorFactory
情况下默认返回的SQLiteCursor
。
AbstractCursor
中各种move方法均会调用moveToPosition
,而moveToPosition
会调用onMove
,SQliteCursor
中onMove
的实现为:
@Override
public boolean onMove(int oldPosition, int newPosition) {
// Make sure the row at newPosition is present in the window
if (mWindow == null || newPosition < mWindow.getStartPosition() ||
newPosition >= (mWindow.getStartPosition() + mWindow.getNumRows())) {
fillWindow(newPosition);
}
return true;
}
private void fillWindow(int requiredPos) {
clearOrCreateWindow(getDatabase().getPath());
try {
if (mCount == NO_COUNT) {
int startPos = DatabaseUtils.cursorPickFillWindowStartPosition(requiredPos, 0);
mCount = mQuery.fillWindow(mWindow, startPos, requiredPos, true);
mCursorWindowCapacity = mWindow.getNumRows();
if (Log.isLoggable(TAG, Log.DEBUG)) {
Log.d(TAG, "received count(*) from native_fill_window: " + mCount);
}
} else {
int startPos = DatabaseUtils.cursorPickFillWindowStartPosition(requiredPos,
mCursorWindowCapacity);
mQuery.fillWindow(mWindow, startPos, requiredPos, false);
}
} catch (RuntimeException ex) {
// Close the cursor window if the query failed and therefore will
// not produce any results. This helps to avoid accidentally leaking
// the cursor window if the client does not correctly handle exceptions
// and fails to close the cursor.
closeWindow();
throw ex;
}
}
核心逻辑在mQuery.fillWindow(mWindow, startPos, requiredPos, false);
这里。mQuery
就是之前传入的SQLiteQuery
对象。查看其fillWindow
方法:
int fillWindow(CursorWindow window, int startPos, int requiredPos, boolean countAllRows) {
acquireReference();
try {
window.acquireReference();
try {
int numRows = getSession().executeForCursorWindow(getSql(), getBindArgs(),
window, startPos, requiredPos, countAllRows, getConnectionFlags(),
mCancellationSignal);
return numRows;
} catch (SQLiteDatabaseCorruptException ex) {
onCorruption();
throw ex;
} catch (SQLiteException ex) {
Log.e(TAG, "exception: " + ex.getMessage() + "; query: " + getSql());
throw ex;
} finally {
window.releaseReference();
}
} finally {
releaseReference();
}
}
可以看到,最终回到了SQLiteSession.executeXXX
方法逻辑之下。其余即与上一节类似。
而从Cursor
中取出数据的过程,则最终是由CursorWindow
下的一系列native方法来完成,我认为属于Cursor的代码体系了,这里不重点展开。
5 Transaction
5.1 beginTransaction
//一群差不多的beginTransaction方法最终调用到了这里
private void beginTransaction(SQLiteTransactionListener transactionListener,
boolean exclusive) {
acquireReference();//怎么老是你
try {
getThreadSession().beginTransaction(
exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE :
SQLiteSession.TRANSACTION_MODE_IMMEDIATE,
transactionListener,
getThreadDefaultConnectionFlags(false /*readOnly*/), null);
} finally {
releaseReference();
}
}
//上面的方法调用了这个方法。这套flags做了两件小事:1.确定只读还是可写 2.如果是主线程,就要提高连接的优先级
int getThreadDefaultConnectionFlags(boolean readOnly) {
int flags = readOnly ? SQLiteConnectionPool.CONNECTION_FLAG_READ_ONLY :
SQLiteConnectionPool.CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY;
if (isMainThread()) {
flags |= SQLiteConnectionPool.CONNECTION_FLAG_INTERACTIVE;
}
return flags;
}
还是要看SQLiteSession
内部:
public void beginTransaction(int transactionMode,
SQLiteTransactionListener transactionListener, int connectionFlags,
CancellationSignal cancellationSignal) {
throwIfTransactionMarkedSuccessful();//一点合法性检查,不贴了
beginTransactionUnchecked(transactionMode, transactionListener, connectionFlags,
cancellationSignal);
}
private void beginTransactionUnchecked(int transactionMode,
SQLiteTransactionListener transactionListener, int connectionFlags,
CancellationSignal cancellationSignal) {
if (cancellationSignal != null) {
//cancellationSignal从beginTransaction以及SQLiteStatement诸方法传入的均为null,调查发现仅query时可以传入此参数。
cancellationSignal.throwIfCanceled();
}
if (mTransactionStack == null) {//Transaction栈为空时才获取连接。
acquireConnection(null, connectionFlags, cancellationSignal); // might throw
}
try {
// Set up the transaction such that we can back out safely
// in case we fail part way.
if (mTransactionStack == null) {//如果没有进行中的Transaction,创建一个并BEGIN
// Execute SQL might throw a runtime exception.
switch (transactionMode) {
case TRANSACTION_MODE_IMMEDIATE:
mConnection.execute("BEGIN IMMEDIATE;", null,
cancellationSignal); // might throw
break;
case TRANSACTION_MODE_EXCLUSIVE:
mConnection.execute("BEGIN EXCLUSIVE;", null,
cancellationSignal); // might throw
break;
default:
mConnection.execute("BEGIN;", null, cancellationSignal); // might throw
break;
}
}
// Listener might throw a runtime exception.
if (transactionListener != null) {
try {
transactionListener.onBegin(); // might throw
} catch (RuntimeException ex) {
if (mTransactionStack == null) {
mConnection.execute("ROLLBACK;", null, cancellationSignal); // might throw
}
throw ex;
}
}
// Bookkeeping can't throw, except an OOM, which is just too bad...
Transaction transaction = obtainTransaction(transactionMode, transactionListener);//创建事务
transaction.mParent = mTransactionStack;
mTransactionStack = transaction;//入栈
} finally {
if (mTransactionStack == null) {//这里要栈为空时才释放连接。不为空时永远持有一个连接。
releaseConnection(); // might throw
}
}
}
private static final class Transaction {
public Transaction mParent;//这个是个链表,或者说在这里充当了一个栈
public int mMode;
public SQLiteTransactionListener mListener;
public boolean mMarkedSuccessful;
public boolean mChildFailed;
}
5.2 setTransactionSuccessful与endTransaction
直接看SQLiteSession
吧:
public void setTransactionSuccessful() {
throwIfNoTransaction();
throwIfTransactionMarkedSuccessful();
mTransactionStack.mMarkedSuccessful = true;//仅仅是个标记
}
public void endTransaction(CancellationSignal cancellationSignal) {
throwIfNoTransaction();
assert mConnection != null;
endTransactionUnchecked(cancellationSignal, false);
}
private void endTransactionUnchecked(CancellationSignal cancellationSignal, boolean yielding) {
if (cancellationSignal != null) {
cancellationSignal.throwIfCanceled();
}
final Transaction top = mTransactionStack;
boolean successful = (top.mMarkedSuccessful || yielding) && !top.mChildFailed;//如果有子Transaction失败,也是失败的
RuntimeException listenerException = null;
final SQLiteTransactionListener listener = top.mListener;
if (listener != null) {
try {
if (successful) {
listener.onCommit(); // might throw
} else {
listener.onRollback(); // might throw
}
} catch (RuntimeException ex) {
listenerException = ex;
successful = false;
}
}
mTransactionStack = top.mParent;//退栈
recycleTransaction(top);//回收
if (mTransactionStack != null) {//还没到最外层事务,只做个标记
if (!successful) {
mTransactionStack.mChildFailed = true;
}
} else {//到了最外层事务了,提交或回滚
try {
if (successful) {
mConnection.execute("COMMIT;", null, cancellationSignal); // might throw
} else {
mConnection.execute("ROLLBACK;", null, cancellationSignal); // might throw
}
} finally {
releaseConnection(); // might throw
}
}
if (listenerException != null) {
throw listenerException;
}
}
6 总结
(1)总的来说,SQLiteDatabase
是线程安全且高效的。它并没有简单地对每次操作加锁,而是使用引用计数和ThreadLocal来保证连接复用的线程安全性,数据一致性则交由SQLite自身去保证,以达到最优性能。
而很多时候我们在业务层封装时反而处处加锁,其实是没有必要的。
(2)SQLiteDatabase
的内部实现会让每个线程单独持有一个数据库连接(不一定是创建,因为有连接池优化),而不是每个SQLiteDatabase
对象对应一个连接。
(3)数据库会给主线程持有的连接提高优先级。如果执行的是读操作或者小量数据的写入操作的话,可能可以满足主线程低延迟的需要。但是还没有具体的数据来支撑这一结论,希望有大牛补充。
(4)多线程下的事务行为本文中未作分析,下一篇会就此问题单独进行讨论。
网友评论