美文网首页
.NET Core5.0读写kafka

.NET Core5.0读写kafka

作者: 包子wxl | 来源:发表于2021-07-04 00:26 被阅读0次

kafka搭建相关:https://www.jianshu.com/p/79e29f653836

1.向kafka写消息

安装nuget包 confluent.Kafka

class Program
    {
        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            while (true)
            {
                Console.WriteLine("请输入发送的内容");
                string content = Console.ReadLine();
                //kafka节点
                string brokerList = " 192.168.101.13:9092";
                await ConfluentKafka.PublishAsync(brokerList, "test", content);
            }
        }
    }

 public static async Task PublishAsync(string brokerList,string topicName,string content)
        {
            //生产者配置
            var config = new ProducerConfig
            {
                BootstrapServers = brokerList, //kafka节点
                Acks = Acks.None //ack机制,0不等服务器确认,1主节点确认返回ack,-1全部节点同步完返回ack
            };
            using (var producer = new ProducerBuilder<string, string>(config)
            .Build())
            {
                try
                {
                    //key要给值,根据key做负载均衡,不然如果多节点,key不给值会全部写有一个分区
                    var deliveryReport = await producer.
                        ProduceAsync(
                        topicName, new Message<string, string> { Key = (new Random().Next(1, 10)).ToString(), Value = content });
                    Console.WriteLine($"向kafka发送了数据: {deliveryReport.TopicPartitionOffset}");
                }
                catch(ProduceException<string, string> e)
                {
                    Console.WriteLine($"向kafka发送数据失败:{e.Message}");
                }
            }
        }
image.png 查看kafka可以化工具看到产生一个test的topic,下面只有一个分片,里面已经写进了一条数据,具体内容被编码了看不了 image.png

2.读kafka消息

class Program
    {
        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            //kafka节点
            string brokerList = " 192.168.101.13:9092";
            string topicName = "test";
             ConfluentKafka.Consumer(brokerList, topicName, "G1");
            await Task.CompletedTask;
        }
    }

  public static void Consumer(string brokerList, string topicName, string groupId)
        {
            //消费配置
            var config = new ConsumerConfig
            {
                BootstrapServers = brokerList,
                GroupId = groupId,//客户端组id
                EnableAutoCommit = true,//处理完自动提交
                //Latest = 0,表示消费者消费启动之后的数据。。。启动之前的数据消费不到
                //Earliest = 1,每次都从头开始消费没有消费过的数据
                AutoOffsetReset = AutoOffsetReset.Earliest,

                //代表数据超过了6000没有处理完业务,则把数据给其他消费端
                // 一定要注意。。SessionTimeoutMs值一定要小于MaxPollIntervalMs
                SessionTimeoutMs = 6000,
                MaxPollIntervalMs = 10000,
            };
            using (var consumer = new ConsumerBuilder<Ignore, string>(config)
               .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
               .SetPartitionsAssignedHandler((c, partitions) =>
               {
                   Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
               })
               .SetPartitionsRevokedHandler((c, partitions) =>
               {
                   //新加入消费者的时候调用 
                   Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
               })
                .Build())
            {
                //消费者会影响在平衡分区,当同一个组新加入消费者时,分区会在分配
                consumer.Subscribe(topicName);
                try
                {
                    //循环摘取模式
                    while(true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume();
                            //摘取到底了
                            if (consumeResult.IsPartitionEOF)
                            {
                                continue;
                            }
                            Console.WriteLine($": {consumeResult.TopicPartitionOffset}::{consumeResult.Message.Value}");
                            try
                            {
                                // 提交偏移量,数据已经处理完成了
                                consumer.Commit(consumeResult);
                            }
                            catch (KafkaException e)
                            {
                                Console.WriteLine($"Commit error: {e.Error.Reason}");
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Closing consumer.");
                    consumer.Close();
                }
            }
        }
image.png
image.png

相关文章

网友评论

      本文标题:.NET Core5.0读写kafka

      本文链接:https://www.haomeiwen.com/subject/wqnuultx.html