import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import hooyee.IDataElement;
/**
* 一个缓存队列,用于管理
* [a[], b[], c[]]形式的数据,从其中取出指定长度的数据,若指定的长度小于队首的可用长度,则返回该队首元素(该元素不出队),并修改该元素的region.
* 若大于队首,则直接返回队首,并将队首元素出队,可用范围是 T 的[startPosition, endPosition)
* @param <T>
*/
public class DataCache<T extends IDataElement> {
private long MAX_SIZE;
private final AtomicLong totalSize = new AtomicLong();
/**
* dataQueue的poll和offer操作也需要保证线程安全,因此变更数据结构
*/
Queue<T> dataQueue = new ConcurrentLinkedQueue<>();
/**
* offer和poll需要通过overflow来确认,而overflow又依赖于updateSize,因此需要保证poll和offer的整个流程的原子性(其实只要保证offer,因为poll是会减小他的值,这样只会更加满足条件)
**/
private final ReentrantLock pollLock = new ReentrantLock();
private final ReentrantLock offerLock = new ReentrantLock();
public DataCache() {
MAX_SIZE = 1024 * 1024 * 4;
}
public DataCache(long maxSize) {
MAX_SIZE = maxSize;
}
/**
* 线程安全的, 只用于T为 IElement的时候,若不是与不带参的效果一样
* @param length 从队列的第一个元素中获取指定长度的数据,如果第一个元素小于改长度,调用poll,若大于则调用peek,并移动offset等信息
* @return
*/
public @Nullable T poll(int length) {
pollLock.lock();
try {
T data = dataQueue.peek();
if (data != null) {
removeIfOver(data, length);
}
return data;
} finally {
pollLock.unlock();
}
}
/**
* 若是queue中的元素可用长度[endPosition, length)超过入参length,则不出队列,以备下次使用
* @param data
* @param length
*/
private boolean removeIfOver(@NonNull T data, int length) {
// 未被读取的数据
if (data.getLength() - data.getEndPosition() > length) {
// 大于length只截取[start, end)部分
// 移动被读取的region[start, end)
data.setStartPosition(data.getEndPosition());
data.setEndPosition(data.getEndPosition() + length);
return false;
} else {
data.setStartPosition(data.getEndPosition());
data.setEndPosition(data.getLength());
dataQueue.poll();
updateSize(-data.getLength());
return true;
}
}
/**
* 线程安全的
*
* @param data
*/
public void offer(@NonNull T data) {
if (overflow()) {
return;
}
offerLock.lock();
try {
if (!overflow()) {
dataQueue.offer(data);
updateSize(data.getLength());
} else {
}
} finally {
offerLock.unlock();
}
}
/**
* 清空数据
*/
public void clear() {
dataQueue.clear();
totalSize.set(0);
}
private void updateSize(long size) {
totalSize.addAndGet(size);
}
/**
* true : 满了
*
* @return
*/
private boolean overflow() {
return totalSize.get() >= MAX_SIZE;
}
}
网友评论