美文网首页
Kafka在ASP.Net Core上的应用

Kafka在ASP.Net Core上的应用

作者: WxhShine | 来源:发表于2020-02-26 01:04 被阅读0次
    image

    ASP.Net Core中使用的最多的是Confluent.Kafka这个包,以下用实例展示应用

    1.下载Nuget包

    ​ 首先是下载Confluent.Kafka这个包

    image

    2.创建Producer消息生产者

    发送者

        public class KafkaProducer
        {
            public static async Task SendAsync<T>(string topic, T value) where T: KafkaMessage
            {
                var config = new ProducerConfig { BootstrapServers = ConfigEntity.Instance.kafkaMapping.BootstrapServers };//服务器IP
                ProducerBuilder<Null, string> producerBuilder = new ProducerBuilder<Null, string>(config);
                using (var p = producerBuilder.Build())
                {
                    try
                    {
                        var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = JsonConvert.SerializeObject(value) });
                        Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
                    }
                    catch (ProduceException<Null, string> e)
                    {
                        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                    }
                }
            }
        }
    

    其中要注意的一点ProducerBuilder<TKey,TValue>中的TValue类型只能是Confluent.Kafka.Null, int, long, string, float, double, byte[]. 这7种类型, 否则在调用producerBuilder.Build()时会抛出 ArgumentNullException(Key serializer not specified and there is no default serializer defined for type {typeof(TKey).Name})

    消息体中包含你的消息必须的内容

        public class KafkaMessage
        {
        }
    

    3.创建Consumer消息消费者

    总消费类

    public class KafkaConsumer<T> where   T : KafkaMessage
        {
            public string Topic { get; set; }
            public string ConsumerGroup { get; set; }
    
            public void Subscribe(Action<T> dealMessage)
            {
                var config = new ConsumerConfig
                {
                    GroupId = ConsumerGroup,
                    BootstrapServers = ConfigEntity.Instance.kafkaMapping.BootstrapServers,
                    AutoOffsetReset = AutoOffsetReset.Latest
                };
                Task.Run(() =>
               {
                   var builder = new ConsumerBuilder<string, string>(config);
                   using (var consumer = builder.Build())
                   {
                       consumer.Subscribe(Topic);
                       while (true)
                       {
                           var result = consumer.Consume();
                           try
                           {
                               var message = JsonConvert.DeserializeObject<T>(result.Value);
                               dealMessage(message);
                           }
                           catch (Exception)
                           {
                               Console.WriteLine($"Topic : {result.Topic}, Message : {result.Value}");
                           }
                       }
                   }
               });
            }
        }
    

    子消费类

    interface ITestKafkaConsumer
    {
        void DealMessage(TestKafkaEntity message);
        void Subscribe();
    } 
    
    public class TestKafkaConsumer :  ITestKafkaConsumer
        {
    
            private KafkaConsumer<TestKafkaEntity> consumer { get; set; }
    
            public TestKafkaConsumer()
            {
                consumer = new KafkaConsumer<TestKafkaEntity>
                {
                    Topic = "test",
                    ConsumerGroup = "console-consumer-63873",
                };
    
            }
            public void DealMessage(TestKafkaEntity message)
            {
                Console.WriteLine("-------------------------------------------------------------");
                Console.WriteLine("这是一个消费者!!!" + message.ConsumerValue);
                Console.WriteLine("-------------------------------------------------------------");
            }
    
            public void Subscribe()
            {
                consumer.Subscribe(DealMessage);
            }
        }
    

    通过回调方法的方式, 将子消息类中的方法传入总消息类中

    4.注入消费者

    在Startup.cs类中的ConfigureServices方法中注入子消费类:

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<ITestKafkaConsumer, TestKafkaConsumer>();
    }
    

    然后在Program.cs类中的Main方法启动消费者:

    public static void Main(string[] args)
    {
        var hostBuilder = CreateHostBuilder(args);
        var host = hostBuilder.Build();
        using (var scope = host.Services.CreateScope())
        {
            var testConsumer = scope.ServiceProvider.GetService<ITestKafkaConsumer>();
            testConsumer.Subscribe();
        }
        host.Run(); ;
    }
    

    结果展示:

    image

    以上就是kafka在ASP.Net Core中的简单实现
    源码地址: https://github.com/WxhShine/RestfulAPILearn/tree/master/ASPCoreRestfulApiDemo/Kafka

    相关文章

      网友评论

          本文标题:Kafka在ASP.Net Core上的应用

          本文链接:https://www.haomeiwen.com/subject/ldgdchtx.html