Future
Future是多线程开发中非常常见的一种设计模式,核心思想是异步调用。当调用要给方法的时候,不会阻塞当前的线程,让他可以去做其他事情,等数据Ok了,再通知它,或者等调用线程必须要了,再尝试获取它。
Scala的Future
在用scala
编程的时候,感叹它的Future
是真的厉害,纯异步,flatMap
的回调写起来是真的舒服,虽然debug查问题也是真的痛苦。
异步编程的模型就是任务的执行是独立于主程序流之外的,可以在资源暂时不可用的时候帮助消除阻塞,而不是挂起线程。
两个重要的概念体:
Promise
Future
Promise
promises a Future
.
Future
A future is a placeholder, that is, a memory location for the value. This placeholder does not need to contain a value when the future is created; the value can be placed into the future eventually.
那calling thread什么时候知道要将值从Future中提取出来呢?
- polling approach:只是将阻塞从invoked method中转移到caller thread. (就像java那样)
- call-back and functional compostion: scala中的foreach(等价于onSuccess,还有
onComplete
,onSuccess
,onFailure
,)是callback, functional composition是map(不过scala的future也提供了polling的方法,isCompleted
), 通过install callback回调(不过回调并不是立刻在Future的值得到后立刻执行的,所以我们在future完成后再注册回调也是不会报错的)
A callback is a function that is called once its arguments become available. When a Scala
future takes a callback, it eventually calls that callback. However, the future does not call
the callback before this future is completed with some value. 回调之间也不保证执行顺序
Notice: completion of the future is a happen-before relationship with the polling calls. If the future completes before the invocation of the polling method, then its effect are visible to the thread after polling completes.
Notice: After the future is completed, the callback is called eventually and independently from other callbacks on the same future. The specified execution context decides when and on which thread the callback gets executed. There is a happen-before relationship between completing the future and starting the callback.
另外关于引用透明,如果Future只是用Future.apply 和 foreach 来注册引用透明的回调,那么future就一定是引用透明的。
Programs using only the Future.apply and foreach calls with referentially transparent callbacks are
deterministic. For the same inputs, such programs will always compute the same results .Programs composed from referentially transparent future computations and callbacks
are deterministic.
用foreach注册成功的回调,用failed来注册失败的回调
someFuture.failed foreach {failedOne => doSomething}
什么时候用foreach,什么时候用map等?
use callbacks for side-effecting actions(like print) that depend on a single future. In all other situations, we can use functional composition.
When an action in the program depends on the value of a single future, use callbacks on
futures. When subsequent actions in the program depend on values of multiple futures or produce new futures, use functional composition on futures.
for表达式里的不是并发的Future?
不完全对,Future创建出来后就进入了执行池,for里如果是串行建立Future的话,Future的执行也是串行的,所以如果Future之间没有依赖的话,建议先建立好,然后放入for表达式里。
Future的异常和错误
使用recover吸收错误,使用Try来既匹配成功又匹配失败,fallbackTo
, recoverWith
或者其他的pattern match
f.failed foreach { case NonFatal(t) => log("")}
Promise
Promises are objects that can be assigned a value or an exception only once. This is why promises are sometimes also called single-assignment variables.
To assign a value or an exception to a promise, we use the success or failure method, respectively.
A promise and a future represent two aspects of a single--assignment variable--the
promise allows you to assign a value to the future object, whereas the future allows you
to read that value
Promise和Future代表了只能赋值一次的变量的两面,前者是写后者是读。可以用success
方法写入值,或者failure
写入失败的原因,再或者可以用complete
把Try[T]
写进promise。写完之后,再写就会触发异常,所以常用的方法是:
trySuccess, tryFailure, tryComplete
promise另外可以解决的是,其他的三方库并不都是异步计算,而是通过回调来写入结果,promise可以转化他们'callback-based apis' 到异步的Future来和scala所用的框架适配, 通过这个桥梁,来避免'inversion of control'(the control flow is not apparent from the code)。
一般都是固定的模式:先创建一个promise, 然后将阻塞的运算defer到另外的计算里(比如其他的线程,或者可以回调的方法,一定要在那个计算里将值写回promise),最后返回promise.future.
Notice :切记,使用promise future-callback bridge,一定要记得timeout,超时就失败Future,否则可能计算永远不完成,Future永远不完成。
拓展future的API
implicit class FutureOps[T](val self: Future[T]) {
def or(that: Future[T]): Future[T] = {
val p = Promise[T]
self onComplete { case x => p tryComplete x }
that onComplete { case y => p tryComplete y }
p.future
}
}
Future的取消
在java里面多线程的执行取消就很麻烦,因为java并没有提供机制来安全地终止线程,但是它提供了中断,一种协作机制,能够让一个线程给另一个线程发送中断,终止的消息。然后发送终止消息的线程并不是很清楚需要终止线程的终止机制,如果立刻停止,很可能使得数据不一致,所以,得让线程自己终止自己,只有它自己最清楚,这也体现了协作机制。
线程在非阻塞状态下(这里的阻塞是指java的wait和timed_wait状态,不是blocked状态),中断状态被设置(但是并不会抛异常),然后线程自己需要主动检测中断状态(这样的检测点,时刻,成为取消点)。而另外的一些方法等(wait,join,sleep, tryLockInterrupted),他们会进入wait和timed_wait状态,他们在执行或者进入执行前发现中断信号,会抛出InterruptedException
,并且清除中断位。
哪些情况的阻塞方法或者阻塞机制是不能响应中断的呢? --《java并发编程实践》中详细讲述了任务,线程等怎么取消,线程池如何关闭等。
- Java.io包中的同步io
- Java.io包中的同步Socket IO:
InputStream
等的read和write, 但是可以通过关闭它的套接字来触发SocketException
来停止线程 - Selector的异步I/O: select方法的时候阻塞,但是可以通过close或者wakeup方法提前返回
- 获取内置的锁,monitor
scala中也没有一个机制取消,可以利用Future来变相达到目的
val f = timeout(1000).map(_ => "timeout!") or Future {
Thread.sleep(999)
"work completed!"
}
第二个Future在tryComplete的时候会失败。但是缺点也很明显,就是这操作任然是在进行着,没有实际终止,而唯一的办法就是深入到Future的执行计算内部,告诉它,需要停止(不过这又回到java如何停止线程上来了),我们使用Future和promise的结合来完成双向的交流来做这件事,主要思路还是:一个Future保存计算结果,这个Future的计算逻辑在执行的时候,需要判断是否收到来自客户端的取消消息,如果收到,抛异常,如果没有,继续计算并且给出结果,使用Promise来接受取消消息,Future的计算逻辑内保存这个一个Promise的引用,客户端向Promise发送消息,Future的计算逻辑就能收到这个消息。
type Cancellable[T] = (Promise[Unit], Future[T])
// Future[Unit] => T, 我们并不是从Future[Unit]中计算的T,而只是从Future[T]中读取客户端是否取消这个消息
def cancellable[T](calculate: Future[Unit] => T): Cancellable[T] = {
val cancel = Promise[Unit]()
val f = Future {
val r = calculate(cancel.future)
// 为什么要在计算完之后,给出结果前判断,是避免结果已经完成,但是客户端才调了取消,这时候
// 我们也希望客户端得知的是取消了的,这和java里的FutureTask的选择不同,在java里的FutureTask如果已经 // 完成,取消没有意义。
// 或者在 具体的计算逻辑里 见notice:
if(!cancel.tryFailure(new Exception())){ // tryComplete(Failure(new Exception()))
throw new CancellationException()
}
r
}
(cancel, f)
}
val (cancel, futureValue) = cancellable { cancel =>
var stop = false //or 其他判断
while(!stop) {
if(cancel.isCompleted) throw CancellationException // 当completed就是设置了值,就是取消
....doing job....
}
// notice: 可以在这里, 把promise也传递进来,计算完设置tryComplete,阻止客户端取消
res
}
像这样Future-Promise
的two-way communication可以做到许多事情。
等待Future完成
主动
- ready: 只等待完成,不管是否异常
- result:成功返回结果,失败抛异常
调用阻塞式API
要把这种Future,或者Promise包住的阻塞式API,放入专门的Blocking thread pool
,避免它阻塞nonblocking thread pool
里的所有线程,使得非阻塞的Future没办法执行了,因为阻塞式的API用Future或者Promise包住,并不等于就是成为了异步,比如有个接口是调用的BIO来读取文件,用Future包起来,或者Promise包起来(但是不放到其他阻塞线程池里,而是当前的非阻塞线程池里),它仍然会阻塞,而像Play
框架里用到的WSClient
底层都是AIO
,不会阻塞的。
如果真的没有其他阻塞线程池的话,只好用
Future {
blocking { // 使用blocking 告诉执行环境,可以适当产生新的线程来执行其他任务
BIO
Thread.sleep()
}
}
The Await.ready and Await.result statements block the caller thread until the future is
completed, and are in most cases used outside the asynchronous computations. They
are blocking operations. The blocking statement is used inside asynchronous code to
designate that the enclosed block of code contains a blocking call. It is not a blocking
operation by itself.
Java的Future
java的Future相较于scala的Future比较鸡肋,它没办法把回调操作串联起来。在最终需要数据的地方还是阻塞调用。
模拟一下Future
四个类:
- 代表数据的接口
- FutureData
- CalData, 某个我们需要的数据以及它的运算逻辑
- Client,请求
public interface Data<T> {
public T getData() throws InterruptedException;
}
public class FutureData<T> implements Data<T>{
protected boolean isReady = false;
protected T realData = null;
public synchronized void setData(T realData) {
if(! isReady) {
this.realData = realData;
isReady = true;
notifyAll();
}
}
@Override
public synchronized T getData() throws InterruptedException {
while (!isReady) {
try{
wait();
}catch (InterruptedException ie) {}
}
return realData;
}
}
public class CalData implements Data<String> {
protected final String result;
public CalData() {
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 100; ++ i) {
sb.append(i);
try{
Thread.sleep(50);
}catch (InterruptedException interruptedException) {
}
}
result = sb.toString();
}
@Override
public String getData() throws InterruptedException {
return this.result;
}
}
public class Client {
public Data request(final String req) {
final FutureData<String> futureData = new FutureData<String>();
Thread myThread = new Thread(() -> {
Data<String> s = new CalData();
try {
futureData.setData(s.getData());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
myThread.start();
return futureData;
}
public static void main(String[] args) throws InterruptedException {
Client c = new Client();
Data<String> ss = c.request("dfd");
System.out.println("请求完毕");
System.out.println(ss.getData());
}
}
最后会在请求完毕后阻塞,然后等待计算完成。
- [1] java并发编程实战
- [2] learning concurrent-programming in scala 2nd
网友评论