@ngrx/effects source code analysis

Author: 风之化身呀 | Published: 2020-07-05 22:38

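    Preface

    @ngrx/effects is the side-effect handling library for Angular, and its implementation necessarily builds on @ngrx/store. If you are not familiar with how @ngrx/store works, please read the previous article on it first.

    Part 1 - Dependency injection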

      static forRoot(
        rootEffects: Type<any>[] = []
      ): ModuleWithProviders<EffectsRootModule> {
        return {
          ngModule: EffectsRootModule,
          providers: [
            {
              provide: _ROOT_EFFECTS_GUARD,
              useFactory: _provideForRootGuard,
              deps: [[EffectsRunner, new Optional(), new SkipSelf()]],
            },
            {
              provide: EFFECTS_ERROR_HANDLER,
              useValue: defaultEffectsErrorHandler,
            },
            // A
            EffectsRunner,
            // B
            EffectSources,
            // C
            Actions,
            // D1
            rootEffects,
            // D2
            {
              provide: _ROOT_EFFECTS,
              useValue: [rootEffects],
            },
            {
              provide: USER_PROVIDED_EFFECTS,
              multi: true,
              useValue: [],
            },
            // E
            {
              provide: ROOT_EFFECTS,
              useFactory: createEffects,
              deps: [Injector, _ROOT_EFFECTS, USER_PROVIDED_EFFECTS],
            },
          ],
        };
      }
    
    • A, EffectsRunner
      Depends on B (EffectSources) and on Store; its start method is the key step that connects the effects to the store
    constructor(
        private effectSources: EffectSources,
        private store: Store<any>
      ) {}
    
      start() {
        if (!this.effectsSubscription) {
          this.effectsSubscription = this.effectSources
            .toActions()
            .subscribe(this.store);
        }
      }
    
    • B, EffectSources
      It has a toActions method that converts the stream of effects into a stream of actions for the store to subscribe to
    constructor(
        private errorHandler: ErrorHandler,
        @Inject(EFFECTS_ERROR_HANDLER)
        private effectsErrorHandler: EffectsErrorHandler
      ) {
        super();
      }
    
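    • C, Actions
      Injects ScannedActionsSubject as its source. As the previous article showed, whenever the store dispatches an action, ScannedActionsSubject receives that action once the reducers have handled it, so Actions is in effect the store's action stream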
    constructor(@Inject(ScannedActionsSubject) source?: Observable<V>) {
        super();
        if (source) {
          this.source = source;
        }
      }
    
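    • D1 and D2 both capture the effect classes passed in: D1 registers them as providers, D2 stores the same list under the _ROOT_EFFECTS token
    • E, createEffects(injector, effects, user_effects)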
    export function createEffects(
      injector: Injector,
      effectGroups: Type<any>[][],
      userProvidedEffectGroups: Type<any>[][]
    ): any[] {
      const mergedEffects: Type<any>[] = [];
      for (let effectGroup of effectGroups) {
        mergedEffects.push(...effectGroup);
      }
      for (let userProvidedEffectGroup of userProvidedEffectGroups) {
        mergedEffects.push(...userProvidedEffectGroup);
      }
      return createEffectInstances(injector, mergedEffects);
    }
    

    Once the groups are merged, the effect instances are created

    export function createEffectInstances(
      injector: Injector,
      effects: Type<any>[]
    ): any[] {
      return effects.map((effect) => injector.get(effect));
    }
    

    In other words, E = [instances of the root effects]
    That concludes the dependency injection part.
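
To make the merge-then-instantiate step concrete, here is a minimal sketch that does not use Angular at all. MiniInjector, createEffectsSketch and the three effect classes are hypothetical names invented for this illustration; the real Injector resolves constructor dependencies, which this stand-in does not attempt.

```typescript
// A class with a no-argument constructor; real effect classes take injected dependencies.
type EffectClass = new () => object;

// Hypothetical stand-in for Angular's Injector: one cached instance per class.
class MiniInjector {
  private cache: { token: EffectClass; instance: object }[] = [];

  get(token: EffectClass): object {
    for (const entry of this.cache) {
      if (entry.token === token) return entry.instance;
    }
    const instance = new token();
    this.cache.push({ token, instance });
    return instance;
  }
}

// Same shape as createEffects: flatten both lists of groups, then resolve each class.
function createEffectsSketch(
  injector: MiniInjector,
  effectGroups: EffectClass[][],
  userProvidedEffectGroups: EffectClass[][]
): object[] {
  const merged: EffectClass[] = [];
  for (const group of effectGroups) merged.push(...group);
  for (const group of userProvidedEffectGroups) merged.push(...group);
  return merged.map((effect) => injector.get(effect));
}

class UserEffects {}
class CartEffects {}
class AuditEffects {}

const miniInjector = new MiniInjector();
const effectInstances = createEffectsSketch(
  miniInjector,
  [[UserEffects, CartEffects]], // plays the role of _ROOT_EFFECTS: [rootEffects]
  [[AuditEffects]] // plays the role of the USER_PROVIDED_EFFECTS entries
);
```

The result is a flat array of three instances in registration order, and asking the injector for UserEffects again returns the same object that sits at index 0.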

    Part 2 - EffectsRootModule

    constructor(
        private sources: EffectSources,
        runner: EffectsRunner,
        store: Store<any>,
        @Inject(ROOT_EFFECTS) rootEffects: any[],
        @Optional() storeRootModule: StoreRootModule,
        @Optional() storeFeatureModule: StoreFeatureModule,
        @Optional()
        @Inject(_ROOT_EFFECTS_GUARD)
        guard: any
      ) {
        runner.start();
        rootEffects.forEach(effectSourceInstance =>
          sources.addEffects(effectSourceInstance)
        );
        store.dispatch({ type: ROOT_EFFECTS_INIT });
      }
    
      addEffects(effectSourceInstance: any) {
        this.sources.addEffects(effectSourceInstance);
      }
    

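    The runner.start method is the crucial call: it subscribes the store to the action stream produced by effectSources.toActions(), so every action emitted by an effect property (typically an observable built with this.actions$.pipe()) ends up being dispatched to the store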

      start() {
        if (!this.effectsSubscription) {
          this.effectsSubscription = this.effectSources
            .toActions()
            .subscribe(this.store);
        }
      }
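
The reason `.subscribe(this.store)` works is that the store is itself an observer whose next(action) acts like dispatch(action). The sketch below models only that relationship with hand-written stand-ins; MiniSubject, MiniStore and MiniRunner are assumptions for illustration and are not the RxJS or NgRx classes.

```typescript
interface Action { type: string; }
interface MiniObserver<T> { next(value: T): void; }

// A bare-bones subject: remembers observers and forwards every value to them.
class MiniSubject<T> implements MiniObserver<T> {
  private observers: MiniObserver<T>[] = [];

  subscribe(observer: MiniObserver<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  next(value: T): void {
    for (const observer of this.observers.slice()) observer.next(value);
  }
}

// The store is modelled as an observer: next(action) stands in for dispatch(action).
class MiniStore implements MiniObserver<Action> {
  readonly dispatched: Action[] = [];

  next(action: Action): void {
    this.dispatched.push(action);
  }
}

class MiniRunner {
  private unsubscribe: (() => void) | null = null;

  constructor(private actions$: MiniSubject<Action>, private store: MiniStore) {}

  // Like EffectsRunner.start(): subscribe once, later calls are no-ops.
  start(): void {
    if (!this.unsubscribe) {
      this.unsubscribe = this.actions$.subscribe(this.store);
    }
  }
}

const effectOutput$ = new MiniSubject<Action>();
const miniStore = new MiniStore();
const miniRunner = new MiniRunner(effectOutput$, miniStore);

miniRunner.start();
miniRunner.start(); // guarded: does not add a second subscription
effectOutput$.next({ type: 'LOAD_SUCCESS' });
```

Even though start() is called twice, the store records LOAD_SUCCESS only once, which is what the `if (!this.effectsSubscription)` guard in the original code is for.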
    

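    The toActions method turns the effect instances pushed into effectSources into a single action stream; the store is attached to that stream as its subscriber in start()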

    toActions(): Observable<Action> {
        return this.pipe(
          groupBy(getSourceForInstance),
          mergeMap((source$) => {
            return source$.pipe(groupBy(effectsInstance));
          }),
          mergeMap((source$) => {
            const effect$ = source$.pipe(
              exhaustMap((sourceInstance) => {
                return resolveEffectSource(
                  this.errorHandler,
                  this.effectsErrorHandler
                )(sourceInstance);
              }),
              map((output) => {
                reportInvalidActions(output, this.errorHandler);
                return output.notification;
              }),
              filter(
                (notification): notification is Notification<Action> =>
                  notification.kind === 'N'
              ),
              dematerialize()
            );
            // start the stream with an INIT action
            // do this only for the first Effect instance
            const init$ = source$.pipe(
              take(1),
              filter(isOnInitEffects),
              map((instance) => instance.ngrxOnInitEffects())
            );
            return merge(effect$, init$);
          })
        );
      }
    

    The next call, sources.addEffects(effectSourceInstance), passes every effect instance to effectSources, so each instance runs through the toActions pipeline. Now look at resolveEffectSource

    function resolveEffectSource(
      errorHandler: ErrorHandler,
      effectsErrorHandler: EffectsErrorHandler
    ): (sourceInstance: any) => Observable<EffectNotification> {
      return (sourceInstance) => {
        const mergedEffects$ = mergeEffects(
          sourceInstance,
          errorHandler,
          effectsErrorHandler
        );
        if (isOnRunEffects(sourceInstance)) {
          return sourceInstance.ngrxOnRunEffects(mergedEffects$);
        }
        return mergedEffects$;
      };
    }
    
    export function mergeEffects(
      sourceInstance: any,
      globalErrorHandler: ErrorHandler,
      effectsErrorHandler: EffectsErrorHandler
    ): Observable<EffectNotification> {
      const sourceName = getSourceForInstance(sourceInstance).constructor.name;
    
      const observables$: Observable<any>[] = getSourceMetadata(sourceInstance).map(
        ({
          propertyName,
          dispatch,
          useEffectsErrorHandler,
        }): Observable<EffectNotification> => {
          const observable$: Observable<any> =
            typeof sourceInstance[propertyName] === 'function'
              ? sourceInstance[propertyName]()
              : sourceInstance[propertyName];
    
          const effectAction$ = useEffectsErrorHandler
            ? effectsErrorHandler(observable$, globalErrorHandler)
            : observable$;
    
          if (dispatch === false) {
            return effectAction$.pipe(ignoreElements());
          }
    
          const materialized$ = effectAction$.pipe(materialize());
    
          return materialized$.pipe(
            map(
              (notification: Notification<Action>): EffectNotification => ({
                effect: sourceInstance[propertyName],
                notification,
                propertyName,
                sourceName,
                sourceInstance,
              })
            )
          );
        }
      );
    
      return merge(...observables$);
    }
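
mergeEffects wraps every emission with materialize(), and toActions later keeps only the notifications whose kind is 'N' before calling dematerialize(). The sketch below imitates that pair with plain arrays so the effect on errors and completions is visible. materializeSketch, nextValuesOnly and MiniNotification are hypothetical names, and the synchronous producer is a simplification of a real observable.

```typescript
interface Action { type: string; }

// Plain-data imitation of an RxJS notification: next, error, or complete.
type MiniNotification<T> =
  | { kind: 'N'; value: T }
  | { kind: 'E'; error: unknown }
  | { kind: 'C' };

// Imitates materialize(): runs a producer and records what happened as data.
function materializeSketch<T>(
  produce: (emit: (value: T) => void) => void
): MiniNotification<T>[] {
  const out: MiniNotification<T>[] = [];
  try {
    produce((value) => out.push({ kind: 'N', value }));
    out.push({ kind: 'C' });
  } catch (error) {
    out.push({ kind: 'E', error });
  }
  return out;
}

// Imitates the filter + dematerialize() pair in toActions: keep only 'N' values.
function nextValuesOnly<T>(notifications: MiniNotification<T>[]): T[] {
  const values: T[] = [];
  for (const n of notifications) {
    if (n.kind === 'N') values.push(n.value);
  }
  return values;
}

// One effect emits and then throws, another emits and completes normally.
const failing = materializeSketch<Action>((emit) => {
  emit({ type: 'STEP_1' });
  throw new Error('boom');
});
const completing = materializeSketch<Action>((emit) => emit({ type: 'STEP_2' }));

const failingKinds = failing.map((n) => n.kind);
const completingKinds = completing.map((n) => n.kind);
const toDispatch = nextValuesOnly(failing.concat(completing));
```

Here the failing effect yields an 'N' followed by an 'E', the completing one yields an 'N' followed by a 'C', and only the two plain actions survive the filter. In this model, an error or completion in one effect never reaches the merged stream as a terminating event.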
    

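    mergeEffects pulls every effect observable out of the instance and merges them into one stream, which the store subscribes to through toActions. That completes the whole cycle: the store dispatches an action, the reducer updates the state, the action is passed on to ScannedActionsSubject, the effects receive it, and the effect observables interested in that action run and usually produce a new action. Because the store subscribes to that output, the new action is dispatched in turn, which closes the loop. Take care to avoid infinite loops here: mark any effect whose output should not reach the store with dispatch: false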
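
The cycle can be sketched as a toy model with no RxJS involved. LoopStore and MiniEffect are hypothetical names, and the synchronous recursion is a deliberate simplification of the real stream-based wiring.

```typescript
interface Action { type: string; }

// An effect reacts to an action; `dispatch` decides whether its result goes back to the store.
type MiniEffect = { dispatch: boolean; run: (action: Action) => Action | null };

class LoopStore {
  state = 0;
  readonly log: string[] = [];
  private effects: MiniEffect[] = [];

  addEffect(effect: MiniEffect): void {
    this.effects.push(effect);
  }

  dispatch(action: Action): void {
    this.log.push(action.type);
    // 1. The reducer updates the state first.
    if (action.type === 'INCREMENT') this.state += 1;
    // 2. The processed action reaches the effects (the role of ScannedActionsSubject).
    for (const effect of this.effects) {
      const result = effect.run(action);
      // 3. Output of dispatching effects is fed back into the store.
      if (result && effect.dispatch) this.dispatch(result);
    }
  }
}

const loopStore = new LoopStore();
const sideEffects: string[] = [];

// Dispatching effect: maps INCREMENT to a different action type, so the loop terminates.
loopStore.addEffect({
  dispatch: true,
  run: (a) => (a.type === 'INCREMENT' ? { type: 'INCREMENT_LOGGED' } : null),
});

// Non-dispatching effect (like dispatch: false): it returns the same action it received,
// which would recurse forever if it were fed back into the store.
loopStore.addEffect({
  dispatch: false,
  run: (a) => {
    sideEffects.push('saw ' + a.type);
    return a;
  },
});

loopStore.dispatch({ type: 'INCREMENT' });
```

After the single dispatch, the log holds INCREMENT followed by INCREMENT_LOGGED and the state is 1. Flipping the second effect to dispatch: true would re-dispatch every action it sees and never terminate, which is the infinite loop the paragraph above warns about.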

    Part 3 - EffectsFeatureModule

    This one is easier to follow

    constructor(
        root: EffectsRootModule,
        @Inject(FEATURE_EFFECTS) effectSourceGroups: any[][],
        @Optional() storeRootModule: StoreRootModule,
        @Optional() storeFeatureModule: StoreFeatureModule
      ) {
        effectSourceGroups.forEach((group) =>
          group.forEach((effectSourceInstance) =>
            root.addEffects(effectSourceInstance)
          )
        );
      }
    

    It simply calls the addEffects method of EffectsRootModule to register the feature effects with the EffectsModule system:

      addEffects(effectSourceInstance: any) {
        this.sources.addEffects(effectSourceInstance);
      }
    

    This pushes new values into the source observable behind toActions, and from there the logic is the same as in Part 2
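
A small sketch of that registration path follows, under the assumption that EffectSources.addEffects simply emits the instance into the stream that toActions consumes. All class names here are hypothetical stand-ins, and effect instances are reduced to objects with a name.

```typescript
type EffectInstance = { name: string };

class MiniEffectSources {
  private handlers: ((instance: EffectInstance) => void)[] = [];

  // Plays the role of subscribing to toActions(): register the processing pipeline.
  connect(handler: (instance: EffectInstance) => void): void {
    this.handlers.push(handler);
  }

  // Assumed to mirror EffectSources.addEffects: emit the instance into the stream.
  addEffects(instance: EffectInstance): void {
    this.handlers.forEach((handler) => handler(instance));
  }
}

class MiniRootModule {
  constructor(private sources: MiniEffectSources, rootEffects: EffectInstance[]) {
    rootEffects.forEach((instance) => sources.addEffects(instance));
  }

  addEffects(instance: EffectInstance): void {
    this.sources.addEffects(instance);
  }
}

class MiniFeatureModule {
  constructor(root: MiniRootModule, groups: EffectInstance[][]) {
    groups.forEach((group) => group.forEach((instance) => root.addEffects(instance)));
  }
}

const miniSources = new MiniEffectSources();
const started: string[] = [];

// Equivalent of runner.start(): the pipeline is connected before any instance arrives.
miniSources.connect((instance) => started.push(instance.name));

const rootModule = new MiniRootModule(miniSources, [{ name: 'AppEffects' }]);

// A feature module registers its effects later, through the root module.
new MiniFeatureModule(rootModule, [[{ name: 'CartEffects' }, { name: 'OrderEffects' }]]);
```

Root and feature effects end up in the same pipeline in registration order: AppEffects, CartEffects, OrderEffects. The feature module needs no runner of its own because it reuses the subscription the root module already started.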

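    Afterword

    The repository contains several techniques worth noting, such as the Effect decorator, which collects basic metadata up front for later use. It also makes heavy use of advanced RxJS operators and is well worth studying.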
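
The "collect now, use later" decorator technique can be sketched as follows. This is not the NgRx implementation: EffectSketch, getMetadataSketch and META_KEY are hypothetical names, and the metadata is reduced to the propertyName and dispatch fields that mergeEffects reads. The decorator is applied by hand so the sketch does not depend on the experimentalDecorators compiler flag.

```typescript
interface EffectMeta { propertyName: string; dispatch: boolean; }

// Hypothetical storage key; not the key NgRx itself uses.
const META_KEY = '__sketch_effects__';

type MetaHolder = Record<string, EffectMeta[] | undefined>;

// Decorator factory: records which property is an effect and whether it dispatches.
function EffectSketch(config: { dispatch?: boolean } = {}) {
  return (target: object, propertyName: string): void => {
    const ctor = target.constructor as unknown as MetaHolder;
    const existing = ctor[META_KEY] || [];
    ctor[META_KEY] = existing.concat([
      { propertyName, dispatch: config.dispatch !== false },
    ]);
  };
}

// Read side: later code looks the metadata up from any instance of the class.
function getMetadataSketch(instance: object): EffectMeta[] {
  const ctor = instance.constructor as unknown as MetaHolder;
  return ctor[META_KEY] || [];
}

class BookEffects {
  load$ = 'placeholder for an observable';
  log$ = 'placeholder for another observable';
}

// With experimentalDecorators enabled this would be written as
// @EffectSketch() and @EffectSketch({ dispatch: false }) above each property.
EffectSketch()(BookEffects.prototype, 'load$');
EffectSketch({ dispatch: false })(BookEffects.prototype, 'log$');

const bookMetadata = getMetadataSketch(new BookEffects());
```

The metadata is written when the class is defined and read only when an instance is registered, so the decorator never needs access to the instance or to the store.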

