美文网首页
反应式编程 .Net平台的有Reactive Extension

反应式编程 .Net平台的有Reactive Extension

作者: OMG_1bcc | 来源:发表于2019-01-10 15:59 被阅读0次

    文档:https://mcxiaoke.gitbooks.io/rxdocs/content/
    支持的平台
    Rx 4.1支持以下平台
    .NET Framework 4.6+
    .NET Standard 2.0+(包括.NET Core,Xamarin等)
    .UWP
    在从v2.xx迁移到v3.0.0时,NuGet包已更改其包命名

    • Rx-Main 就是现在 [System.Reactive]
    • Rx-Core 就是现在 [System.Reactive.Core]
    • Rx-Interfaces 就是现在 [System.Reactive.Interfaces]
    • Rx-Linq 就是现在 [System.Reactive.Linq]
    • Rx-PlatformServices 就是现在 [System.Reactive.PlatformServices]
    • Rx-Testing 就是现在 [Microsoft.Reactive.Testing]

    响应式编程包含:一个观察者(observer)订阅可观察到的对象(Observable)。可观察到的对象(Observable)通过调用观察者的方法来发射项目或通知给它的所有观察者(observer)。观察者有些时候也被称作是订阅者,观看者,响应者。

    关于Observers的创建:
    下面是Observable的生成方法列表
    方法名 功能介绍
    Create 生成任意的Observable
    Defer 推迟执行直到被订阅为止
    Empty 只执行 OnCompleted 方法
    FromAsyncPattern 生成 Begin-End Pattern
    FromEvent *4 从 Action 代理的事件生成
    FromEventPattern 从 EventHandler 代理的事件生成
    Generate 模拟for推送值
    Interval 一段时间推送一个值
    Never 什么也不做
    Range 推送一个区间内的值
    Repeat 指定次数推送同一个值
    Return 只返回一个结果
    Start 指定一个Schedula立即执行,完成后返回一个值
    Throw 只执行 OnError
    Timer 在某个时间推送
    ToAsync 返回一个 Func<Observable<T>>
    Using 完成后,生成将指定的资源Dispose掉的代理

    Observable对象创建完了之后,我们需要注册。注册可能不在UI线程中,如果这样,当我们试图往UI线程中写入东西时,就会抛出异常。因此确保在UI线程中注册Observable对象显得尤为重要。我们可以通过ObserveOn(Deployment.Current.Dispatcher),这个方法实现。该方法保证我们在UI线程中注册Observable对象。
    实例:
    var browser = Observable.FromEvent<NavigationEventArgs>(webBrowser1, "Navigated");
    browser.ObserveOn(Deployment.Current.Dispatcher).Subscribe(evt =>
    {
    lblProgress.Visibility = Visibility.Collapsed;
    }
    );
    订阅方法就是展示了observer如何连接到Observable。observer实现了下列方法的一些子集:
    onNext
    每当Observable广播数据时将会调用该方法。这个方法将会被作为Observable的一个广播项目参数被发送
    onError
    Observable调用此方法表示它内部已经发生异常数据或者发生一些其他错误。这样停止观察,并且也不会做将来的调用onNext或者onCompleted。该onError方法作为它的参数来指示了错误的原因。
    onCompleted
    Observable在已经调用了onNext方法作为最后的时间,如果没有遇到任何错误,那么该方法将会被调用
    通过Observable的定义,它可能调用onNext零次或者很多次,并且接下来的调用可能是onCompleted或者onError方法,但是不是同时调用,这都是最终才会被调用。在调用过程中,onNext通常称作任务的执行,而onCompleted或者onError被称作任务的结果通知
    实例代码:
    // 正常结束的时候
    // 运行结果:1, 2, 3, 4, 5, Completed
    Observable.Range(1, 5)
    .Subscribe(
    x => Console.WriteLine(x),
    ex => Console.WriteLine("Error"),
    () => Console.WriteLine("Completed"));

    // 中间发生异常的时候
    // 运行结果:1, 2, Error
    Observable.Range(1, 5)
    .Do(x => { if (x == 3) throw new Exception(); })
    .Subscribe(
    x => Console.WriteLine(x),
    ex => Console.WriteLine("Error"),
    () => Console.WriteLine("Completed"));

    Dispose的必要性
    Subscribe方法的返回值是一个实现了 IDisposable 接口的对象,在上面的示例中中间变量被忽略了。关于Rx的IDisposable对象与一般意义上的“有可能是必须释放的资源”是有所不同的。如果 Rx 处理的是事件的时候,那么 Dispose 表示“分离”,Timer的时候则表示“中止”,异步的时候是“取消”的意思。
    用Rx来处理事件的优势
    一、事件合成
    下面的代码示例如何将 按下/移动/放开 合成一个新的事件并进行处理:
    // WindowsForm的Drag事件:鼠标左键按下/移动/直到放开过程中
    // 取得鼠标的坐标
    var drag = from down in this.MouseDownAsObservable()
    from move in this.MouseMoveAsObservable().TakeUntil(
    this.MouseUpAsObservable())
    select move.Location;
    MouseDownAsObservable、MouseMoveAsObservable、MouseUpAsObservable方法是用Observable类的FromEvent静态方法包装的扩张方法。
    二、Timer/通知事件
    在一定的时间间隔监视某个值的场景。Rx将Timer变为一个序列,另外通过Select方法可以灵活变换输出。这些组合在一起,监视对象的值在一定时间间隔内会自动推送过来,非常容易操作。而且过滤值的处理也很简单,可以只在值发生变化时再接收这样就更简单了。

    // 每隔1秒监视一下watchTarget.Value的值
    var polling =
    Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
    .Select(_ => watchTarget.Value)
    .DistinctUntilChanged(); // 只有在值发生变化时才引发事件(polling)
    ②// FileSystemWatcher的Changed事件
    // 发生一次变化,会触发多个事件
    var watcher =
    new FileSystemWatcher("C:\", "test.txt")
    { EnableRaisingEvents = true };
    // "对于1秒内连续发生的事件,进行过滤,只处理最后一个"
    // 变成一个相对更容易处理的对象
    var changed =
    Observable.FromEventPattern<FileSystemEventArgs>(
    watcher, "Changed")
    .Throttle(TimeSpan.FromSeconds(1)); // Throttle方法是只允许通过指定时间和指定值的内容
    三、FromEvent方法和FromEventPattern方法
    用Rx处理事件,需要用 FromEvent 方法或者 FromEventPattern 方法将事件变为 IObservable<T> 对象。FromEvent 方法可以转换 Action<T> 代理,序列元素则为 T 。FromEventPattern 方法可以转换 EventHandler 代理,序列元素则为 EventPattern<TEventArgs>,它包装了 Object 类型的 sender 和 TEventArgs 类型的 e。
    ①通过反射方式
    // 按钮控件的Click事件 Rx 化
    Observable.FromEventPattern<RoutedEventArgs>(button1, "Click");
    ②非反射方式,性能更加优化
    // 第一个参数是事件响应处理的代理(这里 h => h.Invoke 是固定)
    // 第二个参数绑定事件,第三个参数解除事件
    Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
    h => h.Invoke,
    h => button1.Click += h, h => button1.Click -= h);
    在进一步优化代码结构
    指定事件的处理,还有指定的字符串,这样的编码会比较多,实际运用中这样会影响代码的维护性。因此推荐将这些代码作为扩展方法分离出去。
    public static class ButtonBaseExtensions
    {
    // 分离的扩展方法
    public static IObservable<RoutedEventArgs> ClickAsObservable(this ButtonBase button)
    {
    return Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
    h => h.Invoke,
    h => button.Click += h, h => button.Click -= h)
    .Select(x => x.EventArgs);
    / 使用FromEvent生成EventPattern对象
    // 用Select省略sender
    return Observable.FromEvent<RoutedEventHandler, RoutedEventArgs>(
    h => (sender, e) => h(e),
    h => button.Click += h, h => button.Click -= h);
    }
    }
    // 实际运用时调用
    button1.ClickAsObservable().Subscribe(_ => MessageBox.Show("Clicked!"));

    如果sender参数是必要的时候,也可以通过 Select 重新包装来做。比如:
    button1.ClickAsObservable().Select(ev => new { Sender = button1, EventArgs = ev })

    Rx代表性的方法:
    SelectMany 方法
    SelectMany 方法是 Rx 中最常用的方法之一。从第一个异步结果中启动第2个异步处理,这对于使用Rx进行异步编程是非常重要的。
    SelectMany的处理图
    根据 A 序列的值,后续用 B 序列的值进行插入替换。
    // 替换别的Observable的内容
    // 结果:10, 10, 11, 10, 11, 12
    Observable.Range(1, 3)
    .SelectMany(x => Observable.Range(10, x))
    .Subscribe(Console.WriteLine);

    // 实际的替换过程
    // { x = 1, y = 10 }
    // { x = 2, y = 10 }
    // { x = 2, y = 11 }
    // { x = 3, y = 10 }
    // { x = 3, y = 11 }
    // { x = 3, y = 12 }
    var query = from x in Observable.Range(1, 3)
    from y in Observable.Range(10, x)
    select new { x, y };
    query.Subscribe(Console.WriteLine);
    ○ Concat方法
    Concat 是将2个序列进行连接的方法。这个时候,直到第一个序列终止前,第二个序列的值就会被忽略掉。我们可以理解是在第一个序列的结尾追加上另一个序列。
    // 运行结果:1, 2, 3, -1, -1, -1
    Observable.Range(1, 3)
    .Concat(Observable.Repeat(-1, 3))
    .Subscribe(Console.WriteLine);
    ○ Merge 方法
    Merge会将所有的值都会合并进来。如果要对应多个控件的共通处理的话,使用Merge是很方便的。
    // WindowsForm中的4个TextBox控件全部设定为:
    // “DragDropEffects.All”
    new[] { textBox1, textBox2, textBox3, textBox4 }
    .Select(x => Observable.FromEventPattern<DragEventArgs>(x, "DragEnter"))
    .Merge()
    .Subscribe(x => x.EventArgs.Effect = DragDropEffects.All);

    // 上面的Merge方法是下面的代码的变形,
    // 修改为:IEnumerable<IObservable<T>>进行Merge
    // 代码变得更简洁
    Observable.Merge(
    Observable.FromEventPattern<DragEventArgs>(textBox1, "DragEnter"),
    Observable.FromEventPattern<DragEventArgs>(textBox2, "DragEnter"),
    Observable.FromEventPattern<DragEventArgs>(textBox3, "DragEnter"),
    Observable.FromEventPattern<DragEventArgs>(textBox4, "DragEnter")
    );
    ○ Zip 方法
    Zip方法是A和B中各取1个值为一组(2个值)进行配对处理。一边的值如果发生偏移,那么Zip会直到取到2个值为止才输出。
    如下代码所示,使用Zip方法将 Interval 方法(指定时间间隔发行值)和Timestamp(实际时刻)进行组合的结果。
    // 結果:
    // { x = 0@2011/12/20 7:37:15 +09:00, y = 0@2011/12/20 7:37:17 +09:00, now = 2011/12/20 7:37:17 +09:00 }
    Observable.Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Zip(Observable.Interval(TimeSpan.FromSeconds(3)).Timestamp(), (x, y) => new { x, y, now = DateTimeOffset.Now })
    .Subscribe(Console.WriteLine);
    ○ CombineLatest 方法
    类似 Zip 方法,两边引发“值”时,取得最新的值输出。如下图所示,两边都引发事件,且需要对两边的事件都需要处理的场景:
    A和B的序列,任何一边在引发变化时都会取出两边最新的值输出。

    public static class ToggleButtonExtensions
    {
    // WPF/Silverlight/WP7のToggleButton控件(Checkbox)
    // 如果Check状态变化 IsChecked属性值也跟着变化
    public static IObservable<bool> IsCheckedAsObservable(this ToggleButton button)
    {
    var checkedAsObservable = Observable.FromEvent<RoutedEventHandler, RoutedEventArgs>(
    h => (sender, e) => h(e),
    h => button.Checked += h, h => button.Checked -= h);

    var uncheckedAsObservable = Observable.FromEvent<RoutedEventHandler, RoutedEventArgs>(
      h => (sender, e) => h(e),
      h => button.Unchecked += h, h => button.Unchecked -= h);
    
    return Observable.Merge(checkedAsObservable, uncheckedAsObservable).Select(_ => button.IsChecked.Value);
    

    }
    }

    // checkBox1和checkBox2两个CheckBox
    // 同时选中时,MessageBox才表示。
    checkBox1.IsCheckedAsObservable()
    .CombineLatest(checkBox2.IsCheckedAsObservable(),
    (isChecked1, isChecked2) => new { isChecked1, isChecked2 })
    .Where(x => x.isChecked1 && x.isChecked2)
    .Subscribe(_ => MessageBox.Show("同时选择!"));
    ○ Scan方法
    最后,这个Scan方法不是连接方法而是一个集计的方法。Scan方法是1个前面的“结果”和现在的“值”进行合成输出的。因为可以获得1个前面的结果值,所以进行差分(累计)计算时使用比较方便。如下图所示:
    A序列中,1个前面的“结果”(中间褐色的横线)和当前的“值”(上面蓝色的横线)进行合成。
    Scan 方法就像 Linq2Object 中的 Aggregate 方法在计算时,列举的全部中间结果
    // 1, 3(=1+2), 6(=3+3), 10(=6+4), 15(=10+5)
    Observable.Range(1, 5)
    .Scan((x, y) => x + y)
    .Subscribe(Console.WriteLine);

    ReactiveUI库
    ReactiveUI类库是实现了MVVM模式的框架,他移除了一些Rx和用户界面进行交互的代码。ReactiveUI的核心思想是使开发者能够将属性变更以及事件转换为IObservable对象,然后在需要的时候使用IObservable对象将这些对象转换到属性中来。他的另一个核心目标是可以在ViewModel中相关属性发生变化时可以可执行相应的命令。虽然其他的框架也允许这么做,但是ReactiveUI会在依赖属性变更时自动的去更新结果,而不需要通过拉或者调用类似UpdateTheUI之类的方法

    核心类
    ReactiveObject:它是ViewModel对象,该对象实现了INotifyPropertyChanged接口。除此之外,该对象也提供了一个称之为Changed的IObservable接口,允许其他对象来注册,从而使得该对象属性变更时能够得到通知。使用Rx中强大的操作符,我们还可以追踪到一些状态是如何改变的。
    ReactiveValidateObject:该对象继承自ReactiveObject对象,它通过实现IDataErrorInfo接口,利用DataAnnotations来验证对象。因此属性的值可以使用一些限制标记,UI界面能够自动的在属性的值违反这些限制时显示出这些错误。
    ObservableAsPropertyHelper<T>:该类可以很容易的将IObservable对想转换为一个属性,该属性存储该对象的最新值,并且在属性值发生改变时能够触发NofityPropertyChanged事件。使用该类,我们能够从IObservable中派生出一些新的属性。
    ReactiveCommand:该类实现了ICommand和IObservable接口,并且当Execute执行时OnNext方法就会被执行。该对象的CanExecute可以通过IObservable<bool>来定义。
    ReactiveAsyncCommand:该对象继承自ReactiveCommand,并且封装了一种通用的模式。即“触发一步命令,然后将结果封送到dispather线程中”该对象也允许设置最大并行值。当达到最大值时,CanExecute方法返回false。

    使用ReactiveObject实现ViewModels
    在ReactiveObject中,属性的命名也需要注意,用作属性的私有字段必须为属性名称前面加上下划线。下面的例子展示了如何使用ReactiveObject声明一个可读写的属性。
    public class AppViewModel : ReactiveObject
    {
    int _SomeProp;
    public int SomeProp
    {
    get { return _SomeProp; }
    set { this.RaiseAndSetIfChanged(x => x.SomeProp, value); }
    }
    }
    ReactiveUI能够很容易的通过名为Changed的IObservable接口注册事件变化。在任何一个属性发生变化时,都会触发通知,客户端通常只需要关注感兴趣的一两个变化了的属性。使用ReactiveUI,可以通过WhenAny扩展方法很容易的获取这些属性值:
    var newLoginVm = new NewUserLoginViewModel();

    newLoginVm.WhenAny(x => x.User, x => x.Value)
    .Where(x => x.Name == "Bob")
    .Subscribe(x => MessageBox.Show("Bob is already a user!"));

    IObservable<bool> passwordIsValid = newLoginVm.WhenAny(
    x => x.Password, x => x.PasswordConfirm,
    (pass, passConf) => (pass.Value == passConf.Value));

    ReactiveCommand
    ReactiveCommand实现了ICommand接口,他可以模拟简单的ICommand实现。我们可以将它看做是一种ICommand,可以使用Create静态方法创建。
    var cmd = ReactiveCommand.Create(x => true, x => Console.WriteLine(x));
    cmd.CanExecute(null); //方法输出true
    cmd.CanExecute("Hello"); //方法输出"Hello"
    下面构造了一个Command,该Command只在鼠标松开时触发。

    var mouseIsUp = Observable.Merge(
    Observable.FromEvent<MouseButtonEventArgs>(window, "MouseDown").Select(_ => false),
    Observable.FromEvent<MouseButtonEventArgs>(window, "MouseUp").Select(_ => true))
    .StartWith(true);
    var cmd = new ReactiveCommand(mouseIsUp);
    cmd.Subscribe(x => Console.WriteLine(x));

    使用ReactiveAsyncCommand处理异步方法调用
    由于ReactiveAsyncCommand直接继承自ReactiveCommand,所以它能做基类的所有功能。使用Execute,使得Command开始在后台执行时并可以通知用户。ReactiveAsyncCommand和ReactiveCommand不同之处在于,它内建了能够自动跟踪后台线程中运行的任务的数量。ReactiveAsyncCommand对象中是使用RegisterAsyncAction来注册异步执行操作的。它能够注册异步方法和同步方法,这些方法将会在后台线程中执行,并返回IObservable数据表示执行结果会在未来的某一时刻到来。IObservale通常对应Command调用。每一次执行Execute方法将会将结果存入到IObservable对象中。
    下面是一个简单的使用Command的例子,它在后台线程的Task中运行,并且只运行一次。
    var cmd = new ReactiveAsyncCommand();
    cmd.RegisterAsyncAction(i => {
    Thread.Sleep((int)i * 1000);
    });

    cmd.Execute(5);
    cmd.CanExecute(5);//False

    构造一个ViewModel例子
    ①View
    <Window x:Class="RxUI.MainWindow"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    Title="MainWindow" Height="350" Width="525" x:Name="Window">
    <Grid DataContext="{Binding ViewModel, ElementName=Window}">
    <StackPanel HorizontalAlignment="Center" VerticalAlignment="Center">
    <TextBlock Text="{Binding DataFromTheInternet}" FontSize="18"/>
    <Button Content="Click me!" Command="{Binding GetDataFromTheInternet}"
    CommandParameter="5" MinWidth="75" Margin="0,6,0,0"/>
    </StackPanel>
    </Grid>
    </Window>

    public partial class MainWindow : Window
    {
    public AppViewModel ViewModel { get; protected set; }
    public MainWindow()
    {
    ViewModel = new AppViewModel();
    InitializeComponent();
    }
    }
    ②ViewModel
    class AppViewModel:ReactiveObject
    {
    ObservableAsPropertyHelper<String> dataFromTheInternet;
    public string DataFromTheInternet
    {
    get { return dataFromTheInternet.Value; }
    }

    public ReactiveAsyncCommand GetDataFromTheInternet { get; protected set; }
    

    }
    构造方法
    public AppViewModel()
    {
    GetDataFromTheInternet = new ReactiveAsyncCommand();
    var futureData = GetDataFromTheInternet.RegisterAsyncAction(I => {
    Thread.Sleep(5 * 1000);
    return String.Format("The Future will be {0}x as awesome!", i);
    });
    dataFromTheInternet = futureData.ToProperty(this, x => x.DataFromTheInternet);
    }
    每一次用户点击按钮的时候,Command的Execute方法就会被执行一次,每5分钟就会向futureData这个Observable对象中传入一个数据。

    ReactiveUI中的缓存
    在ReactiveUI中,引入了一个称之为MemorizingMRUCache的对象,如名字所示,是一种以最近最常使用过的数据来作为缓存方案,它会移除一些在一定时间内没有请求的数据,从而保证缓存集在一定的大小范围内。
    var cache = new MemoizingMRUCache<Int32, Int32>((x, ctx) => {
    Thread.Sleep(5 * 1000);
    return x * 100;
    },20);
    cache.Get(10);//第一次获取,需要5秒
    cache.Get(10);//第二次取值,立即返回
    cache.Get(15);//也需要5秒
    MemorizingMRUCache也可以将缓存数据从内存中存储到磁盘上供以后使用,缓存的键可以是一个URL,值可以是该URL对应的临时文件。当缓存文件不再需要时,调用OnRelease方法可以删除这些临时文件。
    TryGet:视图从缓存中获取某一个键对应的值
    Invalidate:将某一个键对应的值的缓存进行清除,内部调用Release函数。
    InvalidateAll:清空所有缓存。

    图形拖拽实例:
    using System;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Windows;
    using System.Windows.Controls;
    using System.Windows.Documents;
    using System.Windows.Input;

    namespace RxDragDownSample
    {
    /// <summary>
    /// MainWindow.xaml 的交互逻辑
    /// </summary>
    public partial class MainWindow : Window
    {
    public MainWindow()
    {
    InitializeComponent();
    }

        private void btnCircular_Click_1(object sender, RoutedEventArgs e)
        {
            AddShape<Shapes.Circular>();
        }
    
        private void btnSquare_Click_1(object sender, RoutedEventArgs e)
        {
            AddShape<Shapes.Square>();
        }
    
        private void btnTriangle_Click_1(object sender, RoutedEventArgs e)
        {
            AddShape<Shapes.Triangle>();
        }
    
        private void AddShape<T>() where T : new()
        {
            var shape = (new T()) as UserControl;
            myCanvas.Children.Add(shape);
            Canvas.SetLeft(shape, 10);
            Canvas.SetTop(shape, 10);
    
            var minX = 0;
            var maxX = myCanvas.ActualWidth - 30;
            var minY = 0;
            var maxY = myCanvas.ActualHeight - 30;
    
            // 鼠标在Shape上按下,开始DragDrop
            var mouseDown = from evt in Observable.FromEventPattern<MouseButtonEventArgs>(shape, "MouseLeftButtonDown")
                            select evt.EventArgs.GetPosition(this);
            // 鼠标移动,取得坐标
            var mouseMove = from evt in Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
                            select evt.EventArgs.GetPosition(this);
            // 鼠标放开,终止DragDrop
            var mouseUp = from evt in Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseLeftButtonUp")
                          select evt.EventArgs.GetPosition(this);     
            // 当鼠标移出Window,终止DragDrop
            var mouseLeave = from evt in Observable.FromEventPattern<MouseEventArgs>(this, "MouseLeave")
                             select evt;
    

    //mouseMoves则是对连续的 mouseMove 进行了Zip 获得鼠标位移(offset)
    var mouseMoves = mouseMove.Skip(1).Zip(mouseMove, (prev, cur) =>
    new { X = prev.X - cur.X, Y = prev.Y - cur.Y });
    var dragDrop = mouseDown.SelectMany(mouseMoves.TakeUntil(mouseUp).TakeUntil(mouseLeave));

            dragDrop.ObserveOn(SynchronizationContext.Current).Subscribe(p =>
            {
                var x = Math.Min(Math.Max(Canvas.GetLeft(shape) + p.X, minX), maxX);
                var y = Math.Min(Math.Max(Canvas.GetTop(shape) + p.Y, minY), maxY);
                Canvas.SetLeft(shape, x);
                Canvas.SetTop(shape, y);
                this.lblPosition.Content = "{x:" + x.ToString() + ",y:" + y.ToString() + "}";
            });
        }
    }
    

    }

    相关文章

      网友评论

          本文标题:反应式编程 .Net平台的有Reactive Extension

          本文链接:https://www.haomeiwen.com/subject/cjhelqtx.html