那时的我
真的存在这样的需求么?
真的存在这样的需求么?
真的存在这样的需求么?
Unbelievable!难以置信.jpg
在我还没有遇到这个需求之前我可能会说,怎么可能会有这样的需求嘛?
以至于我能够列出许多原因:
- 技术选型肯定要先做好啊,
Kafka
和RabbitMQ
一起用算怎么回事呢 - 这俩都有
Topic
的概念,为啥要接入多个呢 - ...
但是它出现了!它真的出现了!不管你有多无法想象为什么会有这样的需求,它终究还是被我遇到了
好嘛,既然如此,那就只能老老实实的看看怎么实现了
(不甘心的小声bb:我想知道真的有人遇到过这样的需求么?)
简单粗暴法
于是乎我点开了搜索引擎,作为一个能够熟练借助搜索引擎进行代码搬运。。。咳咳
在输入spring boot 多个 kafka
这个几个关键字后,果不其然,第一篇就是相关的内容
我不禁赞叹了一番自己极高的搜索天赋,接着就点进去想看看能不能直接复制,啊不,是看看能不能让我在经过无与伦比的(这是什么鬼形容词)思考之后借鉴借鉴
文中的思路是这样的:
- 首先是配置文件
spring:
kafka:
one:
bootstrap-servers: IP:PORT
consumer:
group-id: YOUR_GROUP_ID
two:
bootstrap-servers: IP:PORT
consumer:
group-id: YOUR_GROUP_ID
- 然后是第一个
Kafka
配置类
@Configuration
public class KafkaOneConfig {
@Value("${spring.kafka.one.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.one.consumer.group-id}")
private String groupId;
@Bean
public KafkaTemplate<String, String> kafkaOneTemplate() {
return new KafkaTemplate<>(producerOneFactory());
}
@Bean
public KafkaListenerContainerFactory<? extends MessageListenerContainer> kafkaOneContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerOneFactory());
return factory;
}
@Bean
public ProducerFactory<String, String> producerOneFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public ConsumerFactory<String, String> consumerOneFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
}
- 接着是第二个
Kafka
配置类
@Configuration
public class KafkaTwoConfig {
@Value("${spring.kafka.two.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.two.consumer.group-id}")
private String groupId;
@Bean
public KafkaTemplate<String, String> kafkaTwoTemplate() {
return new KafkaTemplate<>(producerTwoFactory());
}
@Bean
public KafkaListenerContainerFactory<? extends MessageListenerContainer> kafkaTwoContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerTwoFactory());
return factory;
}
@Bean
public ProducerFactory<String, String> producerTwoFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public ConsumerFactory<String, String> consumerTwoFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
}
- 最后是收发消息
@Slf4j
@Component
public class Kafka {
@Autowired
@Qualifier("kafkaOneTemplate")
private KafkaTemplate kafkaOneTemplate;
@Autowired
@Qualifier("kafkaTwoTemplate")
private KafkaTemplate kafkaTwoTemplate;
public void sendOne(String msg) {
kafkaOneTemplate.send("topic", msg);
}
public void sendTwo(String msg) {
kafkaTwoTemplate.send("topic", msg);
}
// containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同
@KafkaListener(topics = "topic", containerFactory = "kafkaOneContainerFactory")
public void listenerOne(ConsumerRecord<?, ?> record) {
log.info("Kafka one 接收到消息:{}", record.value());
}
@KafkaListener(topics = "topic", containerFactory = "kafkaTwoContainerFactory")
public void listenerTwo(ConsumerRecord<?, ?> record) {
log.info("Kafka two 接收到消息:{}", record.value());
}
}
看到这里基本上就有思路了,不得不说还是比较方便易懂的,直接CV过来改改就能用
但是这种方式有一个致命的问题,在我的需求中是不确定数量的,是的没错,可能在后续的维护中需要添加或移除一个或多个Kafka
如果按照这种方式,那我每天的工作就会变成:
- 添加
Kafka
配置和代码,重新打包部署 - 删除
Kafka
配置和代码,重新打包部署 - 添加
Kafka
配置和代码,重新打包部署 - 删除
Kafka
配置和代码,重新打包部署 - ...
嗯?那我为什么不找个工厂拧螺丝?
哎,没想到我12年寒窗苦读,如今终于出人头地,以为能干出一番大事业。。。
(快醒醒,别做梦了,干不完可是要加班干的)
好的,继续我们伟大的。。。emmm,拧螺丝事业
于是我就想有没有可能把这个做成能够动态添加删除而不是在代码中写死呢
思维发散
现在我们把自己想象成一个极度厌恶麻烦的人,没错,极度厌恶,能改2个地方绝不改3个地方,能改1个地方绝不改2个地方,能让别人改就绝不自己改
在忽略了如何实现的情况下就有了下面3种简化版操作方案:
- 修改配置文件并重启
- 修改配置文件并自动刷新
- 实现一套可视化页面
毕竟基于我们极度厌恶麻烦的人设,改代码是不可能的,而改配置,重启,或是可视化操作还是有很多人能做到的
所以方案3(实现一套可视化页面)得到了我极大的认可,但是考虑到画页面的工作量也是比较大的
最终我决定,先实现方案1(修改配置文件并重启)再实现方案3(实现一套可视化页面),至于方案2(修改配置文件并自动刷新),有了可视化就没必要自动刷新了
现在我们大致明确了这个需求在完成之后是个什么玩意儿(从操作上来说),接下来就要想想如何实现了
在考虑实现之前,让我们先来俯瞰一下这个需求,找找Kafka
和RabbitMQ
存在哪些共同点,也就是从哪个角度去抽象这个需求
抽象?什么抽象?抽什么象?抽象什么?这还用抽象?这不直接上来就敲?
当然了,上来就敲肯定没问题,就是后续的重构可能让你比较糟心,作为一个极度厌恶麻烦的人设,与其之后因为功能不支持而重构还不如一步到位直接干到完美,当然了,理想很丰满,不知道大家觉得这个世界上究竟存不存在完美的代码呢?
如何抽象
抽象是一门很高深的艺术,单单会几个设计模式肯定是做不好抽象的,能否做出好的抽象基于大量的抽象实践,没有实践,理论再牛逼设计出来也是一坨shi,是的,就是不断的练(赶紧的,拿你们公司的项目练),知道你接手的代码为什么像shi么,没错,那都是我练手练下来的hhh
扯远了,直接给结论,大家觉得用事件模型来作为抽象怎么样?
Kafka
和RabbitMQ
主要的应用场景就是作为消息中间件,无非就是发布订阅模型,也就是事件模型
如果我们将其抽象为事件模型,那么无论是Kafka
还是RabbitMQ
,又或者是RocketMQ
和ActiveMQ
,甚至今后可能出现的更NB的消息中间件,只要符合事件模型就都能够将它们囊括其中
于是乎我们完全可以设想,在项目中使用抽象的事件模型进行消息的发布和订阅,将具体的实现隐藏起来,对我们后续的扩展和维护来说具有非常大的便利性
举个例子,比如我们现在的项目使用RabbitMQ
来作为事件模型的具体实现,当有一天RabbitMQ
的吞吐量不足以支持我们的业务时,我们可以直接将实现替换成Kafka
而不用在代码中把RabbitTemplate
改成KafkaTemplate
,把@RabbitListener
改成@KafkaListener
,将技术实现和业务逻辑解耦,这其实也是DDD中所追求的方式
引擎和端点
基于我们现在的需求,我们可以先设计一下多个Kafka
和RabbitMQ
的结构
{
kafka: {
kafka1: {},
kafka2: {}
},
rabbitmq: {
rabbitmq1: {},
rabbitmq2: {}
}
}
在这里我定义了两个概念,事件引擎EventEngine
和事件端点EventEndpoint
-
事件引擎就是
kafka
和rabbitmq
,代表事件的发布订阅所依赖的中间件 -
事件端点就是
kafka1
,kafka2
和rabbitmq1
,rabbitmq2
,代表不同的中间件服务(集群)
基于事件引擎的抽象,我们可以定义Kafka
的事件引擎KafkaEventEngine
和RabbitMQ
的事件引擎RabbitEventEngine
,如果后续需要集成RocketMQ
就可以直接扩展一个RocketEventEngine
基于事件端点的抽象,我们就可以添加任意个数的Kafka
或是RabbitMQ
,不用再通过硬编码集成,每个端点的配置相互隔离,互不影响
交换机
因为我们现在集成了多个事件引擎和事件端点,所以当我们在发布或订阅时就需要指定使用哪个事件引擎下的哪个事件端点(就是要使用哪个中间件)
比如我们现在需要把事件E1
发送到kafka1
上,所以我们就需要先从kafka1
,kafka2
,rabbitmq1
,rabbitmq2
中把kafka1
找出来
所以我定义了事件交换机EventExchange
的概念
public interface EventExchange {
Collection<? extends EventEndpoint> exchange(Collection<? extends EventEngine> engines, EventContext context);
}
将所有的事件引擎传入,然后返回对应的事件端点(事件引擎中持有对应的事件端点,如kafka
这个事件引擎持有kafka1
和kafka2
这两个事件端点)
在我们发布或订阅时,可以传入一个事件交换机来指定发布事件到哪个中间件,或是订阅哪个中间件的消息
比如当我们的业务business1
需要发送一个事件消息到kafka1
和rabbitmq2
时,我们就可以定义一个Business1EventExchange
public class Business1EventExchange implements EventExchange {
@Override
public Collection<? extends EventEndpoint> exchange(Collection<? extends EventEngine> engines, EventContext context) {
return engines.stream()
.flatMap(it -> it.getEndpoints().stream())
.filter(it -> it.getName().equals("kafka1") ||
it.getName().equals("rabbitmq2"))
.collect(Collectors.toList());
}
}
这里建议单独实现一个类并且基于业务起名,这样当业务business1
需要更换事件端点或是添加事件端点时,只需要修改Business1EventExchange
即可,对于业务来说就隐藏了具体的中间件细节,业务逻辑和技术实现就不会耦合在一起
发布器
基于之前的业务business1
,我们已经通过事件交换机指定了kafka1
和rabbitmq2
,但是Kafka
和RabbitMQ
发消息的方式完全不一样,所以在事件发布的时候需要让两者分别使用各自的方式发送消息
于是我又定义了事件发布器EventPublisher
的概念
public interface EventPublisher {
void publish(Object event, EventEndpoint endpoint, EventContext context);
}
在平时的开发中我们一般
使用KafkaTemplate
来发消息到Kafka
使用RabbitTemplate
来发消息到RabbitMQ
我就将发送消息这一动作抽离出来,就有了事件发布器
我们给业务business1
来实现一个定制化的事件发布器
public class Business1EventPublisher implements EventPublisher {
@Override
public void publish(Object event, EventEndpoint endpoint, EventContext context) {
if (endpoint.getName().equals("kafka1")) {
//KafkaEventEndpoint持有KafkaTemplate
KafkaTemplate<Object, Object> kafkaTemplate =
((KafkaEventEndpoint) endpoint).getTemplate();
//发送Kafka消息
kafkaTemplate.sendDefault(event);
}
if (endpoint.getName().equals("rabbitmq2")) {
//RabbitEventEndpoint持有RabbitTemplate
RabbitTemplate rabbitTemplate =
((RabbitEventEndpoint) endpoint).getTemplate();
//发送RabbitMQ消息
rabbitTemplate.convertAndSend(event);
}
}
}
只要我们在发布时传入指定的事件发布器就可以让事件消息以我们想要方式发送到Kafka
或是RabbitMQ
中
最终的事件发布大概就是这样
@RestController
public class Business1Controller {
@Autowired
private EventConcept concept;
@PostMapping("/business1")
public void business1() {
concept.template()
.exchange(new Business1EventExchange())
.publisher(new Business1EventPublisher())
.publish(new Business1Event());
}
}
订阅器
和事件发布一样,Kafka
和RabbitMQ
监听消息的方式也不一样,而且没办法使用@KafkaListener
和@RabbitListener
,因为使用注解就没有办法实现动态监听了
当我们动态添加了一个端点之后,却没办法监听这个端点的消息,那不是很蠢?
所以我直接找到这两个注解的解析类KafkaListenerAnnotationBeanPostProcessor
和RabbitListenerAnnotationBeanPostProcessor
跟着看了一下是怎么监听的
过程就不赘述了,直接上代码
- 首先是
Kafka
public class KafkaReceiver {
public void receive() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
MessageListenerContainer container = factory.createContainer("topic");
container.getContainerProperties().setMessageListener(new MessageListener<Object, Object>() {
@Override
public void onMessage(ConsumerRecord<Object, Object> data) {
//接收数据
}
});
container.start();
}
}
- 然后是
RabbitMQ
public class RabbitReceiver {
public void receive() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("queue");
endpoint.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//接收数据
}
});
SimpleMessageListenerContainer container = factory.createListenerContainer(endpoint);
container.start();
}
}
同时我们定义一个事件订阅器EventSubscriber
的概念来抽象监听消息这个动作
public interface EventSubscriber {
Subscription subscribe(EventListener listener, EventEndpoint endpoint, EventContext context);
}
现在给业务business1
来实现一个定制化的事件订阅器
public class Business1EventSubscriber implements EventSubscriber {
@Override
public Subscription subscribe(EventListener listener, EventEndpoint endpoint, EventContext context) {
if (endpoint.getName().equals("kafka1")) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
MessageListenerContainer container = factory.createContainer("topic");
ContainerProperties properties = container.getContainerProperties();
properties.setMessageListener(new org.springframework.kafka.listener.MessageListener<Object, Object>() {
@Override
public void onMessage(ConsumerRecord<Object, Object> data) {
//使用统一的接口返回数据
listener.onEvent(data.value(), endpoint, context);
}
});
container.start();
return new KafkaSubscription(container);
}
if (endpoint.getName().equals("rabbitmq2")) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
SimpleRabbitListenerEndpoint listenerEndpoint = new SimpleRabbitListenerEndpoint();
listenerEndpoint.setQueueNames("queue");
listenerEndpoint.setMessageListener(new org.springframework.amqp.core.MessageListener() {
@Override
public void onMessage(Message message) {
//使用统一的接口返回数据
listener.onEvent(message.getBody(), endpoint, context);
}
});
SimpleMessageListenerContainer container = factory.createListenerContainer(listenerEndpoint);
container.start();
return new RabbitSubscription(container);
}
return Subscription.EMPTY;
}
}
只要我们在订阅时传入指定的事件订阅器就可以以我们想要方式监听Kafka
或是RabbitMQ
的事件消息
最终的事件订阅大概就是这样
@Component
public class Business1SubscriberConfiguration implements ApplicationRunner {
@Autowired
private EventConcept concept;
@Override
public void run(ApplicationArguments args) throws Exception {
concept.template()
.exchange(new Business1EventExchange())
.subscriber(new Business1EventSubscriber())
.subscribe(new GenericEventListener<Business1Event>() {
@Override
public void onGenericEvent(Business1Event event, EventEndpoint endpoint, EventContext context) {
//可以通过EventEndpoint获取消息的来源
}
});
}
}
大致的流程就是这样了吧
我们通过两者的共同点进行功能抽象,在我们使用时就可以不依赖具体的实现
通过不同的实现就可以非常灵活的控制发布和订阅的各个细节,而不需要在业务逻辑代码中改来改去
细节优化
接下来我们看看有没有可以优化的细节问题
不知道大家还记不记得业务business1
发布时的示例代码,如下:
@RestController
public class Business1Controller {
@Autowired
private EventConcept concept;
@PostMapping("/business1")
public void business1() {
concept.template()
.exchange(new Business1EventExchange())
.publisher(new Business1EventPublisher())
.publish(new Business1Event());
}
}
这里每次发布事件的时候都会new
一个Business1EventExchange
和Business1EventPublisher
非常多余,而且浪费性能
针对这个问题,我从两个维度进行了使用优化
- 基于多级配置实现各级配置的优先级
- 基于事件模版
EventTemplate
的多场景配置
多级配置
每次发布或订阅的时候都要手动指定事件交换机,事件发布器或是事件订阅器,如果这个配置每次都一样,甚至整个项目都是统一的配置,那就可以提供一种类似于全局的配置,而不需要每次手动指定
-
事件交换机
-
可以指定一个全局的配置
-
当手动指定了事件交换机就使用指定的事件交换机
-
当未手动指定事件交换机则使用全局配置的事件交换机
-
当未配置全局的事件交换机则默认发布到所有端点
-
-
事件发布器/事件订阅器
-
可以给事件引擎和事件端点配置一个事件发布器/事件订阅器
-
当手动指定了事件发布器/事件订阅器就使用指定的事件发布器/事件订阅器
-
当未手动指定事件发布器/事件订阅器则使用事件端点配置的事件发布器/事件订阅器
-
当未配置事件端点的事件发布器/事件订阅器则使用事件引擎配置的事件发布器/事件订阅器
-
当我们配置了全局的事件交换机或是事件引擎和事件端点的事件发布器/事件订阅器,我们的代码就可以改为下面这样:
- 发布
@RestController
public class Business1Controller {
@Autowired
private EventConcept concept;
@PostMapping("/business1")
public void business1() {
concept.template().publish(new Business1Event());
}
}
- 订阅
@Component
public class Business1SubscriberConfiguration implements ApplicationRunner {
@Autowired
private EventConcept concept;
@Override
public void run(ApplicationArguments args) throws Exception {
concept.template().subscribe(new GenericEventListener<Business1Event>() {
@Override
public void onGenericEvent(Business1Event event, EventEndpoint endpoint, EventContext context) {
//可以通过EventEndpoint获取消息的来源
}
});
}
}
模版配置
虽然我们现在支持了全局的配置,但是当处理之前提到的业务business1
需要单独自定义时,全局配置就没有什么效果了
于是我定义了事件模版EventTemplate
的概念
我们可以新建并持有一个事件模版进行配置复用,写法如下:
@RestController
public class Business1Controller {
private final EventTemplate template;
@Autowired
public Business1Controller(EventConcept concept) {
//template方法每次调用都会创建一个模版
this.template = concept.template()
.exchange(new Business1EventExchange())
.publisher(new Business1EventPublisher());
}
@PostMapping("/business1")
public void business1() {
template.publish(new Business1Event());
}
}
属性继承
由于我们会有多个Kafka
和多个RabbitMQ
,所以配置文件可能会非常长,甚至有很多配置都是一样的
所以我就想到可以将相同的配置提取出来作为父配置,其他端点继承父配置,大概就是这个样子:
concept:
event:
kafka:
enabled: true #需要手动开启
endpoints: #在该节点下配置多个kafka,属性同spring.kafka
kafka1: #端点名称-kafka1
inherit: parent #继承名称为parent的端点配置
bootstrap-servers:
- 192.168.30.100:9092
- 192.168.30.101:9092
- 192.168.30.102:9092
consumer:
group-id: kafka1
kafka2: #端点名称-kafka2
inherit: parent #继承名称为parent的端点配置
bootstrap-servers:
- 192.168.60.200:9092
- 192.168.60.201:9092
- 192.168.60.202:9092
consumer:
group-id: kafka2
parent: #作为其他端点的父配置
enabled: false #是否启用该端点,这里作为父配置不启用
producer:
retries: 0
acks: 1
consumer:
enable-auto-commit: false
template:
default-topic: sample
listener:
ack-mode: manual_immediate
parent
端点作为父配置,kafka1
和kafka2
通过属性inherit
指定继承parent
端点的配置同时额外定义bootstrap-servers
和consumer.group-id
扩展支持
当我们实现了功能之后就要看看对后续维护非常重要的扩展性
当然了,一般情况下扩展性肯定是在功能实现之前就规划好一些扩展点或是使用高扩展性的架构
这里只是把这块放到了后面来讲
引擎端点工厂
首先呢,我觉得需要给事件引擎EventEngine
和事件端点EventEndpoint
各添加一个工厂EventEngineFactory
和EventEndpointFactory
这就方便对两者做一些扩展,同时这里有一个经验建议
当在实现或封装一个库时,如果逻辑中用到了new
来创建实例时
建议替换为工厂类来创建或者提供一个可重写的方法来创建
为使用者提供一个自定义入口
引擎端点自定义配置
定义EventEndpointConfigurer
和EventEngineConfigurer
来提供后置的自定义配置
这里可以类比WebMvcConfigurer
之类的配置类的功能
事件上下文
增加了一个事件上下文EventContext
的概念,默认基于Map
实现,我们可以在发布或订阅时设置自定义的属性来更灵活的传递数据和处理逻辑
配置继承处理器
对于配置的继承功能,我也定义了一个ConfigInheritHandler
接口
考虑到由于版本问题如果导致配置继承出现问题因小失大,就可以自定义配置继承逻辑或是重写其中的一些方法来避免问题
结束
好了,差不多就这样吧,如果大家有遇到类似的需求可以参考参考这个思路,或者直接用我写的库,GitHub传送门(顺便点个Star吧哈哈哈)
网友评论