1. streams 重点
1). streams 提供异步数据序列
2). 数据序列包括用户生成的事件和从文件读取的数据
3). 可以通过 Stream API 的 await for 和 listener 处理流
4). streams 提供了一种响应错误的方法
5). streams 分为单订阅和广播两类
Dart 中的异步编程的特性是 Future和Stream类。
-
Future 表示计算无法立即完成. 在普通函数返回结果的地方,异步函数返回Future,最终将包含结果。当结果准备好时Future 会告诉你。
-
streams 是一系列的异步事件. 它像是一个异步的Iterable,stream 告诉你准备就绪时有一个事件,而不是在你请求时获取下一个事件。
2. 接收流事件
/// 运行
run() {
// 获取计数
var stream = countStream(10);
// 计算和
var sum = sumStream(stream);
print(sum);
}
/// 创建流
/// @return async* 返回一个 Iterable
Stream<int> countStream(int num) async* {
for (int i = 0; i <= num; i++) {
yield i;
}
}
/// 计算和
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (var value in stream) {
sum += value;
}
return sum;
}
/// 主函数
main(List<String> arguments) {
run();
}
其中,yield 产生一个数据,await 会等待数据产生, async* 返回 Iterable 数据, async 返回一个数据。countStream每产生一个数据,sumStream就会计算一次求和。
3. 异常事件
当流中没有更多事件时,流就完成了,并通知接收事件的代码。使用await for循环读取事件时,流完成后停止循环。
在某些情况下,流完成之前会发生错误;从远程服务器获取文件时网络可能失败,或者创建事件的代码存在错误,但是有些人可能需要知道它。
流还可以传递错误事件,就像传递数据事件一样。大多数流将在出现第一个错误后停止,但是有可能传递多个错误的流以及在发生错误事件后传递更多数据的流。
当使用await for读取流时,循环语句会引发错误,也结束了循环。我们可以使用try-catch捕获错误。
import 'dart:async';
/// 运行
main() async {
// 获取计数
var stream = countStream(10);
// 计算和
var sum = await sumStream(stream);
print(sum);
}
/// 创建流
/// @return async* 返回一个 Iterable
Stream<int> countStream(int num) async* {
for (int i = 0; i <= num; i++) {
if (i == 4) {
// 抛出异常
throw Exception('Intentional Exception');
}
yield i;
}
}
/// 计算和
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
// 捕获异常
try {
// 累加
await for (var value in stream) {
sum += value;
}
} catch (e) {
return -1;
}
return sum;
}
其中在countStream中循环 i == 4 的时候抛出异常,并在sumStream的外层捕获异常。异常事件流处理视频
4. 工作流
Stream 类包含许多辅助方法,可以使用lastWhere()Stream API 查找流中的最后一个正整数。
main() async {
// 获取流
var stream = countStream(10);
// 获取最后一个大于 0 的数
var num = await lastPositive(stream);
print(num);
}
/// 创建流
/// @return async* 返回一个 Iterable
Stream<int> countStream(int num) async* {
for (int i = 0; i < num; i++) {
yield i;
}
}
/// 获取最后一个大于 0 的数
Future<int> lastPositive(Stream<int> stream) => stream.lastWhere((x) => x > 0);
常用方法:
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();
5. 两种流
- 单订阅流
包含一系列的流事件,这些事件必须以正确的顺序传递,并且不能丢失任何事件。读取文件或接收 Web 响应的获得的流,这样的流只能被获取一次。
import 'dart:async';
main() {
// 流控制器
StreamController controller = StreamController();
// 监听流, 只实现默认方法
// controller.stream.listen((data) => print(data));
// 监听流, 全部实现
controller.stream.listen((data) => print(data),
onError: onError, onDone: onDone, cancelOnError: true);
// 添加数据
controller.sink.add(0);
controller.sink.add('a, b, c, d');
// 添加异常, 可选
controller.sink.addError(-1);
controller.sink.add(3.14);
// 关闭流
controller.close();
}
/// 异常
void onError(error) {
print('onError: $error');
}
/// 完成
void onDone() {
print('onDone');
}
注意: 我们在控制流的时候使用StreamController, 属性stream可以对流进行处理,属性 sink可以添加数据等。stream 的 listener 方法中onData 方法是必须传入的,有三个可选参数onError、onDone、cancelOnError。
- 广播流
另一种流是针对可以一次处理的单个消息的。例如,这种流可用于浏览器中的鼠标事件。我们可以随时开始收听这样的流,并且在收听时会触发事件。多个收听者可以同时收听,可以在取消上一个订阅后稍后再次收听。
import 'dart:async';
main() {
// 广播流
StreamController controller = StreamController.broadcast();
// 添加广播1
controller.stream.listen((value) => print("listener1: $value"));
// 添加数据
controller.sink.add(1);
controller.sink.add(2);
// 添加广播2
controller.stream.listen((value) => print("listener2: $value"));
controller.sink.add(3);
controller.sink.add(4);
// 关闭
controller.close();
}
**注意: ** broadcast方法可使控制器中的流为广播流
6. 流的转换
import 'dart:async';
main() {
// 创建流控制器
StreamController<int> controller = StreamController<int>();
// 创建转换器
final transformer = StreamTransformer<int, String>.fromHandlers(
handleData: (value, sink) {
if(value == 100) {
sink.add("您猜对了!");
} else {
sink.addError("还没猜中请继续猜!");
}
}
);
// 监听
controller.stream
.take(2) // 控制最大次数
.transform(transformer)
.listen((data)=> print(data), onError: (err) => print(err));
// 添加数据, 添加五次
controller.sink.add(1);
controller.sink.add(50);
controller.sink.add(70);
controller.sink.add(80);
controller.sink.add(100);
}
打印结果:

网友评论