一个个的job被封装为一个命令
private static AtomicInteger sequencing = new AtomicInteger(0);
private static class Command implements Comparable<Command> {
public Runnable runnable;
public MessagingListener listener;
public String description;
boolean isForegroundPriority;
int sequence = sequencing.getAndIncrement();
@Override
public int compareTo(@NonNull Command other) {
if (other.isForegroundPriority && !isForegroundPriority) {
return 1;
} else if (!other.isForegroundPriority && isForegroundPriority) {
return -1;
} else {
return (sequence - other.sequence);
}
}
}
这些个命令被放到blockquene中
private final BlockingQueue<Command> queuedCommands = new PriorityBlockingQueue<>();
private void put(String description, MessagingListener listener, Runnable runnable) {
putCommand(queuedCommands, description, listener, runnable, true);
}
private void putBackground(String description, MessagingListener listener, Runnable runnable) {
putCommand(queuedCommands, description, listener, runnable, false);
}
private static void putCommand(BlockingQueue<Command> queue, String description, MessagingListener listener,
Runnable runnable, boolean isForeground) {
int retries = 10;
Exception e = null;
while (retries-- > 0) {
try {
Command command = new Command();
command.listener = listener;
command.runnable = runnable;
command.description = description;
command.isForegroundPriority = isForeground;
queue.put(command);
return;
} catch (InterruptedException ie) {
SystemClock.sleep(200);
e = ie;
}
}
throw new Error(e);
}
一个while(true)去不停的取任务执行,执行失败继续放到blockquene中等待重试。
private final BlockingQueue<Command> queuedCommands = new PriorityBlockingQueue<>();
private void runInBackground() {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
while (!stopped) {
String commandDescription = null;
try {
final Command command = queuedCommands.take();
if (command != null) {
commandDescription = command.description;
Timber.i("Running command '%s', seq = %s (%s priority)",
command.description,
command.sequence,
command.isForegroundPriority ? "foreground" : "background");
try {
command.runnable.run();
} catch (UnavailableAccountException e) {
// retry later
new Thread() {
@Override
public void run() {
try {
sleep(30 * 1000);
queuedCommands.put(command);
} catch (InterruptedException e) {
Timber.e("Interrupted while putting a pending command for an unavailable account " +
"back into the queue. THIS SHOULD NEVER HAPPEN.");
}
}
}.start();
}
Timber.i(" Command '%s' completed", command.description);
}
} catch (Exception e) {
Timber.e(e, "Error running command '%s'", commandDescription);
}
}
}
网友评论