美文网首页Spring Cloud java后台环境搭建
Spring Cloud Bus + kafka实现配置中心配置

Spring Cloud Bus + kafka实现配置中心配置

作者: 兔斯基_大大 | 来源:发表于2019-03-25 11:45 被阅读0次

    Spring Cloud Bus + kafka实现配置中心配置动态更新

    Spring Cloud Bus与Spring Cloud Config结合kafka实现消息总线的功能.

    架构

    5-6.png

    上面的架构图中,服务的配置更新需要通过向具体服务中的某个实例发送请求,再触发对整个服务集群的配置更新。虽然能实现功能,但是这样的结果是,我们指定的应用实例就会不同于集群中的其他应用实例,这样会增加集群内部的复杂度,不利于将来的运维工作

    将上面的架构做了一些调整,如下图所示:

    5-7.png
    1. 在Config Server中也引入Spring Cloud Bus,将配置服务端也加入到消息总线中来。
    2. /bus/refresh请求不在发送到具体服务实例上,而是发送给Config Server,并通过destination参数来指定需要更新配置的服务或实例。

    通过上面的改动,我们的服务实例就不需要再承担触发配置更新的职责。同时,对于Git的触发等配置都只需要针对Config Server即可,从而简化了集群上的一些维护工作。

    Kafka

    基于消息发布/订阅模式实现的消息系统,其主要设计目标如下:

    • 消息持久化:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
    • 高吞吐:在廉价的商用机器上也能支持单机每秒100K条以上的吞吐量
    • 分布式:支持消息分区以及分布式消费,并保证分区内的消息顺序
    • 跨平台:支持不同技术平台的客户端(如:Java、PHP、Python等)
    • 实时性:支持实时数据处理和离线数据处理
    • 伸缩性:支持水平扩展

    基本概念:

    • Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
    • Topic:逻辑上同Rabbit的Queue队列相似,每条发布到Kafka集群的消息都必须有一个Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
    • Partition:Partition是物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分成一个或多个Partition,每个Partition对应一个文件夹(存储对应分区的消息内容和索引文件)。
    • Producer:消息生产者,负责生产消息并发送到Kafka Broker。
    • Consumer:消息消费者,向Kafka Broker读取消息并处理的客户端。
    • Consumer Group:每个Consumer属于一个特定的组(可为每个Consumer指定属于一个组,若不指定则属于默认组),组可以用来实现一条消息被组内多个成员消费等功能。

    搭建Kafka本地环境

    官网 http://kafka.apache.org/downloads.html

    目录结构

    kafka
      +-bin
        +-windows
      +-config
      +-libs
      +-logs
      +-site-docs
    

    Kafka的设计中依赖了ZooKeeper,所以我们可以在binconfig目录中除了看到Kafka相关的内容之外,还有ZooKeeper相关的内容。其中bin目录存放了Kafka和ZooKeeper的命令行工具,bin根目录下是适用于Linux/Unix的shell,而bin/windows下的则是适用于windows下的bat。我们可以根据实际的系统来设置环境变量,以方便后续的使用和操作。而在config目录中,则是用来存放了关于Kafka与ZooKeeper的配置信息。

    启动kafka

    --启动zookeeper

    bin\windows\zookeeper-server-start.bat config\zookeeper.properties

    -- 启动 kaffka

    bin\windows\kafka-server-start.bat config/server.properties

    -- 创建 Topic (可理解为创建消息队列)

    bin\windows\kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    -- 生成消息

    bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

    -- 消费消息

    bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

    config server

    pom

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-bus-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>2.0.1.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-bus</artifactId>
                <version>2.0.0.RELEASE</version>
            </dependency>
    

    spring-cloud-starter-bus-kafka、spring-cloud-starter-stream-kafka、spring-cloud-bus 这个几个依赖配置是实现配置中心配置动态刷新需要的,其他参考config的pom。

    bootstrap.yml

    server:
      port: 8769
    
    spring:
      application:
        name: config-server
    
      cloud:
        config:
          server:
            git:
              uri: http://admin@192.168.1.18:10010/r/vi.git
              username: vi
              password: hz
              default-label: master
    
        bus:
          refresh:
            enabled: true
    
      kafka:
        bootstrap-servers: 192.168.1.172:9092   #配置 kafka 服务器的地址和端口
        consumer:
          group-id: SpringCloud-bus
    eureka:
      client:
        serviceUrl:
          defaultZone: http://localhost:7001/eureka/,http://localhost:7002/eureka/
        #      native:
    #            search-locations: classpath:/shared
        healthcheck:
          enabled: true                           # 开启健康检查(依赖spring-boot-starter-actuator)
      instance:
        instance-id: ${spring.cloud.client.ip-address}:${server.port}
        prefer-ip-address: true #以IP地址注册到服务中心
        lease-renewal-interval-in-seconds: 5      # 心跳时间,即服务续约间隔时间(缺省为30s)
        lease-expiration-duration-in-seconds: 15  # 发呆时间,即服务续约到期时间(缺省为90s)
    
    logging.level.org.springframework.boot.autoconfigure: ERROR
    
    management:
      endpoints:
        web:
          exposure:
            include: '*'   #refresh
    
    

    这里比config新增的配置

        bus:
          refresh:
            enabled: true
    
      kafka:
        bootstrap-servers: 192.168.1.172:9092   #配置 kafka 服务器的地址和端口
        consumer:
          group-id: SpringCloud-bus
          
    management:
      endpoints:
        web:
          exposure:
            include: '*'   #refresh
    

    注意:

    1. 如果是properties 的话 ‘’ 不用 加 ‘ ’ 单引号*

    2. yml--include: '*' http://localhost:8769/actuator/refresh 刷新无效

    3. yml--include: '*' http://localhost:8769/actuator/bus-refresh 刷新所有微服务

    4. yml--include: 'refresh' http://localhost:8769/actuator/refresh 刷新无效

    5. yml--include: 'refresh' http://localhost:8769/actuator/bus-refresh 不能访问

    config client

    pom

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-bus-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>2.0.1.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-bus</artifactId>
                <version>2.0.0.RELEASE</version>
            </dependency>
    

    spring-cloud-starter-bus-kafka、spring-cloud-starter-stream-kafka、spring-cloud-bus 这个几个依赖配置是实现配置中心配置动态刷新需要的,其他参考config的pom。

    bootstrap.yml

    spring:
      application:
      name: config-client
      cloud:
        config:
          name: application                   # 对应 {application} 部分
          profile:   dis-dev                     #指定的环境
          label: master                      #指定分支
    
          discovery:
            enabled: true
            service-id: config-server
        bus:
          refresh:
            enabled: true
    
      kafka:
        bootstrap-servers: 192.168.1.172:9092
    
    management:
      endpoints:
        web:
          exposure:
              include: '*'
    
    eureka:
      client:
        serviceUrl:
          defaultZone: http://localhost:7001/eureka/,http://localhost:7002/eureka/
    
      instance:
        instance-id: ${spring.cloud.client.ip-address}:${server.port}
        prefer-ip-address: true #以IP地址注册到服务中心
    #    ip-address:  #强制指定IP地址,默认会获取本机的IP地址
        lease-renewal-interval-in-seconds: 5      # 心跳时间,即服务续约间隔时间(缺省为30s)
        lease-expiration-duration-in-seconds: 15  # 发呆时间,即服务续约到期时间(缺省为90s)
    
    logging:
      config: classpath:log4j2-dev.xml
    
    server:
      port: 6020
    
    

    这里比config新增的配置

        bus:
          refresh:
            enabled: true
    
      kafka:
        bootstrap-servers: 192.168.1.172:9092
    
    management:
      endpoints:
        web:
          exposure:
              include: '*'
    

    注意:

    1. yml--include: '*' http://localhost:6020/actuator/refresh client1刷新,client2不刷新
    2. yml--include: '*' http://localhost:6020/actuator/bus-refresh 刷新所有微服务
    3. yml--include: 'refresh' http://localhost:6020/actuator/refresh client1刷新,client2不刷新
    4. yml--include: 'refresh' http://localhost:6020/actuator/bus-refresh 不能访问

    相关文章

      网友评论

        本文标题:Spring Cloud Bus + kafka实现配置中心配置

        本文链接:https://www.haomeiwen.com/subject/cbdvvqtx.html