QCon London 2019- Why Continuations are Coming to Java
-
talks说明
- 讨论并比较了处理和并发的各种技术如纯函数式(monads, affine types)和命令式编程(threads, continuations, monads, async/await)
- 说明了为什么continuations是非常适合命令式风格
-
talks笔记
-
Why Java is Getting Continuations
-
关于计算的观点
- 确定性的、顺序的
- 不确定性的、交互的/并发的
-
纯函数式编程
-
确定性、顺序的
-
Linear Types: nondeterminism as a function of unknown values
- 线性类型-未知函数的不确定性
-
IO Type: Move nondeterminism outside the program
- io类型-将不确定性转移到程序之外
-
landon: 这里主要是函数式编程的一些知识,如haskell、monad等
-
-
经典的命令式编程
- 不确定性的、并发的/交互的
-
反应式编程的问题
// 一个阻塞方法 double calcImportantFinance(double x) { try { double result; result = compute("USD->euro", x); result = compute("subtractTax", result * 1.3); result = compute("addInterest", result); return result; } catch(Exception ex) { log(ex); throw ex; } } double compute(String op, double x); // 用CompletableFuture异步实现 CompletableFuture<Double> calcImportantFinance (double x) { return compute("USD->euro", 100.0) .thenCompose(result -> compute("subtractTax", result * 1.3)) .thenCompose(result -> compute("addInterest", result)) .handle((result,ex) -> { if(ex != null) { log(ex); throw new RuntimeException(ex); }else{ return result } }); }
- Lost Control Flow
double result; result = compute("USD->euro", x); if(result > 1000) { result = compute("subtractTax", result * 1.3); } while(!sufficient(result)) { result = compute("addInterest", result); } return result;
-
CompletableFuture和控制流在某些方面不匹配
-
Lost Context
- 上面每一步都是在不同的线程执行
- 当出现异常时,上下文丢失,则后面调试代码非常困难。另外性能分析也会变的困难,因为性能分析是通过对堆栈跟踪采样来进行的。
-
Viral 病毒式的
- 最大的问题是返回类型,返回值是CompletableFuture<Double>
- 除非调用阻塞的get方法,否则无法获取值
- 另外调用链上也必须异步方式使用这些CompletableFuture,这是病毒式的
- 这意味着我们的这个调用堆栈要么是阻塞同步,要么是异步,两种几乎不能互操作
-
Async/Await
async Task<double> CalImportantFinace() { try { double result; result = await compute("USD->euro", x); result = await compute("subtractTax", result * 1.3); result = await compute("addInterest", result); return result; } catch(Exception ex) { log(ex); throw ex; } }
- C#引入一个叫async and await的东西,功能类似于Java的CompletableFuture,但是可以用await关键字作为异步方法的前缀,看着是比较漂亮的命令式代码
- 解决了提到的第一个lost control flow的问题。但是对于第二个问题,因为上面的每一行仍然在不同的线程上执行,如果出现异常的话会是一个比较奇怪的异常堆栈
- .Net Core 2.1尝试修复此问题,它们人为地生成了一个可以捕获实际上下文的堆栈跟踪,但是与您编写普通的阻塞代码时所得到的堆栈跟踪相比,它仍然是非常不同的堆栈跟踪
- 所以最大的问题还是隔离了同步阻塞世界和异步世界
-
Why give up a good (core!) abstraction just because of an inadequate implementation?
- 命令式语言不仅处理输入和非确定性语言,还具有线程,进程和阻塞的内置概念,从编程角度来看,它是一个很好的抽象。我们有时想要避免的唯一原因是内核线程的实现负担太重
- 显而易见的解决方案是更改实施,而不是更改我们的编程方式。这就是我们作为Project Loom的一部分正在尝试做的事情
- First, we need the ability to stop the code that's running on the CPU and say, "I'm not using the CPU anymore," and later on have the ability to resume it.
- Continuation,yield and resume
- we need some mechanism to schedule those pieces that want to run the CPU
- Scheduler
- thread/process = yield control and resume + execution scheduling
- thread/process = continuation + scheduler
-
Continuations
- 当您调用run时,continuation的主体将运行到完成或直到下一次调用yield为止
- 作用域使我们可以在另一个内部嵌套不同的continuation,并能够挂起多个continuation并跳回多个调用方,这与我们在堆栈中抛出异常的方式类似
- 限定的意思是我们要暂停和恢复的代码部分不是整个程序,而只是body内的代码
- 每次运行它都会一直运行到下一个挂起点,然后状态改变,然后当您再次运行它时,它将从第一个挂起点运行到第二个挂起点,我们永远都无法回到过去
- 克隆和序列化
-
Fiber
-
今天,编写应用程序的人可以选择编写简单的同步阻塞代码,但是他们依靠内核为他们提供线程,并且内核只能处理这么多线程。该应用程序将非常清晰,易于维护和调试,但将不可扩展。另一种选择是编写异步代码,正如我向您展示的那样,这要复杂得多,并且很难将现有代码放入其中,但至少它是可伸缩的
-
使用fiber,如果我们确实可以使它们比内核提供的线程轻得多,则可以解决此问题。您可以根据需要拥有任意数量的这些用户模式轻量级线程,并且可以进行阻塞,阻塞实际上是免费的
-
fiber = continuation + scheduler
-
codes like sync,works like async
- concurrency made simple
-
这使编写并发应用程序变得更加容易,因为它可以帮助您匹配业务的并发单元。这是重量级内核线程无法实现的,因为您可能有100,000个并发用户,但是当前线程却无法达到100,000
-
io,java.util.concurrent
- 目前已经是fiber-blocking了
-
如果你喜欢Async/Await,那么下面这个就是整个实现
class Async<T> extends CompletableFuture<T> { public static <T> T await(Async<T> async) throwsInterruptedException, ExecutionException { return async.get(); } }
- 即可以直接在fiber中调用这个await
-
JDK中旧的重量级线程的数据。 它们每个都有大约2 KB的元数据,默认情况下还有1 MB的堆栈。 当涉及到fiber时,它们目前在原型中,它们目前只有2到300字节的元数据,而且可增长和收缩
-
对于内核管理的重量级线程的任务切换成本,切换任务的时间在1到10微秒之间。 对于fiber,我们不知道它到底有多少,而性能仍然是我们仍在努力的方面,但我们希望它会比现在低得多
-
-
Rethink threads
-
结构化并发
- 线程被限制在一个已知的生命周期内,该生命周期可以扩展到给定的代码块
- 我们有一个定义fiber作用域的块,保证在退出作用域时终止在作用域内创建的所有fiber
- fiber作用域可取消,也可以给它一个截止日期
-
结构化并发
try(var scope = FiberScope.cancellable()) { Fiber<?> fiber = scope.schedule(task) }
- 作用域内调度所有fiber都终止之前,无法退出作用域
- fiber作用域可以嵌套
- FiberScope具有一个终止队列,用于收集作用域内fiber的结果
- 取消一个在可取消作用域的fiber会取消所有已经调度的fiber。在这个作用域内,通过嵌套,可以取消一颗fiber树
- 取消一个park在阻塞io操作的fiber会引起unpark和检查是否取消
-
结构化并发
-
返回第一个完整任务的结果,取消并等待所有未完成的fiber终止后再返回
-
fiber作用域为我们提供了终止队列
<V> V anyOf(Callable<? extends V>[] tasks) throws Throwable { try (var scope = FiberScope.cancellable()) { var queue = new FiberScope.TerminationQueue<V>(); Arrays.stream(tasks).forEach(task -> scope.schedule(task, queue)); try { return queue.take().join(); } catch (CompletionException e) { throw e.getCause(); } finally { scope.fibers().forEach(Fiber::cancel); // cancel remaining fibers } } }
-
结构化并发
-
同样的有截止日期,如果过期则已调度的在作用域的fiber都将被取消
<V> V anyOf(Callable<? extends V>[] tasks, Instant deadline) throws Throwable { try (var scope = FiberScope.withDeadline(deadline)) { var queue = new FiberScope.TerminationQueue<V>(); Arrays.stream(tasks).forEach(task -> scope.schedule(task, queue)); try { return queue.take().join(); } catch (CompletionException e) { throw e.getCause(); } finally { scope.fibers().forEach(Fiber::cancel); // cancel remaining fibers } } }
-
-
-
-
Generators
- 我们希望人们使用fiber,而不是直接使用continuation
- 但有趣的是,continuation的一个用途是类似python的生成器(run,yield,run...)
class Generator<T> implements Iterable<T> { private static final ContinuationScope GENERATOR = new ContinuationScope(); private static class GenContinuation<T> extends Continuation { public GenContinuation() { super(GENERATOR); } private T value; } public static void yield(T value) { ((GenContinuation)currentContinuation(GENERATOR)).val = value; Continuation.yield(GENERATOR); } private final GenContinuation<T> cont; public Generator(Runnable body) { cont = new GenContinuation<T>(body); } public Iterator<T> iterator() { return new Iterator<T>() { public T next() { cont.run(); return cont.val; } public boolean hasNext() { return !cont.isDone(); } } } } var fibonacci = new Generator<Integer>(() -> { Generator.yield(0); int a = 0; int b = 1; while(true) { Generator.yield(b); var sum = a + b; a = b; b = sum; } }); // 调用迭代器,next和hasNext方法通过continuation的run和yield驱动 for (var num : fibonacci) { System.out.println(num); if (num > 10_000) break; } // 另外一个例子 var greetedPrimes = new Generator<String>(() -> { for (int n = 0; ; n++) if (isPrime(n)) { // 阻塞 var greeting = Console.readLine(); Generator.yield(greeting + ": " + n); } }); Fiber.schedule(()-> { for (var x : greetedPrimes) System.out.println(x); }
-
right now mostly interested input on the structure currency API
- 现在最感兴趣的是结构化并发api的输入
-
网友评论