import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* @Description
* @Date 2022-07-06 9:23 AM
*/
public class FeedProcessor extends AbstractFeedProcessor {
private ExecutorService threadPool = Executors.newFixedThreadPool(6);
private Map<Integer, List<Integer>> resultList = new ConcurrentHashMap<>();
public FeedProcessor() {
}
public void add(DataItem item) {
threadPool.submit(new ProcessTask(item));
}
class ProcessTask implements Runnable {
private DataItem item;
ProcessTask(DataItem item) {
this.item = item;
}
@Override
public void run() {
for(;;){
int id = item.getId();
List<Integer> list = resultList.get(id);
// 如果版本号为 1 则可以继续执行
if(list == null && item.getVersion() == 1){
saveItem(item);
list = new ArrayList<>();
list.add(item.getVersion());
resultList.put(id, list);
break;
} else {
// 最后一个版本号 + 1 等于自己的版本号,则轮到该任务执行
if(list != null && list.get(list.size() -1) + 1 == item.getVersion() ){
saveItem(item);
list.add(item.getVersion());
resultList.put(id, list);
break;
} else {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
public void shutdown() {
threadPool.shutdown();
try {
threadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
网友评论