美文网首页
.net core kafka封装

.net core kafka封装

作者: 黄隐后人 | 来源:发表于2020-04-14 21:33 被阅读0次

    新建一个.net core类库项目

    安装第三方依赖库,如下图所示:


    41.png

    新建一个SUPKafkaTopicConsumer类

    这是用来创建并初始化消费者,接下来看看这个类里面包含了什么。

    • 首先声明一个委托,用来接收订阅消息
    public delegate void OnReceivedHandle(object data);
    

    初始化消费者,构造函数中传入kafka地址,以及要订阅的组groupId,另外注入了log4net记录日志信息。
    init()方法用来初始化,新建一个消费者,具体代码如下。

     public class SUPKafkaTopicConsumer<TKey, TValue>
        {
            private IConsumer<TKey, TValue> consumer;
            private SUPLogger logger_;
            private string BootStrapServer;
            private string GroupId;
          
            public SUPKafkaTopicConsumer(string bootStrapServer, string groupId, SUPLogger logger = null)
            {
                BootStrapServer = bootStrapServer;
                GroupId = groupId;
                logger_ = logger;
            }
    
            public bool Init()
            {
                try
                {
                    var conf = new ConsumerConfig
                    {
                        GroupId = GroupId,
                        BootstrapServers = BootStrapServer,
                        AutoOffsetReset = AutoOffsetReset.Earliest,
                        EnableAutoCommit = false // 设置非自动偏移,业务逻辑完成后手动处理偏移,防止数据丢失
                    };
                    consumer = new ConsumerBuilder<TKey, TValue>(conf)
                        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                        .Build();
    
                    return true;
                }
                catch (Exception ex)
                {
                    throw;
                }
            }
    
    • 定义回调事件,用以处理用户自定义方法。
    public event OnReceivedHandle onReceivedHandle;
    
    • 定义一个订阅的方法,传入topic,以及是否需要提交偏移量。
      其实看init()方法中我把EnableAutoCommit=false,取消了自动提交,让应用程序决定何时提交 偏移量,为什么这么做呢?
      自动提交虽然方便,但是也有一些弊端,自动提交的弊端是通过间隔时间。 一般是默认5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的 。
      大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,井在发生再均衡时减少 重复消息的数量。消费者 API提供了另一种提交偏移量的方式 , 开发者可以在必要的时候 提交当前偏移盘,而不是基于时间间隔。
    public void Subscribe(string topic, bool isCommit)
            {
                try
                {
                    if (consumer != null)
                    {
                        consumer.Subscribe(topic);
                        while (true)
                        {
                            var consume = consumer.Consume();
                            if (onReceivedHandle != null)
                            {
                                onReceivedHandle(consume);
    
                                if (isCommit)
                                {
                                    consumer.Commit(consume);
                                }
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    //consumer.Close();
                    throw ex;
                }
            }
    
    • 取消订阅
     public void UnSubscribe()
            {
                if (consumer != null)
                {
                    consumer.Unsubscribe();
                }
            }
    

    新建生产者类

    • 首先定义了ISUPKafkaProducer<Tkey, TValue>接口,包含四个方法
     public interface ISUPKafkaProducer<Tkey,TValue>
        {
            ISendResult Send(Tkey key, TValue value, string topic,Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);
            ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);
    
            ISendResult AsyncSend(Tkey key, TValue value,string topic);
            ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition);
        }
    
    • 接口的实现,初始化过程类似消费者
    internal class SUPKafkaTopicProducer<Tkey, TValue> : ISUPKafkaProducer<Tkey, TValue>
        {
            private IProducer<Tkey, TValue> producer;
            private SUPLogger logger_;
            private string m_bootStrapServer;
    
            public SUPKafkaTopicProducer(string bootStrapServer,SUPLogger logger = null)
            {
                m_bootStrapServer = bootStrapServer;
                logger_ = logger;
            }
            public bool Init()
            {
                try
                {
                    var config = new ProducerConfig
                    {
                        BootstrapServers = m_bootStrapServer
                    };
                    producer = new ProducerBuilder<Tkey, TValue>(config)
                        .SetErrorHandler((producer, error) =>
                        {
                            logger_.Fatal(string.Format("Kafka Error Handler {0},ErrorCode:{2},Reason:{3}",
                                m_bootStrapServer, error.Code, error.Reason));
                        })
                        .SetLogHandler((producer, msg) =>
                        {
                            logger_.Info(string.Format("Kafka Log Handler {0}-{1},Name:{2},Message:{3}",
                                m_bootStrapServer, msg.Name, msg.Message));
                        })
                        .Build();
    
                    return true;
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    

    实现继承至ISUPKafkaProducer<Tkey, TValue>的方法

     public ISendResult Send(Tkey key, TValue value,string topic, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
            {
                try
                {
                    if (producer != null)
                    {
                        var message = new Message<Tkey, TValue>
                        {
                            Value = value,
                            Key = key
                        };
                        producer.Produce(topic, message, sendCallBack);
                        return new SendResult(true);
                    }
                    else
                    {
                        return new SendResult(true, "没有初始化生产者");
                    }
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
            public ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
            {
                try
                {
                    if (producer != null)
                    {
                        var message = new Message<Tkey, TValue>
                        {
                            Value = value,
                            Key = key
                        };
                        producer.Produce(topicPartition, message, sendCallBack);
                        return new SendResult(true);
                    }
                    else
                    {
                        return new SendResult(true, "没有初始化生产者");
                    }
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
            public ISendResult AsyncSend(Tkey key, TValue value,string topic)
            {
                try
                {
                    if (producer != null)
                    {
                        var message = new Message<Tkey, TValue>
                        {
                            Value = value,
                            Key = key
                        };
                        var deliveryReport = producer.ProduceAsync(topic, message);
                        deliveryReport.ContinueWith(task =>
                       {
                           Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
                       });
                        producer.Flush(TimeSpan.FromSeconds(10));
                        return new SendResult(true);
                    }
                    else
                    {
                        return new SendResult(true, "没有初始化生产者");
                    }
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
            public ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition)
            {
                try
                {
                    if (producer != null)
                    {
                        var message = new Message<Tkey, TValue>
                        {
                            Value = value,
                            Key = key
                        };
    
                        var deliveryReport = producer.ProduceAsync(topicPartition, message);
                        deliveryReport.ContinueWith(task =>
                        {
                            Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topicPartition.Topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
                        });
    
                        producer.Flush(TimeSpan.FromSeconds(10));
                        return new SendResult(true);
                    }
                    else
                    {
                        return new SendResult(true, "没有初始化生产者");
                    }
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    

    新建一个SUPKafkaMessageCenter类

    这个类是对外开放的,我们利用这个类来管理生产者和消费者,看下代码非常简单。

    public static class SUPKafkaMessageCenter<Tkey, TValue>
        {
            private static SUPLogger logger = null;
            static SUPKafkaMessageCenter()
            {
                SUPLoggerManager.Configure();
                logger = new SUPLogger("KafkaCenter");
            }
            /// <summary>
            /// 创建生产者
            /// </summary>
            /// <param name="bootstrapServer"></param>
            /// <param name="topicName"></param>
            /// <returns></returns>
            public static ISUPKafkaProducer<Tkey, TValue> CreateTopicProducer(string bootstrapServer)
            {
                if (string.IsNullOrEmpty(bootstrapServer))
                {
                    return null;
                }
                var producer = new SUPKafkaTopicProducer<Tkey, TValue>(bootstrapServer, logger);
                if (!producer.Init())
                {
                    return null;
                }
                return producer;
            }
    
            /// <summary>
            /// 创建消费者
            /// </summary>
            /// <param name="bootstrapServer"></param>
            /// <param name="groupId"></param>
            /// <returns></returns>
            public static SUPKafkaTopicConsumer<Tkey, TValue> CreateTopicConsumer(string bootstrapServer, string groupId= "default-consumer-group")
            {
                if (string.IsNullOrEmpty(bootstrapServer))
                {
                    return null;
                }
                var consumer = new SUPKafkaTopicConsumer<Tkey, TValue>(bootstrapServer, groupId,logger);
                if (!consumer.Init())
                {
                    return null;
                }
                return consumer;
            }
    

    测试

    新建一个测试的控制台程序,调用代码如下

    • 消费者
    var consumer = SUPKafkaMessageCenter<string, string>.CreateTopicConsumer("localhost:9092");
                //绑定接收信息,回调函数
                consumer.onReceivedHandle += CallBack;
    
                var topics = new List<string>();
                topics.Add("kafka-default-topic");
                topics.Add("test");
                //订阅主题
                consumer.Subscribe(topics, false);
    
    • 生产者
    ISUPKafkaProducer<string, string> kafkaCenter = SUPKafkaMessageCenter<string, string>.CreateTopicProducer("localhost:9092");
    kafkaCenter.Send(i.ToString(), "", "kafka-default-topic",deliveryReport =>{...});
    

    除了上面写的这些方法,其实对于kafka还有很多功能,比如topic的增删改查,我把它认为是管理类的,这里就不贴代码了,有兴趣的朋友可以进gitee上下载来看看。https://gitee.com/zhanwei103/Kafka.Net

    相关文章

      网友评论

          本文标题:.net core kafka封装

          本文链接:https://www.haomeiwen.com/subject/cvcxvhtx.html