美文网首页rpc 中间件 分布式
springCloud --- 中级篇(3)

springCloud --- 中级篇(3)

作者: 贪挽懒月 | 来源:发表于2020-06-07 17:19 被阅读0次

    本系列笔记涉及到的代码在GitHub上,地址:https://github.com/zsllsz/cloud

    本文涉及知识点:

    • 服务消息驱动之stream;

    • 服务链路追踪之sleuth;


    欢迎大家关注我的公众号,目前正在慢慢地将简书文章搬到公众号,以后简书和公众号文章将同步更新,且简书上的付费文章在公众号上将免费。


    java开发那些事

    一、服务消息驱动之springcloud stream

    1、是什么?
    现在主流的消息中间件有以下四种:

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka

    比如京东这个网站,可能用的是RabbitMQ,但是京东的大数据分析用的是Kafka,存在两种MQ,切换、维护和开发都不太方便。消息驱动就是可以让我们不再关注MQ的实现细节,我们只需要用一种适配绑定的方式,自动地给我们在各种MQ之间切换。简言之,就像JDBC连接数据库,我们不需要关心它怎么连接各个数据库厂商,只需要按照JDBC的编程模型去写代码就好;springcloud stream也是,相当于封装了常用的各种MQ(目前stram封装了RabbitMQ和kafka),我们只需要调它的binder对象,通过它去操作MQ就行了。

    2、执行流程?

    • 消息生产者:生产消息(source) ---> 通道(channel) ---> 绑定器(binder)
    • 消息消费者:绑定器(binder) ---> 通道(channel) ---> 消费消息(sink)

    3、编码实现:
    (1)、消息生产者:

    • 新建一名名为cloud-stream-rabbitmq-provider8801的module,作为生产者
    • pom.xml:
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator </artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <!-- eureka client -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <!-- stream-rabbitmq -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    
    • yml:
    server:
      port: 8801
      
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          binders: # 在此处配置需要绑定的rabbitMQ的信息
            defaultRabbit: # 消息服务设置的名称,用于bindings整合
              type: rabbit # 消息组件类型
              environment: # rabbitmq环境配置
                spring: 
                  rabbitmq:
                    host: 192.168.0.106
                    port: 5672
                    username: admin
                    password: admin
          bindings: # 服务的整合处理
            output: # 通道名
              destination: test-stream # rabbitmq的exchange的名称
              content-type: application/json
              binder: defaultRabbit # 消息服务设置的名称
    eureka:
      client:
        service-url:
          defaultZone: http://eureka7001.com:7001/eureka 
    
    • 主启动类:
    @SpringBootApplication
    @EnableEurekaClient
    public class StreamMain8801 {
        public static void main(String[] args) throws Exception {
            SpringApplication.run(StreamMain8801.class, args);
        }
    }
    
    • service:
    public interface IMessageProvider {
        public String send();
    }
    
    • serviceImpl:
    @EnableBinding(Source.class)
    public class MessageProviderImpl implements IMessageProvider{
        @Autowired
        private MessageChannel output;
    
        @Override
        public void send() {
            String msg = "hello world";
            output.send(MessageBuilder.withPayload(msg).build());
            System.out.println("============== 发送成功 ==============");
        }
    }
    
    • controller:
    @RestController
    @RequestMapping("/stream")
    public class SendMessageController {
        @Autowired
        private IMessageProvider messageProvider;
        
        @GetMapping("/message")
        public String send() {
            messageProvider.send();
            return "success";
        }
    }
    

    启动rabbitmq,再启动7001,然后启动8801,最后访问一下localhost:8801/stream/message,可以返回success,消息发送成功。

    (2)、消息消费者:

    • 新建名为cloud-stream-rabbitmq-consumer8802,作为消费者
    • pom.xml:和8801的一样
    • yml:
    server:
      port: 8802
     
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: # 在此处配置需要绑定的rabbitMQ的信息
            defaultRabbit: # 消息服务设置的名称,用于bindings整合
              type: rabbit # 消息组件类型
              environment: # rabbitmq环境配置
                spring: 
                  rabbitmq:
                    host: 192.168.0.106
                    port: 5672
                    username: admin
                    password: admin
          bindings: # 服务的整合处理
            input: # 通道名
              destination: test-stream # rabbitmq的exchange的名称
              content-type: application/json
              binder: defaultRabbit # 消息服务设置的名称
    eureka:
      client:
        service-url:
          defaultZone: http://eureka7001.com:7001/eureka
    
    • 主启动:和8801一样
    • 业务类:
    @EnableBinding(Sink.class)
    public class ConsumerController {
        @Value("${server.port}")
        private String serverPort;
        
        @StreamListener(Sink.INPUT)
        public void input(Message<String> message) {
            System.out.println("======== 端口:" + serverPort + ",消息内容:" + message.getPayload());
        }
    }
    

    这样就搞定了,启动7001,8801,8802,然后用8801往mq发消息,在8802的控制台就可以看到打印出收到的消息了。

    4、重复消费的问题:
    首先依照8802再建一个消费者8803。然后8803也启动起来,再通过8801发送两条消息,可以看到8802和8803控制台都打印出了消息,也就是重复消费了。

    8802 8803

    重复消费肯定是不行的。出现这种情况的原因是:rabbitmq默认消费者处于不同的group,在不同group中的消费者都可以消费消息。解决办法就是:将这两个消费者设置为同一group,同一group的消费者是竞争关系,能够保证消息只被其中一个消费者消费。

    • 在8802和8803的yml中都加上一行配置:
    bindings: # 服务的整合处理
            input: # 通道名
              destination: test-stream # rabbitmq的exchange的名称
              content-type: application/json
              binder: defaultRabbit # 消息服务设置的名称
              group: A
    

    即最后一行:group: A,将两个消费者组名都设置为A。这样就解决了重复消费问题,并且两个消费者轮询消费。假如8801发送了两条消息,那么8802和8803分别会消费一条消息。

    5、持久化:
    现在关闭8802和8803,然后用8801发4条消息。把8802的group分组去掉,8803的保留。最后启动8802和8803,会发现,8802没有收到任何消息,而8803消费了4条消息。也就是说,加上了group配置,就做了持久化,即使消费者宕机了,重启后还是可以消费到。

    二、服务链路追踪之springcloud sleuth

    各个微服务之间相互调用,形成了复杂的调用链路。sleuth就是来监控追踪这调用链路的,搭配zipkin使用。sleuth负责收集调用信息,zipkin负责展现。

    1、zipkin的安装:

    • 下载jar包:dl.bintray.com/openzipkin/maven/io/zipkin/java/zipkin-server
    • 运行下载的jar包:
    普通运行
    java -jar zipkin-server-2.12.9-exec.jar
    后台运行
    nohup java -jar zipkin-server-2.12.9-exec.jar &
    
    • 控制台:ip:9411/zipkin

      zipkin
    • 修改8001和80,加上zipkin的依赖和配置:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-zipkin</artifactId>
    </dependency>
    
    spring:
      application:
        name: cloud-payment-service #提交到注册中心的微服务名称
        zipkin:
          base-url: http://192.168.0.106:9411
        sleuth:
          sampler:
            probability: 1 # 采样率值介于0和1之间,1表示全部采集
    

    中级篇完结

    相关文章

      网友评论

        本文标题:springCloud --- 中级篇(3)

        本文链接:https://www.haomeiwen.com/subject/qrujnhtx.html