美文网首页
Kafka入门与SpringBoot集成使用

Kafka入门与SpringBoot集成使用

作者: elijah777 | 来源:发表于2022-06-23 22:08 被阅读0次

    kafka

    内容大纲

    1、简单介绍认识

    2、安装与使用

    3、与springboot集成

    一、简单介绍认识

    介绍:

    1、Kafka 是一套流处理系统,可以让后端服务轻松的相互沟通,是微服务架构中常用的组件。

    异步处理、服务解耦、流量控制

    异步处理

    随着业务的不断增加,通常会在原有的服务上添加上新服务,这样会出现请求链路越来越长,链路latency也就逐步增加。例如:最开始的电商项目,可能就是简简单单的扣库存、下单。慢慢地又加上了积分服务、短信服务等。链路增长不可避免的latency增加。

    相对于扣库存、下单,积分和短信没必要恢复的那么及时。所以只需要在下单结束的时候结束那个流程,把消息传给消息队列中就可以直接返回响应了。而且短信服务和积分服务可以并行的消费这条消息。这样响应的速度更快,用户体验更好服务异步执行,系统整体latency(相对使用同步机制而言)也下降了

    服务解耦

    上面说的新加了短信服务和积分服务,现在又需要添加数据分析服务、以后可能又加一个策略服务等。可以发现订单的后续链路一直在增加,为了适配这些功能,就需要不断的修改订单服务,下游任何一个服务的接口改变都可能会影响到订单服务。

    这个时候可以采用消息队列来解耦,订单服务只需要把消息塞到消息队列里面,下游服务谁要这个消息谁就订阅响应的topic。这样订单服务就不用被拿捏住了!!

    流量治理

    后端服务相对而言是比较脆弱的,因为业务较重,处理时间长。如果碰到高QPS情况,很容易顶不住。比如说题库数据写入到ES索引中,数据都是千万级别的。这个时候使用中间件来做一层缓冲,消息队列是个很不错的选择。

    变更的消息先存放到消息队列中,后端服务尽自己最大的努力去消费队列中消费数据。

    同时,对于一些不需要及时地响应处理,且业务处理逻辑复杂、流程长,那么数据放到消息队列中,消费者按照自己的消费节奏走,也是很不错的选择。

    上述分别对应着 生产者生产过快消费者消费过慢 两种情况,使用消息队列都能很好的起到缓冲作用。

    1. Producer : 消息生产者,就是向 Kafka发送数据 ;

    2. Consumer : 消息消费者,从 Kafka broker 取消息的客户端;

    3. Consumer Group (CG): 消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

    4. Broker :经纪人 一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。

    5. Topic : 话题,可以理解为一个队列, 生产者和消费者面向的都是一个 topic;

    6. Partition: 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;如果一个topic中的partition有5个,那么topic的并发度为5.

    7. Replica: 副本(Replication),为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 Kafka仍然能够继续工作, Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。

    8. Leader: 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。

    9. Follower: 每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。 leader 发生故障时,某个 Follower 会成为新的 leader。

    10. Offset: 每个Consumer 消费的信息都会有自己的序号,我们称作当前队列的offset。即

      消费点位标识消费到的位置

      每个消费组都会维护订阅的Topic 下每个队列的offset

    发布/订阅模式:

    为了解决 一条消息能被多个消费者消费的问题 ,发布/订阅模式是个很不错的选择。生产者将消息塞到消息队列对应的topic中,所有订阅了这个topic的消费者都能消费这条消息。

    kafkaRocketMQ使用的是发布订阅模式,而RabbitMQ使用的是队列模式。

    kafka文件存储方式

    由于生产者生产的消息会不断追加到 log 文件末尾, 为防止 log 文件过大导致数据定位效率低下, Kafka 采取了分片索引机制,将每个 partition 分为多个 segment。

    每个 segment对应两个文件——“.index”文件和“.log”文件。 这些文件位于一个文件夹下, 该文件夹的命名规则为: topic 名称+分区序号。例如, first 这个 topic 有三个分区,则其对应的文件夹为 first-0,first-1,first-2。

    1. 生产者消费者

    生产者服务 Producer 向 Kafka 发送消息,消费者服务 Consumer 监听 Kafka 接收消息。

    一个服务可以同时为生产者和消费者。

    ****3. Topics 主题****

    Topic 是生产者发送消息的目标地址,是消费者的监听目标。

    一个服务可以监听、发送多个 Topics。

    Kafka 中有一个【consumer-group(消费者组)】的概念。这是一组服务,扮演一个消费者。

    如果是消费者组接收消息,Kafka 会把一条消息路由到组中的某一个服务。这样有助于消息的负载均衡,也方便扩展消费者。

    Topic 扮演一个消息的队列。

    一条消息发送,这条消息被记录和存储在这个队列中,不允许被修改。

    消息会被发送给此 Topic 的消费者,这条消息并不会被删除,会继续保留在队列中。

    消息会发送给消费者、不允许被改动、一直呆在队列中。(消息在队列中能呆多久,可以修改 Kafka 的配置)

    4. Partitions 分区

    把 Topic 看做了一个队列,实际上,一个 Topic 是由多个队列组成的,被称为【Partition(分区)】。这样可以便于 Topic 的扩展。

    生产者发送消息的时候,这条消息会被路由到此 Topic 中的某一个 Partition。

    消费者监听的是所有分区。

    生产者发送消息时,默认是面向 Topic 的,由 Topic 决定放在哪个 Partition,默认使用轮询策略。

    也可以配置 Topic,让同类型的消息都在同一个 Partition。

    例如,处理用户消息,可以让某一个用户所有消息都在一个 Partition。

    例如,用户1发送了3条消息:A、B、C,默认情况下,这3条消息是在不同的 Partition 中(如 P1、P2、P3)。

    在配置之后,可以确保用户1的所有消息都发到同一个分区中,为了提供消息的【有序性】。

    消息在不同的 Partition 是不能保证有序的,只有一个 Partition 内的消息是有序的。

    5. 架构

    Kafka 是集群架构的,ZooKeeper是重要组件。

    ZooKeeper 管理者所有的 Topic 和 Partition。

    Topic 和 Partition 存储在 Node 物理节点中,ZooKeeper负责维护这些 Node。

    Topic A 的 Partition #1 有3份,分布在各个 Node 上。

    这样可以增加 Kafka 的可靠性和系统弹性。

    3个 Partition #1 中,ZooKeeper 会指定一个 Leader,负责接收生产者发来的消息。

    这样,每个 Partition 都含有了全量消息数据。

    即使某个 Node 节点出现了故障,也不用担心消息的损坏。

    Topic A 和 Topic B 的所有 Partition 分布可能就是这样的:

    二、安装与使用

    安装:

    下载地址, 要下载带bin的, 代码被编译的

    https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz

    https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0.tar.gz

    解压之后

    1、zooleeper文件夹里面新建data目录

    2、修改conf 里面配置文件 dataDir 为data的路径

    3、修改文件名zoo_sample.cfg为zoo.cfg

    4、在zooleeper启动bin下面的zkServer.sh

    启动报错与处理

    sh ./bin/zkServer.sh start

    apache-zookeeper-3.7.0/data/zookeeper_server.pid: No such file or directory FAILED TO WRITE PID****

    更换为 ./bin/zkServer.sh start

    使用./zkServer.sh start ../conf/zoo.cfg 而不是 sh zkServer.sh start ../conf/zoo.cfg

    
    tar -zxvf /opt/apache-zookeeper-3.6.1-bin.tar.gz -C ./
    mv apache-zookeeper-3.6.1-bin zookeeper-3.6.1
    cd zookeeper-3.6.1
    " 创建 dataDir, 在 zoo.cfg 中修改 dataDir"
    mkdir data
    cd zookeeper-3.6.1/conf
    cp zoo_sample.cfg zoo.cfg
    vim zoo.cfg
    " 修改 dataDir 为上面创建的 data 目录"
    

    Starting zookeeper ... FAILED TO START

    查看日志:

    错误: 找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain

    解决方式 在zookeeper-server下编译项目 mvn package

    还是不行, 发现自己下载的不是编译的文件

    重新下载 apache-zookeeper-3.8.0-bin.tar.gz 解压后重新启动 便可以正常使用

    (base) shenshuaihu@Elijah apache-zookeeper-3.8.0-bin % ./bin/zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /Users/shenshuaihu/SoftWare/Develop/Apache/tomcat/apache-zookeeper-3.8.0-bin/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    (base) shenshuaihu@Elijah apache-zookeeper-3.8.0-bin % bin/zkCli.sh
    Connecting to localhost:2181
    

    下载kafka

    官网下载

    https://www.apache.org/dyn/closer.cgi#backup

    解压, 更新配置路径 server.properties log.dirs

    启动
    
    ./bin/kafka-server-start.sh ./config/server.properties
    
    进入kafka的bin目录
    
    创建topic
    ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic testInfoTopic --partitions 2 --replication-factor 1
    
    查看topic
    
    ./kafka-topics.sh --list --bootstrap-server localhost:9092
    
    生产数据
    
    ./kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic
    
    消费数据
    
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic
    
    
    Untitled.png

    关闭

    关闭  [zookeeper](http://zookeeper-server-stop.sh/)
    sh zkEnv.sh
    

    三、 与springboot集成

    pom.xml

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.53</version>
            </dependency>
            <dependency>
                <groupId>org.aspectj</groupId>
                <artifactId>aspectjweaver</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-aop</artifactId>
            </dependency>
    

    配置yml

    spring:
      application:
        name: kafka
      kafka:
        consumer:
          bootstrap-servers:
            - 127.0.0.1:9092
          # 消费组
          group-id: myGroup
          # 消费者是否自动提交偏移量,默认为true
          enable-auto-commit: false
          # 消费者在读取一个没有偏移量或者偏移量无效的情况下,从起始位置读取partition的记录,默认是latest
          auto-offset-reset: earliest
          # 单次调用poll方法能够返回的消息数量
          max-poll-records: 50
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    

    生产者使用

    
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @Slf4j
    public class ProducerController {
    
        @Autowired
        private KafkaTemplate<String, String> kafka;
    
        @RequestMapping("register")
        public String register(User user) {
            String message = JSON.toJSONString(user);
            log.info("接收到用户信息:" + message);
            kafka.send("testInfoTopic", message);
            //kafka.send(String topic, @Nullable V data) {
            return "OK";
        }
       
    }
    
    
    

    消费者

    import com.alibaba.fastjson.JSON;
    import kafka.com.dto.User;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    
    @Configuration
    @Slf4j
    public class Consumer {
    
        @KafkaListener(topics = "testInfoTopic" )
        public void consume(String message) {
            System.out.println("接收到消息:" + message);
            log.info("正在为 " + message + " 办理注册业务...");
            log.info("注册成功");
        }
    
    }
    

    简单的kafka就是如此使用

    实际场景应用

    通过拦截器获取特殊场景(如删除)的方法, 获取方法的参数组装等业务日志信息,通过Kafka发送到日志系统.

    注解标记

    
    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     * 删除时发送日志
     */
    @Target({ElementType.METHOD, ElementType.TYPE}) //注解的使用范围,方法注解
    @Retention(RetentionPolicy.RUNTIME)  //注解的生命周期,运行期
    @Documented //在生成javac时显示该注解的信息
    public @interface DeleteLog {
        
        String actionName() default ("");
    
        boolean enabled() default true;
    
        String[] produces() default {};
    
    }
    

    对应方法添加注解方法

    @DeleteLog
    public void test(User user) {
    // ...
    }
    
    @DeleteLog
    public void delete(User user) {
        log.info("delete {}", user.getName());
    }
    

    拦截器的使用

    
    import kafka.com.annotation.DeleteLog;
    import kafka.com.config.Producer;
    import kafka.com.dto.User;
    import lombok.extern.slf4j.Slf4j;
    import org.aopalliance.intercept.MethodInterceptor;
    import org.aopalliance.intercept.MethodInvocation;
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.After;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Before;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.lang.reflect.Method;
    import java.util.Date;
    import java.util.List;
    
    @Aspect
    @Order(-99)
    @Component
    @Slf4j
    public class UserLogInterceptor implements MethodInterceptor{
    
    @Autowired
        private Producer producer;
    
    @Before("@annotation(deleteLog)")
        public void beforeTest(JoinPoint point, DeleteLog deleteLog) throws Throwable {
            System.out.println("beforeTest:" + deleteLog.actionType());
            String type = deleteLog.actionType();
            StringBuilder message = new StringBuilder();
            log.info("delete: {}", deleteLog);
            message.append(type);
            message.append(new Date());
            Object[] args = point.getArgs();
            for (Object obj : args) {
                if (obj instanceof User) {
                    message.append(obj);
                 }
                if (obj instanceof String) {
                    message.append(obj);
                }
            }
            producer.producer("deleteTopic", message.toString());
        }
    
    @After("@annotation(deleteLog)")
        public void afterTest(JoinPoint point, DeleteLog deleteLog) {
            System.out.println("afterTest:" + deleteLog.actionType());
        }
    

    参考文章

    1、 Kafka -- 从基础到高级 https://blog.csdn.net/eraining/article/details/115860664

    2、 Kafka 顺序消费方案https://blog.csdn.net/qq_38245668/article/details/105900011

    3、 图解 Kafka blog.csdn.net/duysh/article/details/116355977

    2022/06/23 于成都

    相关文章

      网友评论

          本文标题:Kafka入门与SpringBoot集成使用

          本文链接:https://www.haomeiwen.com/subject/ayptdrtx.html