之前介绍了如何简单的搭建Spring Cloud Bus 组件,今天让我们来自定义一个消息总线的传播。本文实现是参考小马哥 SF 上的 Spring Cloud 课程,如果有同学感兴趣,可以去SF搜索一下小马哥,知名Java劝退师。
前言
首先我们要知道的是 Spring Cloud Bus 默认的三个内部事件
RefreshRemoteApplicationEvent
AckRemoteApplicationEvent
EnvironmentChangeRemoteApplicationEvent
因此我们现在只需要仿照 其中一个事件来实现我们自定义事件就好。我们可以先来看一个事件的源码实现。
RefreshRemoteApplicationEvent
事件定义:
/**
* @author Spencer Gibb
*/
@SuppressWarnings("serial")
public class RefreshRemoteApplicationEvent extends RemoteApplicationEvent {
@SuppressWarnings("unused")
private RefreshRemoteApplicationEvent() {
// for serializers
}
public RefreshRemoteApplicationEvent(Object source, String originService,
String destinationService) {
super(source, originService, destinationService);
}
}
RefreshRemoteApplicationEvent
事件发布
/**
* @author Spencer Gibb
*/
@Endpoint(id = "bus-refresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {
public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {
super(context, id);
}
@WriteOperation
public void busRefreshWithDestination(@Selector String destination) { // TODO:
// document
// destination
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
}
@WriteOperation
public void busRefresh() {
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));
}
}
我们可以看到 busRefresh
方法中发布了事件。通过以上的事件定义以及事件发布,我们知道需要三个参数 一个源数据(source
),一个发送源ID(originService
),一个目标地址(destinationService
)。
RefreshRemoteApplicationEvent
事件监听
/**
* @author Spencer Gibb
*/
public class RefreshListener
implements ApplicationListener<RefreshRemoteApplicationEvent> {
private static Log log = LogFactory.getLog(RefreshListener.class);
private ContextRefresher contextRefresher;
public RefreshListener(ContextRefresher contextRefresher) {
this.contextRefresher = contextRefresher;
}
@Override
public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
Set<String> keys = this.contextRefresher.refresh();
log.info("Received remote refresh request. Keys refreshed " + keys);
}
}
我们可以看到,在RefreshRemoteApplicationEvent
事件监听中,Spring Cloud Bus在监听到事件后,就只是调用了 this.contextRefresher.refresh();
并没有对事件里的内容做具体操作。
接下来我们就仿照以上的代码自己自定义一个通过消息总线事件发布以及监听的事件吧。
自定义 Spring Cloud Bus 事件
所需组件
我们需要上一节 Spring Cloud系列--Spring Cloud Bus(一) 所修改后的组件(注意必须实现Spring Cloud Bus 才可以往下进行)。
- kafka
- demo-register
- demo-provider
- demo-consumer
这里如果没有搭建的朋友,看一我上一篇,搭建很简单。
改造 demo-consumer
,demo-provider
我们改造其实涉及的地方并不多,所以只要实现了 Spring Cloud Bus 组件的相互通信,下面自定义事件就很轻松的可以完成,因此如果没有实现的小伙伴,请先实现之后,在往下看,这样就不会觉得摸不着头脑了。
自定义事件SelfRemoteApplicationEvent
分别向这两个组件(demo-consumer
,demo-provider
)中添加该事件。
public class SelfRemoteApplicationEvent extends RemoteApplicationEvent {
public SelfRemoteApplicationEvent() {
super();
}
public SelfRemoteApplicationEvent(String str, String originService,
String destinationService) {
super(str, originService, destinationService);
}
}
自定义事件监听SelfBusConfiguration
同样向这两个组件(demo-consumer
,demo-provider
)中添加监听组件。
@Configuration
@RemoteApplicationEventScan(basePackageClasses = SelfRemoteApplicationEvent.class)
public class SelfBusConfiguration {
@EventListener
public void onUserRemoteApplicationEvent(SelfRemoteApplicationEvent selfRemoteApplicationEvent) {
System.out.println("-----------------------------------------------------------------------------------------");
System.out.printf("SelfRemoteApplicationEvent - " +
" Source : %s , 发送源 : %s , 接收源 : %s \n",
selfRemoteApplicationEvent.getSource(),
selfRemoteApplicationEvent.getOriginService(),
selfRemoteApplicationEvent.getDestinationService());
System.out.println("-----------------------------------------------------------------------------------------");
}
}
定义事件发送SelfBusController
(只在demo-consumer
)添加
@RestController
public class SelfBusController implements ApplicationEventPublisherAware {
/**
* 事件发布
*/
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
/**
* Spring Cloud bus 外部化配置
*/
@Autowired
private BusProperties busProperties;
/**
* 通过GET 请求 触发事件发送
* @param selfEvent
* @param destination
* @return
*/
@GetMapping("/bus/self/publish/{destination}/{selfEvent}")
public boolean publishUserEvent(@PathVariable("selfEvent") String selfEvent,
@PathVariable(value = "destination", required = false) String destination) {
//这里由于我没有定义ID ,这里Spring Cloud Bus 自己默认实现了ID
String instanceId = busProperties.getId();
//自定义事件初始化
SelfRemoteApplicationEvent selfRemoteApplicationEvent =
new SelfRemoteApplicationEvent(selfEvent,instanceId,destination);
//发布事件
applicationEventPublisher.publishEvent(selfRemoteApplicationEvent);
return true;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}
这里的具体实现和操作,我已经在注释中说明,很好理解。
添加完后的目录结构
demo-consumer

demo-provider

运行以及结果
简单的Spring Cloud Bus 自定义事件已经完成,看上去是不是没有那么复杂。 接下来让我们运行起来,查看我们自定义事件是否好用。
1.启动组件,注意启动顺序
- kafka
- demo-register
- demo-provider
- demo-consumer
2.首先我们来查看一下 注册中心,组件是否都注册进来了。

3.接着查看我们的demo-provider,demo-consumer
俩个组件是否连接上了 kafka
。这里只要console最后输出如下类似信息,表示没问题。

4.接着让我们请求 发送自定义事件发送方法 看看是否成功。
友情提示,建议清空demo-provider,demo-consumer
两个组件的console输出。以便更清晰的查看结果。
使用POSTMAN 模拟请求:

查看demo-consumer
console输出:

查看demo-provider
console输出:

我们可以看到 自定义事件发布到总线已经完成。有的小伙伴回纳闷为什么 我从demo-consumer
发送,但是demo-consumer
也监听到了事件呢,那是因为每个实现 Spring Cloud Bus
的组件 即是监听者 也是发布者。这个从源码中可以看出,有时间,我会搞一篇源码相关内容。
小结
由于Spring Cloud 2.x 很多地方不兼容 Spring Cloud 1.5x,因此在做整合的时候,也是看了很多源码实现,希望可以帮助大家理解。
网友评论