美文网首页
Spring Cloud Bus 入门教程 补充

Spring Cloud Bus 入门教程 补充

作者: Wind哥 | 来源:发表于2020-03-07 14:18 被阅读0次

    说是入门教程,其实不是,真正的教程在此 https://www.jianshu.com/p/bab2da26f282,不知道作者使用什么版本,里面的例子最新版【Hoxton.SR1】走不通,本文只是例子存在的问题进行补充,同时,向该文作者致敬!

    原文例子中Event的例子主要分为两部分
    1.本应用发布,本应用内监听,这个是基于Spring的消息机制的,例子能走通
    2.本应用发布,其他应用监听,例子走不通,原文下面的留言可证

    鉴于大家反馈是本地发Event后本地能监听,其他应用不能监听,而本地和其他应用通过kafka监听,并且我们知道topic名字,检查日志目录得知文件夹springCloudBus-0是存在的(topic存在,也可以通过指令去确认),说明应用已经链接kafka,并且自动创建好topic了
    于是直接通过命令行订阅

    kafka-console-consumer.sh --topic springCloudBus --from-beginning --bootstrap-server localhost:9092
    

    发现里面一条数据都没有,通过生成者指令进行输入,队列是通的

    kafka-console-producer.bat --broker-list localhost:9092 --topic springCloudBus
    

    那么kafka没问题,那就确认是源应用没成功发布出去了
    提示:这里手工输入的消息,是不符合bus的格式要求的,会导致订阅的应用报错,需要清理掉,我本机做法是直接关掉kafka服务后删掉它的日志文件了事


    从源应用入手

    首先要知道的是:RemoteApplicationEvent(下称RemoteEvent) 是继承 ApplicationEvent(或者说相对于remote,可以理解成是local event,其实是spring内部的消息机制,为了清晰区分,下称LocalEvent)

    RemoteEvent 的发布过程如下
    1.创建 并且 发布 LocalEvent

    //原文中的代码
    protected void fireEvent(String eventAction, ProductDto productDto) {
        ProductEvent productEvent = new ProductEvent(productDto,
                ApplicationContextHolder.getApplicationContext().getId(), "*:**",
                eventAction, productDto.getItemCode());
        // 发布事件
        RemoteApplicationEventPublisher.publishEvent(productEvent);
    }
    

    这里其实是发布了一个ApplicationEvent(LocalEvent),在String的消息机制里流转

    2.Bus监听了LocalEvent(左右继承自RemoteApplicationEvent的Event,目前还是在Local里流转)
    3.判断是不是自己(本应用)发出的(isFromSelf)
    4.如果是自己(本应用)发出的,发送到outboundChannel

    //step 2,3,4的代码如下
    @EventListener(classes = RemoteApplicationEvent.class)
    public void acceptLocal(RemoteApplicationEvent event) {
      if (this.serviceMatcher.isFromSelf(event)
          && !(event instanceof AckRemoteApplicationEvent)) {   
          this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
      }
    }
    

    原文中的问题点就出在这里了,判断isFromSelf时候可能是因为版本的原因匹配失败,导致跳过了发送代码

    继续挖代码

    public boolean isFromSelf(RemoteApplicationEvent event) {
      String originService = event.getOriginService();
      String serviceId = getServiceId();
      return this.matcher.match(originService, serviceId);
    }
    

    isFromSelf的判断其实是判断Event中的originService 和Bus自己的id是否一致,其中originService 原文使用的是applicationContext.id,而从ServiceMatcher 的初始化代码得知,serviceId是通过BusProperties获取的,而我本地调试时候两个获取到的id格式不一样;

    //ServiceMatcher 初始化
    @Bean
    public ServiceMatcher serviceMatcher(@BusPathMatcher PathMatcher pathMatcher,
        BusProperties properties, Environment environment) {
      String[] configNames = environment.getProperty(CLOUD_CONFIG_NAME_PROPERTY,
          String[].class, new String[] {});
      ServiceMatcher serviceMatcher = new ServiceMatcher(pathMatcher,
          properties.getId(), configNames);
      return serviceMatcher;
    } 
    

    既然知道问题那就简单了,直接在fire事件时候用BusProperties.getId 代替原来的applicationContext.getId即可
    调整后的代码如下

    //调整后的代码
    @autowired
    BusProperties busProperties;//该注入方式是不推荐的,这里仅仅为了表达意思
    protected void fireEvent(String eventAction, ProductDto productDto) {
        ProductEvent productEvent = new ProductEvent(productDto,
                busProperties.getId(), "*:**",
                eventAction, productDto.getItemCode());
        // 发布事件
        RemoteApplicationEventPublisher.publishEvent(productEvent);
    }
    

    现在是不是本文开始时候用命令行订阅的消费者,也读到数据了?

    当然,还有另外一种做法是把bus的id设置成和spring的一致

    #配置文件(具体值我没测试,要自己试试,我用的是上面那种方法)
    spring.cloud.bus.id=${spring.application.name}-${server.port}
    

    顺便提一下远端应用接受的过程
    1.BusAutoConfiguration.acceptRemote,注意,这里是StreamListener,监听的是kafka的流
    2.判断是不是发给自己的serviceMatcher.isForSelf
    3.若上述条件符合,判断是不是自己发的serviceMatcher.isFromSelf,若是,跳过
    从发送流程得知,如果自己发的,本来就在LocalEvent里流转了,若自己订阅也已经消费过一次了;而且如果这里重新入Local的话,会导致发布流程会重新触发,死循环了!
    4.若上述条件符合,发布LocalEvent,丢进Spring的消息机制流转,监听了该Event的地方自然能收到信息

    整体代码(简略版)如下

    //(接受到)RemoteEvent 转 LocalEvent(进行消费)过程
    @StreamListener(SpringCloudBusClient.INPUT)
    public void acceptRemote(RemoteApplicationEvent event) {
        ......
        if (this.serviceMatcher.isForSelf(event)
                && this.applicationEventPublisher != null) {
            if (!this.serviceMatcher.isFromSelf(event)) {
                this.applicationEventPublisher.publishEvent(event);
            }
            ......
        }
        ......
    }
    

    另外,RemoteEvent需要序列化处理的(Json),而对于ApplicationEvent(LocalEvent)来说,source代表事件的触发者(而非数据),所以序列化时候是跳过的了

    //声明了序列化时候跳过source
    @JsonIgnoreProperties("source")
    public abstract class RemoteApplicationEvent 
    

    所以原文中直接用soucre传递数据是不妥的(估计以前可以,新版意识到source使用错误,修正了)
    同时,因为涉及到json反序列化,默认构造函数是必须的!别和我一样犯傻!

    private static final Object TRANSIENT_SOURCE = new Object();
    ...
    protected RemoteApplicationEvent() {
        // for serialization libs like jackson
        this(TRANSIENT_SOURCE, null, null);
    }
    

    人家代码里已经说清楚了,默认构造函数是用来反序列化的,并且TRANSIENT_SOURCE也充分说明了不能用soucre传递数据:)

    至此 整个流程接受,祝顺利调通

    相关文章

      网友评论

          本文标题:Spring Cloud Bus 入门教程 补充

          本文链接:https://www.haomeiwen.com/subject/fnoqdhtx.html