1. acivemq支持两种模型
点对点(quene):
每个消息只能有一个消费者。消息的生产者和消费者之间没有时间上的
相关性.可以由多个发送者,也可以有多个消费者,但一个消息只能被一个消费者消费。即使消费者服务挂了,重启后,依赖能够获取到消息,不会丢消息。
发布订阅(topic):
每个消息可以有多个消费者。
生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消
费自它订阅之后发布的消息.
2. 如何保证消息的不丢
1. 生产者发送到activemq服务端保证消息正常到达机制:
设置应答模式为同步模式:这样,在服务端没有给生产者应答之前,生产者一直堵塞。
((ActiveMQConnection)connection).setUseAsyncSend(true);
2. activemq服务端如何保证服务端重启不会丢消息:
消息持久化到硬盘。
配合消息同步发送模式,只有当消息持久化到硬盘之后,才会给生产者应答。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
3. 如何保证消费者,真的消费了消息。
消费者设置手动应答模式,当真正消费了消息之后,才应答:
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 进行手动应答
message.acknowledge();
4. topic发布订阅模式下,持久化订阅。
topic模式,是普通订阅模式:生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息.也就是说,消费者要是重启了或者网络断了,那么重启过程中的消息就丢了。
那么还有一种持久化订阅模式:也就是订阅的时候告诉它订阅者id,然后在消费者断线了之后,activemq会一直给它留着这个消息。
//设置客户端id
connection.setClientID("客户端id");
//持久订阅
consumer = session.createDurableSubscriber((Topic) destination,"订阅name");
3. 生产者发送消息的同步和异步的方式
同步:activemq没有收到生产者的消息的时候,比如还在网络传输过程中,或者如果开启了消息持久化,则还没有持久化的时候,生产者一直堵塞,直到activemq服务端应答返回。
((ActiveMQConnection)connection).setUseAsyncSend(false);
异步:消息发送之后就不管了。
((ActiveMQConnection)connection).setUseAsyncSend(true);
设置异步发送的方式:
- 在连接的URI中配置
cf = new ActiveMQConnectionFactory("tcp://locahost:61616jms.useAsyncSend=true");
2.在ConnectionFactory层配置
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
3.在Connection层配置
((ActiveMQConnection)connection).setUseAsyncSend(true);
4. 消费者消费消息的同步和异步的方式
同步:接收者主动接收消息,若消息队列中没有消息则阻塞等待
Message message = consumer.receive(); //
异步:采用事件回调的机制。
consumer.setMessageListener(MessageListener实现类); //(异步接收)
5. 应答模式
生产者应答模式:
支持事务:如果支持事务,则当session提交的时候,才会发送消息。一般不使用事务,影响吞吐量。
session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
session.commit();
不支持事务:
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
消费者应答模式:
自动应答:消息过来了则认为就应答了,不管消费者是否真正处理者这个消息。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
客户端应答:消费者手动应答。
session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
message.acknowledge();
6. 消费者从activemq服务端获取消息到底是推还是拉?
在创建Queue的时候,配置以url形式跟在队列名后面:session.createQueue("TEST.FOO?consumer.prefetchSize=10")
consumer的prefetchSize参数默认为1000,consumer 有推和拉2种方式获取消息:当 prefetchSize = 0 时,pull;当 prefetchSize > 0 时,push,prefetchSize的意思是消费者预取多少条消息,也就是当消费者没有超过这么多条消息没有应答的时候,不再推了。
因为 consumer 的 prefetchSize 参数默认为1000,所以 activeMQ 默认是推。而且是一条一条地推。
8. activemq的高可用和负载均衡
activemq可以搭建Master-Slaver主从结构来达到高可用,也能搭建broker-cluster集群,但是现在一般真正需要搭建集群的高吞吐、大数据量的需求,则可能还是用rocketmq或kafka好一点。
7. Activemq管理界面
Activemq的管理界面可以手动创建queue和topic,可以手动发送消息,可以看到topic,订阅者,连接,网络。
可以看到Queue:queName, 还有多少消息待消费,多少个消费者, 入队过多少个消息,出队过多少个消息。
image.png
8. Demo代码
spring boot有内置activemq
如果觉得不是很灵活,也可以不用springboot的集成。
网友评论