说是入门教程,其实不是,真正的教程在此 https://www.jianshu.com/p/bab2da26f282,不知道作者使用什么版本,里面的例子最新版【Hoxton.SR1】走不通,本文只是例子存在的问题进行补充,同时,向该文作者致敬!
原文例子中Event的例子主要分为两部分
1.本应用发布,本应用内监听,这个是基于Spring的消息机制的,例子能走通
2.本应用发布,其他应用监听,例子走不通,原文下面的留言可证
鉴于大家反馈是本地发Event后本地能监听,其他应用不能监听,而本地和其他应用通过kafka监听,并且我们知道topic名字,检查日志目录得知文件夹springCloudBus-0是存在的(topic存在,也可以通过指令去确认),说明应用已经链接kafka,并且自动创建好topic了
于是直接通过命令行订阅
kafka-console-consumer.sh --topic springCloudBus --from-beginning --bootstrap-server localhost:9092
发现里面一条数据都没有,通过生成者指令进行输入,队列是通的
kafka-console-producer.bat --broker-list localhost:9092 --topic springCloudBus
那么kafka没问题,那就确认是源应用没成功发布出去了;
提示:这里手工输入的消息,是不符合bus的格式要求的,会导致订阅的应用报错,需要清理掉,我本机做法是直接关掉kafka服务后删掉它的日志文件了事
从源应用入手
首先要知道的是:RemoteApplicationEvent(下称RemoteEvent) 是继承 ApplicationEvent(或者说相对于remote,可以理解成是local event,其实是spring内部的消息机制,为了清晰区分,下称LocalEvent)
RemoteEvent 的发布过程如下
1.创建 并且 发布 LocalEvent
//原文中的代码
protected void fireEvent(String eventAction, ProductDto productDto) {
ProductEvent productEvent = new ProductEvent(productDto,
ApplicationContextHolder.getApplicationContext().getId(), "*:**",
eventAction, productDto.getItemCode());
// 发布事件
RemoteApplicationEventPublisher.publishEvent(productEvent);
}
这里其实是发布了一个ApplicationEvent(LocalEvent),在String的消息机制里流转
2.Bus监听了LocalEvent(左右继承自RemoteApplicationEvent的Event,目前还是在Local里流转)
3.判断是不是自己(本应用)发出的(isFromSelf)
4.如果是自己(本应用)发出的,发送到outboundChannel
//step 2,3,4的代码如下
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
if (this.serviceMatcher.isFromSelf(event)
&& !(event instanceof AckRemoteApplicationEvent)) {
this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
}
}
原文中的问题点就出在这里了,判断isFromSelf时候可能是因为版本的原因匹配失败,导致跳过了发送代码
继续挖代码
public boolean isFromSelf(RemoteApplicationEvent event) {
String originService = event.getOriginService();
String serviceId = getServiceId();
return this.matcher.match(originService, serviceId);
}
isFromSelf的判断其实是判断Event中的originService 和Bus自己的id是否一致,其中originService 原文使用的是applicationContext.id,而从ServiceMatcher 的初始化代码得知,serviceId是通过BusProperties获取的,而我本地调试时候两个获取到的id格式不一样;
//ServiceMatcher 初始化
@Bean
public ServiceMatcher serviceMatcher(@BusPathMatcher PathMatcher pathMatcher,
BusProperties properties, Environment environment) {
String[] configNames = environment.getProperty(CLOUD_CONFIG_NAME_PROPERTY,
String[].class, new String[] {});
ServiceMatcher serviceMatcher = new ServiceMatcher(pathMatcher,
properties.getId(), configNames);
return serviceMatcher;
}
既然知道问题那就简单了,直接在fire事件时候用BusProperties.getId 代替原来的applicationContext.getId即可
调整后的代码如下
//调整后的代码
@autowired
BusProperties busProperties;//该注入方式是不推荐的,这里仅仅为了表达意思
protected void fireEvent(String eventAction, ProductDto productDto) {
ProductEvent productEvent = new ProductEvent(productDto,
busProperties.getId(), "*:**",
eventAction, productDto.getItemCode());
// 发布事件
RemoteApplicationEventPublisher.publishEvent(productEvent);
}
现在是不是本文开始时候用命令行订阅的消费者,也读到数据了?
当然,还有另外一种做法是把bus的id设置成和spring的一致
#配置文件(具体值我没测试,要自己试试,我用的是上面那种方法)
spring.cloud.bus.id=${spring.application.name}-${server.port}
顺便提一下远端应用接受的过程
1.BusAutoConfiguration.acceptRemote,注意,这里是StreamListener,监听的是kafka的流
2.判断是不是发给自己的serviceMatcher.isForSelf
3.若上述条件符合,判断是不是自己发的serviceMatcher.isFromSelf,若是,跳过
从发送流程得知,如果自己发的,本来就在LocalEvent里流转了,若自己订阅也已经消费过一次了;而且如果这里重新入Local的话,会导致发布流程会重新触发,死循环了!
4.若上述条件符合,发布LocalEvent,丢进Spring的消息机制流转,监听了该Event的地方自然能收到信息
整体代码(简略版)如下
//(接受到)RemoteEvent 转 LocalEvent(进行消费)过程
@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
......
if (this.serviceMatcher.isForSelf(event)
&& this.applicationEventPublisher != null) {
if (!this.serviceMatcher.isFromSelf(event)) {
this.applicationEventPublisher.publishEvent(event);
}
......
}
......
}
另外,RemoteEvent需要序列化处理的(Json),而对于ApplicationEvent(LocalEvent)来说,source代表事件的触发者(而非数据),所以序列化时候是跳过的了
//声明了序列化时候跳过source
@JsonIgnoreProperties("source")
public abstract class RemoteApplicationEvent
所以原文中直接用soucre传递数据是不妥的(估计以前可以,新版意识到source使用错误,修正了)
同时,因为涉及到json反序列化,默认构造函数是必须的!别和我一样犯傻!
private static final Object TRANSIENT_SOURCE = new Object();
...
protected RemoteApplicationEvent() {
// for serialization libs like jackson
this(TRANSIENT_SOURCE, null, null);
}
人家代码里已经说清楚了,默认构造函数是用来反序列化的,并且TRANSIENT_SOURCE也充分说明了不能用soucre传递数据:)
至此 整个流程接受,祝顺利调通
网友评论