美文网首页
Flutter throttle 和 debounce的实现

Flutter throttle 和 debounce的实现

作者: Codepgq | 来源:发表于2021-08-11 16:44 被阅读0次

    以下代码截取自 RxDart。

    新建一个 Dart 文件,把下面的代码复制进去,然后在需要使用的地方导入该文件即可。

    import 'dart:async';
    import 'dart:collection';
    
    /// Selects how and when the backpressure sink opens a new window.
    enum WindowStrategy {
      /// Every source event cancels the currently open window (if there is one)
      /// and immediately opens a fresh one. Used by debounce.
      everyEvent,

      /// A new window is opened by the first source event that arrives while
      /// no window is open, i.e. after the previous window has completed.
      /// Used by throttle.
      eventAfterLastWindow,

      /// A single, recurring window is opened by the very first source event
      /// and kept for all later events.
      firstEventOnly,

      /// No window is ever opened; events are only buffered and are emitted
      /// when the handler triggers, after which the buffer is cleared.
      onHandler,
    }
    
    /// [ForwardingSink] that holds the buffering and windowing state for one
    /// binding of a [BackpressureStreamTransformer].
    ///
    /// Source events are collected in [queue]. Depending on the configured
    /// [WindowStrategy], windows are opened by listening to the [Stream] returned
    /// by the window factory. When a window ends, the buffered events are handed
    /// to the window-end handler and its result is emitted downstream.
    class _BackpressureStreamSink<S, T> implements ForwardingSink<S, T> {
      /// Decides when a window is opened, see [maybeCreateWindow].
      final WindowStrategy _strategy;

      /// Creates the [Stream] whose events/completion delimit a window.
      final Stream<dynamic> Function(S event) _windowStreamFactory;

      /// Optional mapper invoked with the event that opened a window; its result
      /// is emitted immediately.
      final T Function(S event) _onWindowStart;

      /// Optional mapper invoked with a snapshot of the buffer when a window
      /// ends; its result is emitted.
      final T Function(List<S> queue) _onWindowEnd;

      /// When greater than zero, controls how many events from the start of the
      /// previous buffer are dropped/skipped before the next buffer begins.
      final int _startBufferEvery;

      /// Optional predicate, evaluated after every source event, that forces the
      /// current window to end when it returns true.
      final bool Function(List<S> queue) _closeWindowWhen;

      /// When true, a window that ends with an empty buffer emits nothing.
      final bool _ignoreEmptyWindows;

      /// When true, whatever is still buffered is dispatched when the source
      /// closes; when false it is discarded.
      final bool _dispatchOnClose;

      /// Events buffered for the current window.
      final Queue<S> queue = DoubleLinkedQueue<S>();

      /// Upper bound for [queue]'s length; `null` means unbounded. The oldest
      /// events are dropped first.
      final int maxLengthQueue;

      /// Number of upcoming source events that must not be buffered.
      var skip = 0;

      /// Whether at least one source event has been received.
      var _hasData = false;

      /// Whether the source [Stream] has closed.
      var _mainClosed = false;

      /// Subscription to the currently open window, or `null` if none is open.
      StreamSubscription<dynamic> _windowSubscription;

      _BackpressureStreamSink(
          this._strategy,
          this._windowStreamFactory,
          this._onWindowStart,
          this._onWindowEnd,
          this._startBufferEvery,
          this._closeWindowWhen,
          this._ignoreEmptyWindows,
          this._dispatchOnClose,
          this.maxLengthQueue,
          );

      /// Handles a source event: possibly opens a window, buffers the event
      /// (unless it is being skipped) and possibly closes the window.
      @override
      void add(EventSink<T> sink, S data) {
        _hasData = true;
        maybeCreateWindow(data, sink);

        if (skip == 0) {
          queue.add(data);

          // Enforce the buffer bound by dropping the oldest events.
          if (maxLengthQueue != null && queue.length > maxLengthQueue) {
            queue.removeFirstElements(queue.length - maxLengthQueue);
          }
        }

        if (skip > 0) {
          skip--;
        }

        maybeCloseWindow(sink);
      }

      /// Forwards source errors unchanged.
      @override
      void addError(EventSink<T> sink, Object e, StackTrace st) =>
          sink.addError(e, st);

      /// Handles the source closing: dispatches or discards the remaining
      /// buffer and closes [sink].
      @override
      void close(EventSink<T> sink) {
        _mainClosed = true;

        if (_strategy == WindowStrategy.eventAfterLastWindow) {
          // While a window is open, its onDone handler (see [singleWindow])
          // observes _mainClosed and performs the closing sequence once the
          // window ends. If no window is open, nothing else would ever close
          // the sink and the output stream would never complete, so run the
          // closing sequence right away.
          if (_windowSubscription == null) {
            resolveWindowEnd(sink, true);
          }
          return;
        }

        // treat the final event as a Window that opens
        // and immediately closes again
        if (_dispatchOnClose && queue.isNotEmpty) {
          resolveWindowStart(queue.last, sink);
        }

        resolveWindowEnd(sink, true);

        queue.clear();

        _windowSubscription?.cancel();
        sink.close();
      }

      /// Cancels the open window, if any, when the downstream listener cancels.
      @override
      FutureOr onCancel(EventSink<T> sink) => _windowSubscription?.cancel();

      @override
      void onListen(EventSink<T> sink) {}

      /// Pauses the open window, if any, together with the downstream listener.
      @override
      void onPause(EventSink<T> sink) => _windowSubscription?.pause();

      /// Resumes the open window, if any, together with the downstream listener.
      @override
      void onResume(EventSink<T> sink) => _windowSubscription?.resume();

      /// Opens a window for [event] if the configured strategy requires one,
      /// and emits the window-start value.
      void maybeCreateWindow(S event, EventSink<T> sink) {
        switch (_strategy) {
        // for example throttle: only open a window when none is open
          case WindowStrategy.eventAfterLastWindow:
            if (_windowSubscription != null) return;

            _windowSubscription = singleWindow(event, sink);

            resolveWindowStart(event, sink);

            break;
        // a recurring window, opened once by the first event
          case WindowStrategy.firstEventOnly:
            if (_windowSubscription != null) return;

            _windowSubscription = multiWindow(event, sink);

            resolveWindowStart(event, sink);

            break;
        // for example debounce: every event restarts the window
          case WindowStrategy.everyEvent:
            _windowSubscription?.cancel();

            _windowSubscription = singleWindow(event, sink);

            resolveWindowStart(event, sink);

            break;
          case WindowStrategy.onHandler:
            break;
        }
      }

      /// Ends the current window if the close predicate is set and accepts the
      /// current buffer.
      void maybeCloseWindow(EventSink<T> sink) {
        if (_closeWindowWhen != null && _closeWindowWhen(unmodifiableQueue)) {
          resolveWindowEnd(sink);
        }
      }

      /// Opens a window that ends after the first event of the window stream
      /// (or when that stream completes).
      ///
      /// If the source has already closed by the time the window ends, the
      /// window end is resolved as the closing one.
      StreamSubscription<dynamic> singleWindow(S event, EventSink<T> sink) =>
          buildStream(event, sink).take(1).listen(
            null,
            onError: sink.addError,
            onDone: () => resolveWindowEnd(sink, _mainClosed),
          );

      // opens a new Window which is kept open until the main Stream
      // closes. Every event of the window stream resolves a window end.
      StreamSubscription<dynamic> multiWindow(S event, EventSink<T> sink) =>
          buildStream(event, sink).listen(
                (dynamic _) => resolveWindowEnd(sink),
            onError: sink.addError,
            onDone: () => resolveWindowEnd(sink),
          );

      /// Cancels any previous window subscription and creates the window
      /// stream for [event].
      Stream<dynamic> buildStream(S event, EventSink<T> sink) {
        Stream stream;

        _windowSubscription?.cancel();

        stream = _windowStreamFactory(event);

        return stream;
      }

      /// Emits the window-start value for [event], if a start handler is set.
      void resolveWindowStart(S event, EventSink<T> sink) {
        if (_onWindowStart != null) {
          sink.add(_onWindowStart(event));
        }
      }

      /// Ends the current window: emits the window-end value (when applicable)
      /// and prepares the buffer for the next window.
      ///
      /// With [isControllerClosing] set, this is the last window end; for
      /// [WindowStrategy.eventAfterLastWindow] it also closes [sink].
      void resolveWindowEnd(EventSink<T> sink, [bool isControllerClosing = false]) {
        if (isControllerClosing &&
            _strategy == WindowStrategy.eventAfterLastWindow) {
          if (_dispatchOnClose &&
              _hasData &&
              queue.length > 1 &&
              _onWindowEnd != null) {
            sink.add(_onWindowEnd(unmodifiableQueue));
          }

          queue.clear();
          _windowSubscription?.cancel();
          _windowSubscription = null;

          sink.close();
          return;
        }

        // Single-shot windows (and any window at closing time) are finished
        // now; forget the subscription so a new window can be opened.
        if (isControllerClosing ||
            _strategy == WindowStrategy.eventAfterLastWindow ||
            _strategy == WindowStrategy.everyEvent) {
          _windowSubscription?.cancel();
          _windowSubscription = null;
        }

        if (isControllerClosing && !_dispatchOnClose) {
          return;
        }

        if (_hasData && (queue.isNotEmpty || !_ignoreEmptyWindows)) {
          if (_onWindowEnd != null) {
            sink.add(_onWindowEnd(unmodifiableQueue));
          }

          // prepare the buffer for the next window.
          // by default, this is just a cleared buffer
          if (!isControllerClosing && _startBufferEvery > 0) {
            skip = _startBufferEvery > queue.length
                ? _startBufferEvery - queue.length
                : 0;

            // ...unless startBufferEvery is provided.
            // here we backtrack to the first event of the last buffer
            // and count forward using startBufferEvery until we reach
            // the next event.
            //
            // if the next event is found inside the current buffer,
            // then this event and any later events in the buffer
            // become the starting values of the next buffer.
            // if the next event is not yet available, then a skip
            // count is calculated.
            // this count will skip the next n future events.
            // when skip is reset to 0, then we start adding events
            // again into the new buffer.
            //
            // example:
            // startBufferEvery = 2
            // last buffer: [0, 1, 2, 3, 4]
            // 0 is the first event,
            // 2 is the n-th event
            // new buffer starts with [2, 3, 4]
            //
            // example:
            // startBufferEvery = 3
            // last buffer: [0, 1]
            // 0 is the first event,
            // the n-th event is not yet dispatched at this point
            // skip becomes 1
            // event 2 is skipped, skip becomes 0
            // event 3 is now added to the buffer
            if (_startBufferEvery < queue.length) {
              queue.removeFirstElements(_startBufferEvery);
            } else {
              queue.clear();
            }
          } else {
            queue.clear();
          }
        }
      }

      /// An unmodifiable snapshot (copy) of the current buffer.
      List<S> get unmodifiableQueue => List<S>.unmodifiable(queue);
    }
    
    /// A configurable [StreamTransformer] that the common rx backpressure
    /// operators (such as throttle and debounce) can be built on.
    ///
    /// The transformer opens windows and, while a window is open, buffers the
    /// source events in a [Queue]. The [WindowStrategy] decides how and when a
    /// new window is created.
    ///
    /// [onWindowStart] and [onWindowEnd] are invoked when a window opens and
    /// closes respectively; whatever they return is emitted downstream.
    ///
    /// [startBufferEvery] allows skipping events coming from the source
    /// [Stream] between consecutive buffers.
    ///
    /// [ignoreEmptyWindows] controls what happens when a window closes while
    /// the buffer is empty: when true (the default) nothing is emitted; when
    /// false, [onWindowEnd] is still invoked, with an empty [List].
    ///
    /// [dispatchOnClose] controls what happens to the remaining buffer when the
    /// source [Stream] closes: when true (the default) it is emitted, when
    /// false it is discarded.
    class BackpressureStreamTransformer<S, T> extends StreamTransformerBase<S, T> {
      /// The strategy that decides when a window is created.
      final WindowStrategy strategy;

      /// Creates, for a given source event, the [Stream] that represents the
      /// window.
      final Stream<dynamic> Function(S event) windowStreamFactory;

      /// Invoked with the event that opened a window; may be `null`.
      final T Function(S event) onWindowStart;

      /// Invoked with the buffered events when a window closes; may be `null`.
      final T Function(List<S> queue) onWindowEnd;

      /// Number of events to skip between buffers; `0` disables skipping.
      final int startBufferEvery;

      /// Predicate that, when it returns true for the current buffer, closes
      /// the current window; may be `null`.
      final bool Function(List<S> queue) closeWindowWhen;

      /// Whether windows that contain no events are suppressed instead of
      /// dispatched.
      final bool ignoreEmptyWindows;

      /// Whether the final set of buffered events is dispatched when the source
      /// [Stream] closes.
      final bool dispatchOnClose;

      /// Maximum length of the buffer.
      ///
      /// Set this to avoid running out of memory when a lot of events are
      /// added to the buffer. When `null`, the buffer is unbounded.
      final int maxLengthQueue;

      /// Creates a transformer that buffers source events using windows built
      /// by [windowStreamFactory] according to [strategy].
      ///
      /// The optional named parameters fine-tune how and when the buffer is
      /// created and dispatched; see the class documentation for details.
      BackpressureStreamTransformer(
        this.strategy,
        this.windowStreamFactory, {
        this.onWindowStart,
        this.onWindowEnd,
        this.startBufferEvery = 0,
        this.closeWindowWhen,
        this.ignoreEmptyWindows = true,
        this.dispatchOnClose = true,
        this.maxLengthQueue,
      });

      /// Binds this transformer to [stream], giving every binding its own
      /// buffering state.
      @override
      Stream<T> bind(Stream<S> stream) {
        return forwardStream<S, T>(
          stream,
          _BackpressureStreamSink<S, T>(
            strategy,
            windowStreamFactory,
            onWindowStart,
            onWindowEnd,
            startBufferEvery,
            closeWindowWhen,
            ignoreEmptyWindows,
            dispatchOnClose,
            maxLengthQueue,
          ),
        );
      }
    }
    
    extension _RemoveFirstNQueueExtension<T> on Queue<T> {
      /// Drops [count] elements from the front of this queue, one
      /// [Queue.removeFirst] call at a time.
      ///
      /// A [count] of zero or less leaves the queue untouched.
      void removeFirstElements(int count) {
        var remaining = count;
        while (remaining > 0) {
          removeFirst();
          remaining--;
        }
      }
    }
    
    
    /// A [BackpressureStreamTransformer] configured for throttling: a source
    /// event opens a window, and no new window is opened until that window has
    /// completed.
    class ThrottleStreamTransformer<T> extends BackpressureStreamTransformer<T, T> {
      /// Creates a throttling transformer whose windows are the [Stream]s
      /// returned by [window].
      ///
      /// With [leading] set (the default), the event that opens a window is
      /// emitted right away. With [trailing] set, the last event buffered during
      /// a window is emitted when that window closes. [trailing] also decides
      /// whether remaining events are dispatched when the source closes.
      ThrottleStreamTransformer(
        Stream Function(T event) window, {
        bool trailing = false,
        bool leading = true,
      }) : super(
              WindowStrategy.eventAfterLastWindow,
              window,
              onWindowStart: leading ? (T opener) => opener : null,
              onWindowEnd: trailing ? (List<T> buffered) => buffered.last : null,
              dispatchOnClose: trailing,
              // Without trailing emission nothing needs to be buffered at all.
              maxLengthQueue: trailing ? 2 : 0,
            );
    }
    
    /// A [Stream] that emits a single value after a delay and then closes.
    ///
    /// The countdown starts when the stream is listened to. While the
    /// subscription is paused the countdown is suspended; it continues with the
    /// remaining time on resume. Cancelling the subscription stops the timer.
    class TimerStream<T> extends Stream<T> {
      final StreamController<T> _controller;

      /// Constructs a [Stream] which emits [value] after the specified [Duration].
      TimerStream(T value, Duration duration)
          : _controller = _buildController(value, duration);

      @override
      StreamSubscription<T> listen(void Function(T event) onData,
          {Function onError, void Function() onDone, bool cancelOnError}) {
        return _controller.stream.listen(
          onData,
          onError: onError,
          onDone: onDone,
          cancelOnError: cancelOnError,
        );
      }

      /// Builds the controller that drives the timer from its
      /// listen/pause/resume/cancel callbacks.
      static StreamController<T> _buildController<T>(T value, Duration duration) {
        // Runs only while the stream is listened to and not paused. A
        // Stopwatch keeps its elapsed time across stop()/start(), so
        // `watch.elapsed` is always the total active time so far and must not
        // be added to a separately kept running total.
        final watch = Stopwatch();
        Timer timer;
        StreamController<T> controller;
        var cancelled = false;

        void emitAndClose() {
          controller.add(value);
          controller.close();
        }

        void onResume() {
          // Nothing to schedule when the subscription was cancelled, when a
          // timer is already pending (i.e. not paused), or when the value has
          // already been emitted and the controller is closed.
          if (cancelled || timer != null || controller.isClosed) return;

          watch.start();

          // Only the time that has not yet been waited for remains. A negative
          // remainder is treated by Timer like Duration.zero.
          timer = Timer(duration - watch.elapsed, emitAndClose);
        }

        controller = StreamController(
          sync: true,
          onListen: () {
            watch.start();
            timer = Timer(duration, emitAndClose);
          },
          onPause: () {
            timer?.cancel();
            timer = null;
            watch.stop();
          },
          onResume: onResume,
          onCancel: () {
            timer?.cancel();
            timer = null;
            cancelled = true;
          },
        );
        return controller;
      }
    }
    
    /// Contract for an object that receives everything happening on a forwarded
    /// [Stream] — its events as well as the listener lifecycle — together with
    /// the [EventSink] of type [R] it may write its output to.
    abstract class ForwardingSink<T, R> {
      /// Called when a listener subscribes on the underlying [Stream].
      void onListen(EventSink<R> sink);

      /// Called when a subscriber pauses.
      void onPause(EventSink<R> sink);

      /// Called when a subscriber resumes after a pause.
      void onResume(EventSink<R> sink);

      /// Called when a subscriber cancels. May return a [Future] that
      /// completes once cancellation has finished.
      FutureOr onCancel(EventSink<R> sink);

      /// Called for every data event of the source.
      void add(EventSink<R> sink, T data);

      /// Called for every error event of the source.
      void addError(EventSink<R> sink, Object error, StackTrace st);

      /// Called when the source closes.
      void close(EventSink<R> sink);
    }
    
    /// @private
    /// Routes everything from [stream] through [connectedSink] into a new
    /// [StreamController] and returns that controller's stream.
    ///
    /// Besides data, error and done events, the listener lifecycle (onListen,
    /// onPause, onResume and onCancel) is reported to [connectedSink] as well.
    /// Anything thrown by [connectedSink] while handling one of these is handed
    /// back to its `addError`. The returned stream is a broadcast stream exactly
    /// when [stream] is one.
    Stream<R> forwardStream<T, R>(
        Stream<T> stream, ForwardingSink<T, R> connectedSink) {
      ArgumentError.checkNotNull(stream, 'stream');
      ArgumentError.checkNotNull(connectedSink, 'connectedSink');

      StreamController<R> controller;
      StreamSubscription<T> subscription;

      @pragma('vm:prefer-inline')
      @pragma('dart2js:tryInline')
      void runCatching(void Function() block) {
        try {
          block();
        } catch (e, s) {
          connectedSink.addError(controller, e, s);
        }
      }

      // Notifies the sink, then starts listening to the source.
      void handleListen() {
        runCatching(() => connectedSink.onListen(controller));

        subscription = stream.listen(
          (T data) => runCatching(() => connectedSink.add(controller, data)),
          onError: (Object e, StackTrace st) =>
              runCatching(() => connectedSink.addError(controller, e, st)),
          onDone: () => runCatching(() => connectedSink.close(controller)),
        );
      }

      // Cancels the source first, then the sink; completes once both are done.
      Future<List<dynamic>> handleCancel() {
        final sourceCancelled = subscription.cancel();
        final sinkCancelled = connectedSink.onCancel(controller);

        return Future.wait<dynamic>(<Future>[
          if (sourceCancelled is Future) sourceCancelled,
          if (sinkCancelled is Future) sinkCancelled,
        ]);
      }

      void handlePause() {
        subscription.pause();
        runCatching(() => connectedSink.onPause(controller));
      }

      void handleResume() {
        subscription.resume();
        runCatching(() => connectedSink.onResume(controller));
      }

      // The controller acts as a trampoline for the forwarded events. Only the
      // single-subscription variant gets pause/resume callbacks.
      controller = stream.isBroadcast
          ? StreamController<R>.broadcast(
              onListen: handleListen,
              onCancel: handleCancel,
              sync: true,
            )
          : StreamController<R>(
              onListen: handleListen,
              onPause: handlePause,
              onResume: handleResume,
              onCancel: handleCancel,
              sync: true,
            );

      return controller.stream;
    }
    
    /// A [BackpressureStreamTransformer] configured for debouncing: every
    /// source event restarts the window, and only the most recent event is kept
    /// in the buffer.
    class DebounceStreamTransformer<T> extends BackpressureStreamTransformer<T, T> {
      /// Creates a debouncing transformer whose windows are the [Stream]s
      /// returned by [window].
      ///
      /// The window is reset whenever the [Stream] being transformed emits an
      /// event. When a window closes, the last buffered event is emitted.
      DebounceStreamTransformer(Stream Function(T event) window)
          : super(
              WindowStrategy.everyEvent,
              window,
              maxLengthQueue: 1,
              onWindowEnd: (Iterable<T> buffered) => buffered.last,
            );
    }
    
    /// Time-based throttle and debounce operators for any [Stream].
    extension Throttle<T> on Stream<T> {
      /// Throttles this stream with windows that last [duration].
      ///
      /// See [ThrottleStreamTransformer] for the meaning of [trailing] and
      /// [leading].
      Stream<T> throttle(Duration duration,
          {bool trailing = false, bool leading = true}) {
        final throttler = ThrottleStreamTransformer<T>(
          (_) => TimerStream<bool>(true, duration),
          trailing: trailing,
          leading: leading,
        );
        return transform(throttler);
      }

      /// Debounces this stream with windows that last [duration].
      ///
      /// See [DebounceStreamTransformer].
      Stream<T> debounce(Duration duration) {
        final debouncer = DebounceStreamTransformer<T>(
          (_) => TimerStream<void>(null, duration),
        );
        return transform(debouncer);
      }
    }
    
    

    相关文章

      网友评论

          本文标题:Flutter throttle 和 debounce的实现

          本文链接:https://www.haomeiwen.com/subject/klkgsltx.html