Create a new Angular application and select RxJS as its type:

StackBlitz automatically generates the application template:

Source code:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Generated template: emit 'World', map it to a greeting, and log it
const source = of('World').pipe(
  map(x => `Hello ${x}!`)
);
source.subscribe(console.log);

import { Observable } from "rxjs";

// A custom stream: three values at 0.5 s, 1 s and 3 s, then completion at 4 s
const stream$ = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.next([1, 2, 3]);
  }, 500);
  setTimeout(() => {
    subscriber.next({ a: 1000 });
  }, 1000);
  setTimeout(() => {
    subscriber.next("end");
  }, 3000);
  setTimeout(() => {
    subscriber.complete();
  }, 4000);
});

// Observer that logs every notification together with its own name
class Subscriber {
  constructor(private name: string) {}
  complete = () => console.log("name: " + this.name + " done");
  next = (v: any) =>
    console.log("time: " + Date.now() + " name: " + this.name + " value: " + v);
  error = () => console.log("error");
}

const s1 = new Subscriber("Jerry");
const s2 = new Subscriber("Tom");

// Start the stream: nothing runs until subscribe is called
stream$.subscribe(s1);
stream$.subscribe(s2);
Output:

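An Observable can be subscribed to repeatedly, and the subscriptions do not interfere with one another: each subscribe call runs the producer function again, so Jerry and Tom each get their own timers and receive every value separately.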
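
To see why repeated subscriptions stay independent, here is a minimal sketch that does not use RxJS at all. `MiniObservable`, `MiniObserver`, `makeObserver`, `producerRuns` and `log` are hypothetical names made up for this illustration, and the class is a deliberately simplified stand-in, not how RxJS is actually implemented. It only shows the core idea: the producer function is stored at construction time and executed once per subscribe call.

```typescript
// Hypothetical, simplified stand-in for an RxJS Observable (illustration only).
type MiniObserver<T> = {
  next: (value: T) => void;
  complete: () => void;
};

class MiniObservable<T> {
  private producer: (observer: MiniObserver<T>) => void;

  // The producer is only stored here; it is not executed yet.
  constructor(producer: (observer: MiniObserver<T>) => void) {
    this.producer = producer;
  }

  // Every subscribe call runs the producer again, for that observer alone.
  subscribe(observer: MiniObserver<T>): void {
    this.producer(observer);
  }
}

let producerRuns = 0;      // how many times the producer has been executed
const log: string[] = [];  // everything the observers received, in order

const mini$ = new MiniObservable<number>(observer => {
  producerRuns += 1;
  let counter = 0;         // state that belongs to this single execution
  observer.next(++counter);
  observer.next(++counter);
  observer.complete();
});

// Builds an observer that records what it receives under its own name.
const makeObserver = (name: string): MiniObserver<number> => ({
  next: value => log.push(`${name} value: ${value}`),
  complete: () => log.push(`${name} done`),
});

mini$.subscribe(makeObserver("Jerry"));
mini$.subscribe(makeObserver("Tom"));

console.log(producerRuns); // 2
console.log(log.join("\n"));
```

Tom's values start again at 1 because `counter` lives inside the producer, which ran a second time for his subscription. The `stream$` example above behaves the same way, only with timers instead of synchronous calls.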
More of Jerry's original articles are published under the name "汪子熙".
