美文网首页Orleans码农的世界编程语言爱好者
Orleans解决并发之痛(四):Streams

Orleans解决并发之痛(四):Streams

作者: BeckJin | 来源:发表于2017-07-30 23:23 被阅读850次

    Orleans 提供了 Stream扩展编程模型。此模型提供了一套API,使处理流更简单和更健壮。Stream默认提供了两种Provider,不同的流类型可能使用不同的Provider来处理,Simple Message Stream ProviderAzure Queue Stream Provider。Stream Providers兼容现有的队列技术,比如: Event HubsServiceBusAzure QueuesApache Kafka,不再需要编写额外的代码来配合这些队列技术的使用。

    关于为什么Orleans会提供Stream扩展编程模型?

    当今已经有一系列技术可以来构建一个流处理系统。包括持久存储流数据方面,如:Event HubsKafka;数据流计算操作方面,如: Azure Stream AnalyticsApache StormApache Spark Streaming, 而这些技术并不适合细粒度的自由格式的流数据计算, 或者支持的并不好,因为实际情况下可能需要对不同的数据流执行不同的操作,Orleans Streams目的就是解决这类问题,Stream编程模型和发布订阅模式挺相似。

    上述提到的一些技术我并没有详细学习,后面会了解并对比。

    Orleans Stream大概实现的步骤如下:

    1. 获取 StreamProvider
    2. 获取 IAsyncStream<T>
    3. 订阅者订阅一个Stream
    4. 发布者向某个Stream发布消息

    Silo配置文件OrleansConfiguration.xml修改

    在Globals节点中添加:

    <StorageProviders>
        <Provider Type="Orleans.Storage.MemoryStorage" Name="PubSubStore" />
    </StorageProviders>
    <StreamProviders>
        <Provider Type="Orleans.Providers.Streams.SimpleMessageStream.SimpleMessageStreamProvider" Name="SMSProvider"/>
    </StreamProviders>
    

    Name为PubSubStore的StorageProvider是必须的,Stream内部需要它来跟踪所有流订阅,记录各个流的发布者和订阅者的关系,本例中使用MemoryStorage,实际生产环境这是不对的。

    Name为SMSProvider的StreamProvider指定了消息的发布形式,Orleans当前提供的两种StreamProvider:Simple Message Stream ProviderAzure Queue Stream Provider 都是可靠的。

    Simple Message Stream Provider:不保证可靠的交付,失败的消息不会自动重新发送,但可以根据返回的Task状态来判断是否重新发送,事件执行顺序遵循FIFO原则。

    Azure Queue Stream Provider:事件被加入Azure Queue, 如果传送或处理失败,事件不会从队列中删除,并且稍后会自动重新被发送,因此事件执行顺序不遵循FIFO原则。

    获取 StreamProvider

    var streamProvider = this.GetStreamProvider("SMSProvider");
    

    SMSProvider 对应配置文件中Name为SMSProvider的StreamProvider

    获取 IAsyncStream<T>

    var streamId = this.GetPrimaryKey();
    var stream = streamProvider.GetStream<string>(streamId, "GrainStream");
    

    GetStream 需要两个参数,通过两个值定位唯一的Stream:
    streamId:Guid类型,stream标识
    streamNamespace:字符串,stream的命名空间

    订阅一个Stream

    订阅Stream分为隐式和显式订阅。

    隐式订阅

    隐式订阅的订阅者是唯一的,不存在对一个Stream的多次订阅,也不能取消订阅。

    Interface:

    public interface IImplicitSubscriberGrain : IGrainWithGuidKey
    {
    }
    

    Grain:

    [ImplicitStreamSubscription("GrainImplicitStream")]
    public class ImplicitSubscriberGrain : Grain, IImplicitSubscriberGrain, IAsyncObserver<string>
    {
        protected StreamSubscriptionHandle<string> streamHandle;
    
        public override async Task OnActivateAsync()
        {
            var streamId = this.GetPrimaryKey();
            var streamProvider = this.GetStreamProvider("SMSProvider");
            var stream = streamProvider.GetStream<string>(streamId, "GrainImplicitStream");
            streamHandle = await stream.SubscribeAsync(OnNextAsync);
        }
    
        public override async Task OnDeactivateAsync()
        {
            if (streamHandle != null)
                await streamHandle.UnsubscribeAsync();
        }
    
        public Task OnCompletedAsync()
        {
            return Task.CompletedTask;
        }
    
        public Task OnErrorAsync(Exception ex)
        {
            return Task.CompletedTask;
        }
    
        public Task OnNextAsync(string item, StreamSequenceToken token = null)
        {
            Console.WriteLine($"Received message:{item}");
            return Task.CompletedTask;
        }
    }
    
    1. 在Grain上标记 ImplicitStreamSubscription 属性,变量值为命名空间;
    2. 在Grain的OnActivateAsync方法体中调用SubscribeAsync;
    3. 实现IAsyncObserver接口,当发布者向Stream发送消息,订阅者接到消息后将执行OnNextAsync;
    4. 隐式订阅模式订阅者自动由发布者创建;

    显式订阅

    Interface:

    public interface IExplicitSubscriberGrain : IGrainWithGuidKey
    {
        Task<StreamSubscriptionHandle<string>> SubscribeAsync();
    
        Task ReceivedMessageAsync(string data);
    }
    

    Grain:

    public class ExplicitSubscriberGrain : Grain, IExplicitSubscriberGrain
    {
        private IAsyncStream<string> stream;
    
        public async override Task OnActivateAsync()
        {
            var streamProvider = this.GetStreamProvider("SMSProvider");
            stream = streamProvider.GetStream<string>(this.GetPrimaryKey(), "GrainExplicitStream");
            var subscriptionHandles = await stream.GetAllSubscriptionHandles();
            if (subscriptionHandles.Count > 0)
            {
                subscriptionHandles.ToList().ForEach(async x =>
                {
                    await x.ResumeAsync((payload, token) => this.ReceivedMessageAsync(payload));
                });
            }
        }
    
        public async Task<StreamSubscriptionHandle<string>> SubscribeAsync()
        {
            return await stream.SubscribeAsync((payload, token) => this.ReceivedMessageAsync(payload));
        }
    
        public Task ReceivedMessageAsync(string data)
        {
            Console.WriteLine($"Received message:{data}");
            return Task.CompletedTask;
        }
    }
    
    1. 订阅者通过调用SubscribeAsync方法完成订阅,并返回StreamSubscriptionHandle,这个对象提供了UnsubscribeAsync方法,方便取消订阅;

    2. 本例子中支持对同一个Stream被订阅多次,被订阅多次的结果是当向这个Stream发送消息的时候,ReceivedMessageAsync会执行多次。如果不希望对同一个Stream定义多次,在SubscribeAsync方法中可以通过GetAllSubscriptionHandles获取当前订阅者的个数,只有为0才执行订阅;

    3. 订阅者是一直存在的,除了被显示调用了UnsubscribeAsync方法。在OnActivateAsync中我们加入了ResumeAsync操作, 当Grain由未激活状态变为激活状态的时候,通过GetAllSubscriptionHandles获取这个Stream中存在的订阅者,通过ResumeAsync可以把它们重新唤醒。(模拟方式:杀掉Silo,重新启动即可,不过前提条件是PubSubStore不能使用MemoryStorage,因为使用MemoryStorage存储一旦重启后订阅者和发布者的关系都会丢失)

    发布消息

    Interface:

    public interface IPublisherGrain: IGrainWithGuidKey
    {
        Task PublishMessageAsync(string data);
    }
    

    Grain:

    public class PublisherGrain : Grain, IPublisherGrain
    {
        private IAsyncStream<string> stream;
    
        public override Task OnActivateAsync()
        {
            var streamId = this.GetPrimaryKey();
            var streamProvider = this.GetStreamProvider("SMSProvider");
            this.stream = streamProvider.GetStream<string>(streamId, "GrainExplicitStream"); //隐式:GrainImplicitStream
            return base.OnActivateAsync();
        }
    
        public async Task PublishMessageAsync(string data)
        {
            Console.WriteLine($"Sending data: {data}");
            await this.stream.OnNextAsync(data);
        }
    }
    

    通过调用IAsyncStream的OnNextAsync发布消息即可。这里可以针对返回的Task状态再作一些操作,如果不成功,重新发送或记录日志等。

    Client发布消息:

    客户端发布消息:

    while (true)
    {
        Console.WriteLine("Press 'exit' to exit...");
        var input = Console.ReadLine();
        if (input == "exit") break;
        var publisherGrain = GrainClient.GrainFactory.GetGrain<IPublisherGrain>(Guid.Empty);
        publisherGrain.PublishMessageAsync(input);
    }
    
    发布消息

    显示订阅下,需要增加另一个客户端先完成订阅:

    var subscriberGrain = GrainClient.GrainFactory.GetGrain<IExplicitSubscriberGrain>(Guid.Empty);
    var streamHandle = subscriberGrain.SubscribeAsync().Result;
    Console.WriteLine("Press enter to exit...");
    Console.ReadLine();
    streamHandle.UnsubscribeAsync();
    
    显示订阅下发布消息

    参考链接:

    相关文章

      网友评论

        本文标题:Orleans解决并发之痛(四):Streams

        本文链接:https://www.haomeiwen.com/subject/ilbtlxtx.html