序言
文档背景
消息队列改造是双创框架升级工作的一部分。
文档主题
文档主要讲述消息队列代码更新后,新的使用方式和如何使用原有的消费者模式完成业务逻辑。
文档结构图
[站外图片上传中...(image-e177db-1522380525219)]
文档变更历史
作者 | 日期 | 版本 | 变更点 |
---|---|---|---|
李清泉 | 2018-3-29 | 0.5 | 创建文档 |
配置文件的写法
配置数据源
配置数据源有多种方式,我使用过有效的有两种:
- 单独配置一个源
- 配置到绑定
单独配置一个源
rabbitmq:
addresses: amqp://192.168.1.241:5672
username: mqadmin
password: mqadmin
单独配置的方式是使用rabbitmq作为顶层配置,然后在其他配置中引用,如绑定时使用:
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
addresses: ${rabbitmq.addresses}
username: ${rabbitmq.username}
password: ${rabbitmq.password}
virtual-host: test1
在上述代码中引用了单独配置。这种好处是可以配置一次多处引用,避免重复写。
配置到绑定
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
addresses: amqp://192.168.1.241:5672
username: mqadmin
password: mqadmin
virtual-host: test1
这种方式是直接写到绑定上。如果只有一个配置,可以这么写,但如果有多个绑定并且用同一个数据源,就变成了冗余。
绑定数据源
绑定数据源是将数据源配置到一个变量之中,方便配置接收或者发送时使用。上述已经说过配置方法了。在下面的配置代码中,是将一个数据源配置到变量 rabbit1之中。
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
addresses: amqp://192.168.1.241:5672
username: mqadmin
password: mqadmin
virtual-host: test1
可以绑定多个的,如果下面的配置:
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
addresses: amqp://192.168.1.241:5672
username: mqadmin
password: mqadmin
virtual-host: test1
rabbit2:
type: rabbit
environment:
spring:
rabbitmq:
addresses: amqp://192.168.1.245:5672
username: mqadmin
password: mqadmin
virtual-host: test2
上面的代码配置了两个变量,rabbit1和rabbit2. 在接收和发送配置中可以引用这两个不同的变量代表不同的源。
配置接收
spring:
cloud:
stream:
bindings:
input1:
binder: rabbit1
contentType: text/plain
destination: testquene
上述代码中,定义一个叫做 input1的接收,内容格式为文本,目标(队列名)为testquene ,绑定到rabbit1源变量(上述的绑定配置)。
配置发送
spring:
cloud:
stream:
bindings:
output1:
binder: rabbit1
destination: testquene2
contentType: text/plain
上述代码中,配置了一个叫做output1的消息发送,目标(队列名)为testquene2,模式是文本。
完整配置示例
server:
port: 9087
rabbitmq:
addresses: amqp://192.168.1.241:5672
username: mqadmin
password: mqadmin
spring:
cloud:
stream:
bindings:
input1:
binder: rabbit1
#group: test.qqqq
contentType: text/plain
destination: mqTestDefault.test.qqqq
input2:
binder: rabbit1
#group: test.qqqq
contentType: text/plain
destination: quene1
output1:
binder: rabbit1
destination: mqTestDefault.test.qqqq
contentType: text/plain
output2:
binder: rabbit1
destination: quene1
contentType: text/plain
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
addresses: ${rabbitmq.addresses}
username: ${rabbitmq.username}
password: ${rabbitmq.password}
virtual-host: test1
# exchange:
defaultBinder:
消息队列使用方式
消息队列涉及到两种操作:
- 消费处理信息
- 生产发送信息
下面分别说明这两个操作的使用方式
消息队列的接收处理
消息队列的接收处理有两种方式:
- 直接使用监听
- 使用适配原有的消费模式
将配置文件中的接收与发送定义到代码中
无论是接收还是发送消息。都要先在一个接口类中定义信道。接收时,定义为SubscribableChannel
;发送时,定义为MessageChannel
。示例:
public interface Sink {
String INPUT1 = "input1";
String INPUT2="input2";
String OUTPUT1="output1";
String OUTPUT2="output2";
@Input(INPUT1)
SubscribableChannel input1();
@Input(INPUT2)
SubscribableChannel input12();
@Output(OUTPUT1)
MessageChannel output1();
@Output(OUTPUT2)
MessageChannel output2();
}
上述代码中,分别定义了两个接收信道和两个发送信息。请务必注意名称一定要对应对配置文件中。比如上述的input1
input2
output1
output2
,必须在配置文件中存在的。
上述的定义是注册到spring之中的,使用时,只需要使用相应的名称的bean即可以。如果使用input1
的bean名称,即为input1的接收信道。
使用监听的方式
定义好接收与发送的spring bean后,可以在监听中使用接收了。
@Service
@EnableBinding(Sink.class)
public class StreamMessageQueneManager extends AbstractMessageQuene implements MessageQueueManager{
// 监听 binding 为 Sink.INPUT 的消息
@StreamListener(Sink.INPUT1)
public void input1(Message<String> message) {
System.out.println("第一个队列:" + message.getPayload());
doSomething(message.getPayload());
}
}
上述代码中,StreamMessageQueneManager 这个类为spring bean,使用了@EnableBinding(Sink.class)
注解。代表将已经定义好的Sink
接口中的定义好的接收在这个类中开启监听。在public void input1(Message<String> message)
方法头上,加上了@StreamListener(Sink.INPUT1)
注解,目的是将Sink中的INPUT1代表的接收在这个方法上开启监听。当监听收到消息时,将自动调用public void input1(Message<String> message)
方法,传入message对象,我们就可以使用这个对象执行任何逻辑。
使用适配原有消费模式
原来的双创是使用生产-消费模式处理消息的,我们原来是使用MessageQueueManager
接口接收和发送信息的,这个接口代码如下:
public interface MessageQueueManager {
/**
* 发送消息
* @param queueName 队列名称
* @param message 放入队列的内容
*/
void sendMessage(String queueName, Object message);
/**
* 获取通道消息
* @param queueName 队列名称
* @return message 队列的内容
*/
Object getMessage(String queueName);
}
在Object getMessage(String queueName);
方法中,我们通过传入队列名的方法,主动获取消息的。因此改造后为了减少代码变动,这种方式保持不变。原有的代码不需要变动即可正常执行。原理是使用了适配的方法:
@Service
@EnableBinding(Sink.class)
public class StreamMessageQueneManager implements MessageQueueManager{
private Map<String,Queue<String>> queueMap=new HashMap<>();
// 监听 binding 为 Sink.INPUT 的消息
@StreamListener(Sink.INPUT1)
public void input1(Message<String> message) {
System.out.println("第一个队列:" + message.getPayload());
Queue queue = getQueue(Sink.INPUT1);
queue.offer(message.getPayload());
}
private Queue getQueue(String queneName) {
Queue queue=queueMap.get(queneName);
if(queue==null){
queue=new ConcurrentLinkedQueue();//非阻塞
queueMap.put(queneName,queue);
}
return queue;
}
@Override
public Object getMessage(String queueName) {
Queue queue = getQueue(queueName);
return queue.poll();
}
@Override
public void sendMessage(String queueName, Object message) {
MessageChannel channel=channelMap.get(queueName);
if(channel==null){
throw new RuntimeException(queueName+"对应的信通不存在!");
}
if(message!=null) {
channel.send(MessageBuilder.withPayload(message).build());
}
}
}
在上述接口的实现类中,监听将input1收到的消息放入了临时非阻塞且线程安全的ConcurrentLinkedQueue
中。业务逻辑通过定时主动调用public Object getMessage(String queueName)
,获取到目标队列并取出消息执行逻辑。值得注意的是由于ConcurrentLinkedQueue
没有限制容量,如果不能及时消费掉里面存储的消息,可能会造成内存占用过多甚至溢出,因此需要考虑消费的速度和调用的间隔。
发送消息
消息的发送也有两种方式:
- 使用
messageChannel
- 使用原有生产消费模式接口
使用 messageChannel
相对于使用监听,发送也可以使用新的messageChannel
方式。
@Service
public class TestSender {
@Autowired
@Qualifier("output1")
MessageChannel output1;
public void send(){
output1.send(MessageBuilder.withPayload("您好,这是一个测试消息").build());
}
}
上述代码中,output1为spring bean的名称,代表了发送的信道代理。直接使用它就可以将消息发送到对应信道。
使用原有的适配
请参考上述的MessageQueueManager
接口,调用public void sendMessage(String queueName, Object message)
指定队列名即可。原理是,适配代码中已将messagechannel封装起来:
@Service
@EnableBinding(Sink.class)
public class StreamMessageQueneManager implements MessageQueueManager{
@Autowired
@Qualifier("output1")
MessageChannel output1;
@Autowired
@Qualifier("output2")
MessageChannel output2;
private Map<String,MessageChannel> channelMap;
/**
* bean初始化后执行这个方法
*/
@PostConstruct
public void postConstruct(){
if (channelMap == null) {
channelMap = new HashMap<>();
channelMap.put(Sink.OUTPUT1, output1);
channelMap.put(Sink.OUTPUT2, output2);
}
}
@Override
public void sendMessage(String queueName, Object message) {
MessageChannel channel=channelMap.get(queueName);
if(channel==null){
throw new RuntimeException(queueName+"对应的信通不存在!");
}
if(message!=null) {
channel.send(MessageBuilder.withPayload(message).build());
}
}
@Override
public Object getMessage(String queueName) {
Queue queue = getQueue(queueName);
return queue.poll();
}
}
上述代码中,channel被放入Map中,通过队列名可以取出,然后发送消息。原理本质上是适配。
改造点
原有的双创中,只有solr和微博模块有用到消息队列的功能。
我认为,至少下面的模块可以用消息队列:
- solr写入消息
- 微博发布
- 站内信
- 日志
- 积分写入
- 定时任务协调
- 其他附合要求的模块
什么是“附合要求”呢?
消息队列应用点
消息队列主要应用到以下情境:
- 不要求该分逻辑与主逻辑实时性
- 执行慢的逻辑
- 任务协调 分布式集群中,同一个实例可以通过消息队列接收的唯一实例的特性进行任务协调。
“附合要求”的其他模块是可以使用消息队列的,这需要我们在后续优化。
网友评论