1 isolate机制
dart是基于单线程模型的语言。但是在开发中我们经常会进行耗时操作,比如网络请求。这种耗时操作会阻塞我们的代码,所以dart也有并发机制,名叫isolate。app的启动入口main函数就是一个类似Android主线程的一个主isolate。和java的Thread不同的是,dart中的isolate无法共享内存。
isolate的英文直译就是隔离,孤立,脱离。在这里我们可以先简单的将其理解为java中的线程。后续根据代码来一步步弄清它到底是什么?
void main(){
Isolate.spawn(entryPoint, "hello");
}
void entryPoint(String msg){
print(msg);
}
Isolate.spawn()方法的第一个参数需要我们传入一个方法(不清楚的可以看另一篇文章Flutter学习之Dart方法,异常与类)。于是我们在下面定义了一个方法entryPoint()并需要传入String类型的数据(数据类型可以是任意的类型),那么我们在Isolate.spawn()的第二个参数就要传入对应类型的数据。
上面的main()函数我们就可以理解成Android中的主线程,下面的entryPoint就相当于是自线程。而 Isolate.spawn()就相当于创建了这个自线程。
void main(){
Isolate.spawn(entryPoint, "hello");
print("你好");
}
void entryPoint(String msg){
sleep(Duration(seconds:10));
print(msg);
}
可以看到我们在entryPoint()中睡眠了10秒,但是主线程依然会继续执行且没有阻塞。
前面我们说了Isolate是线程隔离的,那么怎么证明呢?
int i;
void main(){
i =10;
Isolate.spawn(entryPoint, "hello");
print(i);
}
void entryPoint(String msg){
sleep(Duration(seconds:10));
print(i);
}
我们声明了一个全局的 i 且没有赋值。在主线程中我们赋于其10。但是根据最后打印的结果可以看到在entryPoint()中打印的却是null。这就证明了dart中的Isolate确实是线程隔离的。但是我们如果需要两个“线程”之间要相互通信该怎么办呢?我们知道在Android中我们可以通过Handler来进行通信,可惜的是dart没有提供Handler,但是提供了ReceivePort和SendPort。
void main(){
var rp = ReceivePort();
rp.listen((t) {
print(t);
});
rp.sendPort.send("hello");
rp.sendPort.send("dart");
rp.sendPort.send(66);
}
根据上面的代码我们可以得知SendPort是在ReceivePort里面的,SendPort一发送数据,ReceivePort就可以通过listen()函数监听到数据。listen函数中的 t 是没有类型的,那么就表示它是dart中的 dynamic 类型的,可以接收任何类型。但是上图中我们看到被黄色框框住的地方。这个地方是红色的表示我们的程序正在运行,如果运行完了的话就会自动变成灰色。可我们明明已经发送完数据了,程序怎么还没有结束呢?这是因为我们忘了添加一句代码:
rp.close();
如果你现在在最后添加上这句代码,反而会看不到任何的输出。这与后面的消息驱动机制有关,暂且先不说,稍后会介绍。只要大家记得发送完了要记得close()就可以了,就像java中使用IO流一样。
上面是我们在一个线程中使用了ReceivePort和SendPort。下面就在main()和entryPoint()之间来使用:
void main(){
var rp = ReceivePort();
rp.listen((t) {
print(t);
});
Isolate.spawn(entryPoint, rp.sendPort);
}
void entryPoint(SendPort sendPort){
sendPort.send("echo");
}
可以看到这样就完成了从“子线程” entryPoint到“主线程”main之间的通信了。 现在只是单向通信,如果要完成双向通信我们就还需要从main向entryPoint发送。和之前的子到主是一样的,我们只需要在entryPoint中把“子线程”的SendPort发给“主线程”,那么就可以在“主线程”给“子线程”发消息了。
void main(){
// 主线程的ReceivePort
var rp = ReceivePort();
rp.listen((t) {
if( t is SendPort){
// 可以在这里发送消息给子线程
}
});
Isolate.spawn(entryPoint, rp.sendPort);
}
void entryPoint(SendPort sendPort){
// 子线程的ReceivePort和SendPort
var rp = ReceivePort();
var sp = rp.sendPort;
sendPort.send(sp)
sendPort.send("echo");
rp.listen((t) {
// 这里可以接收主线程发来的消息
});
}
2 Future
dart的事件驱动流程
前面我们说了dart中也是靠事件驱动(消息驱动)来推动整个程序的运行。我们都知道在Android中每个线程只有一个Looper和其对应的MessageQueue。但是在Dart中有两个队列:EventQueue(事件队列)和MicroTaskQueue(微任务队列):
整个程序的运行时靠Event-Loop先从MicroTaskQueue中取出任务执行,当MicroTaskQueue中没有任务了再去EventQueue中取任务执行。当EventQueue执行完取出的这一个普通任务之后就会重新去检查MicroTaskQueue中是否有新的任务。如果有就执行MicroTaskQueue中的任务,没有则继续EventQueue中的任务。也就是说MicroTaskQueue中的任务优先级是高于EventQueue中的,即MicroTaskQueue中的任务可以插队在EventQueue中的任务之前。
void main(){
var rp = ReceivePort();
rp.listen((t) {
print(t);
});
rp.sendPort.send("hello --- 1");
Future.microTask((){
print("micro task execute --- 1")
});
rp.sendPort.send("hello --- 2");
Future.microTask((){
print("micro task execute --- 2")
});
}
从执行结果来看微任务的优先级确实是要高于普通的任务,也跟我们的流程图上画的一样。
了解了dart的整个事件的驱动流程之后我们来认识一下Future:
Future的中文意思是未来,与其对应的就是java当中的FutureTask。在dart中就意味着使用Future创建的任务会在未来被执行。其实Future有很多的api,除了上面我们介绍过的Future.microTask()是向MicroTaskQueue中添加任务之外,其他的则是向EventQueue中添加。
void main(){
Future f = Future.delayed(Duration(seconds : 3));
f.then((t){
print(t);
});
}
是不是跟我们Android中的Handler很像?上面的代码表示延迟3秒之后会得到一个未来的Future对象并执行。但是不意味着3秒过后会立即执行,因为你不能确定Queue中是否有其他的任务,其他的任务是不是会阻塞。所以print(t)方法的执行事件是会在3秒或大于3秒之后执行。
void main(){
Future<String> f = new File("C:\user\test\a.txt").readAsString();
f.then((String text){
print(text);
});
}
上面的例子我们是从一个文件中读取内容。可以看到readAsString()返回的是一个带有明确泛型类型的Future,也就是把读取文件内容的这么一个任务放入了队列中。那么我们就可以通过then()方法在未来的某个时间点得到文件中的内容。那如果我们在读取的过程中出现了异常该怎么办呢?很简单,只需要在后面添加一个方法就可以了:
void main(){
Future<String> f = new File("C:\user\test\a.txt").readAsString();
f.then((String text){
print(text);
}).catchError((e,s){
// 捕获异常
});
}
通过查看Future的源码得知Future的then()方法返回的是一个Future对象,而then方法中我们传递的匿名方法是可以有返回值的。
void main(){
Future<String> f = new File("C:\user\test\a.txt").readAsString();
f.then((String text){
print(text);
return 6;
}).then((int i){
print(i);
});
}
可以看到当我们返回一个int类型的6时我们在调用then()方法的时候需要我们传入一个参数为int类型的匿名方法。如果不好理解可以拆开来理解:
Future<String> f = new File("C:\user\test\a.txt").readAsString();
Future<int> then = f.then((String text){
print(text);
return 6;
});
then.then((int i){
print(i);
})
// wait表示一组任务(我们传入了一个Future集合)执行完之后再做统一的一个处理
// 具体场景的话可以认为同时去请求几个接口,但是需要每个都成功返回之后再去做后面的处理
Future.wait([Future.delayed(Duration(seconds : 2), Future.delayed(Duration(seconds : 2)]).
then((t){
print("------")
});
// 按顺序执行
Future.foreach([1,2,3,4]), ((t){
print(t);
});
这其实就和Rxjava中的操作符一样,可以根据不同的需求做不同的事情。
3 Stream
Stream(流)在dart中也经常出现,表示发出一系列的异步数据。Stream是一个异步数据源,它是dart中处理异步事件流的统一api。
与Future只能表示一次在未来异步获得的数据,而Stream可以表示多次异步获得的数据。比如IO处理的时候,File的readAsString()是一次性读取整个文件的内容进来,而Stream可以一部分一部分的读取。如果文件过大,使用Future显然会出现内存问题。
单订阅模式
void main(){
// 返回一个Stream对象 泛型微List<int> 由于dart中没有byte类型,所以会用int来表示
Stream<List<int>> stream = new File("C:\user\test\a.txt").openRead();
// 返回一个订阅者
StreamSubscription subscriber = stream.listen((t){
print("stream"); // 这里会多次调用
});
}
在调用stream.listen()之后我们可以看到返回了一个订阅者StreamSubscription。那么既然是订阅者那么就对应的有取消订阅,任务完成等之类的回调:
subscriber.cancel(); // 取消
subscriber.onDone((){ //任务完成之后会调用此方法
});
subscriber.onData((t){ // 此方法中的匿名方法会完全替代listen中的匿名方法。也就是说listen中的匿名方法不会再调用
});
subscriber.pause(); // 暂停
subscriber.resume(); // 继续
还有比较多的方法,这里就不一一展示了。
多订阅模式
如果采用上面的用法,那么只能调用一次stream.listen(),再次调用的话运行就会报错。幸运的是Stream支持多订阅,就跟和被观察者一样可以被多个观察者订阅。
void main(){
Stream<List<int>> stream = new File("C:\user\test\a.txt").openRead();
// 转成广播模式
var broadcastStream = stream.asBroadcastStream();
broadcastStream.listen((t){
print("stream1");
});
broadcastStream.listen((t){
print("stream2");
});
broadcastStream.listen((t){
print("stream3");
});
}
上面的广播是由单订阅转换成广播的,其实在dart中可以直接创建广播:
void main(){
var controller = StreamController.broadcast();
//发送一个广播
controller.add("echo");
// controller.stream直接创建的就是广播
controller.stream.listen((t){
print(t);
});
}
运行之后很遗憾的是没有监听到数据。这是因为如果是这种创建的广播需要先订阅,才能接收数据。也就是说必须先订阅再接收,所以需要把发送和监听这两行代码调换一下位置。
void main(){
var stream = Stream.fromIterable([1,2,3]);
var broadcastStream = stream.asBroadcastStream();
broadcastStream.listen((t){
print("subscriber 1 : ${t}");
});
broadcastStream.listen((t){
print("subscriber 2 : ${t}");
});
// ----------------------
var controller = StreamController.broadcast();
controller.add("echo");
controller.stream.listen((t){
print("controller : ${t}");
});
// ----------------------
broadcastStream.listen((t){
print("subscriber 3 : ${t}");
});
}
上半部分是由单订阅的Stream转换成了多订阅的广播,中间部分则是和上一段代码一样直接创建了广播,最后则是由转换而来的广播又在controller这个直接创建的广播发送数据之后又去监听了一下。
根据运行结果我们可以看到转换的广播是可以收到controller发出的数据的。由此我们可以发现转换的广播有些类似于粘性广播,可以在广播发出之后再订阅依然可以收到数据。
3 async/await
使用async和await的代码是异步的,但是看起来却很像同步代码。前面学习Future的时候我们知道可以使用then()方法实现先执行完一个操作之后再执行另外一个操作。但是利用async和await可以很好的解决这种频繁会调的问题。
假设现在有一个需求要求我们按顺序的读取文件内容,就是A文件读完了再去读B文件,以此类推知道读完所有的文件。如果我们用Future该怎么实现呢?
void main(){
new File("xxx\xx\a.txt").readAsString()
.then((s){
return new File("xxx\xx\b.txt").readAsString();
}).then((s){
return new File("xxx\xx\c.txt").readAsString();
}).then......
}
可以看到我们需要不停的调用then来进行回调。虽然写起来也不是很麻烦,但是我们有更优雅的方式为什么不用呢?
Future<String> readFile() async {
String text1 = await new File("xxx\xx\a.txt").readAsString();
String text2 = await new File("xxx\xx\b.txt").readAsString();
String text3 = await new File("xxx\xx\c.txt").readAsString();
......
return "text${text2}${text3}......";
}
void main(){
readFile().then((String text){
print(text);
});
}
首先async和await是两个关键字。被async修饰的方法表示这是一个异步方法,返回值只能是void或者Future。await必须用在async方法中,表示转成同步执行(串行),也就是等待上一个执行完再执行下一个。
网友评论