Activemq

作者: 无聊之园 | 来源:发表于2019-05-26 21:18 被阅读0次

    1. acivemq支持两种模型

    点对点(quene):
    每个消息只能有一个消费者。消息的生产者和消费者之间没有时间上的
    相关性.可以由多个发送者,也可以有多个消费者,但一个消息只能被一个消费者消费。即使消费者服务挂了,重启后,依赖能够获取到消息,不会丢消息。

    发布订阅(topic):
    每个消息可以有多个消费者。
    生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消
    费自它订阅之后发布的消息.

    2. 如何保证消息的不丢

    1. 生产者发送到activemq服务端保证消息正常到达机制:
    设置应答模式为同步模式:这样,在服务端没有给生产者应答之前,生产者一直堵塞。

    ((ActiveMQConnection)connection).setUseAsyncSend(true);
    

    2. activemq服务端如何保证服务端重启不会丢消息:
    消息持久化到硬盘。
    配合消息同步发送模式,只有当消息持久化到硬盘之后,才会给生产者应答。

            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    

    3. 如何保证消费者,真的消费了消息。
    消费者设置手动应答模式,当真正消费了消息之后,才应答:

    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    // 进行手动应答
    message.acknowledge();
    

    4. topic发布订阅模式下,持久化订阅。
    topic模式,是普通订阅模式:生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息.也就是说,消费者要是重启了或者网络断了,那么重启过程中的消息就丢了。

    那么还有一种持久化订阅模式:也就是订阅的时候告诉它订阅者id,然后在消费者断线了之后,activemq会一直给它留着这个消息。

     //设置客户端id
    connection.setClientID("客户端id");
    //持久订阅
    consumer = session.createDurableSubscriber((Topic) destination,"订阅name"); 
    

    3. 生产者发送消息的同步和异步的方式

    同步:activemq没有收到生产者的消息的时候,比如还在网络传输过程中,或者如果开启了消息持久化,则还没有持久化的时候,生产者一直堵塞,直到activemq服务端应答返回。

    ((ActiveMQConnection)connection).setUseAsyncSend(false);
    

    异步:消息发送之后就不管了。

    ((ActiveMQConnection)connection).setUseAsyncSend(true);
    

    设置异步发送的方式:

    1. 在连接的URI中配置
    cf = new ActiveMQConnectionFactory("tcp://locahost:61616jms.useAsyncSend=true");  
    

    2.在ConnectionFactory层配置

    ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);  
    

    3.在Connection层配置

    ((ActiveMQConnection)connection).setUseAsyncSend(true);  
    

    4. 消费者消费消息的同步和异步的方式

    同步:接收者主动接收消息,若消息队列中没有消息则阻塞等待

    Message message = consumer.receive(); //
    

    异步:采用事件回调的机制。

    consumer.setMessageListener(MessageListener实现类);  //(异步接收)
    

    5. 应答模式

    生产者应答模式:

    支持事务:如果支持事务,则当session提交的时候,才会发送消息。一般不使用事务,影响吞吐量。

     session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
    session.commit();
    

    不支持事务:

    session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    

    消费者应答模式:

    自动应答:消息过来了则认为就应答了,不管消费者是否真正处理者这个消息。

     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    

    客户端应答:消费者手动应答。

     session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
    message.acknowledge();
    
    

    6. 消费者从activemq服务端获取消息到底是推还是拉?

    在创建Queue的时候,配置以url形式跟在队列名后面:session.createQueue("TEST.FOO?consumer.prefetchSize=10")
    consumer的prefetchSize参数默认为1000,consumer 有推和拉2种方式获取消息:当 prefetchSize = 0 时,pull;当 prefetchSize > 0 时,push,prefetchSize的意思是消费者预取多少条消息,也就是当消费者没有超过这么多条消息没有应答的时候,不再推了。
    因为 consumer 的 prefetchSize 参数默认为1000,所以 activeMQ 默认是推。而且是一条一条地推。

    8. activemq的高可用和负载均衡

    activemq可以搭建Master-Slaver主从结构来达到高可用,也能搭建broker-cluster集群,但是现在一般真正需要搭建集群的高吞吐、大数据量的需求,则可能还是用rocketmq或kafka好一点。

    7. Activemq管理界面

    http://localhost:8161/

    Activemq的管理界面可以手动创建queue和topic,可以手动发送消息,可以看到topic,订阅者,连接,网络。

    可以看到Queue:queName, 还有多少消息待消费,多少个消费者, 入队过多少个消息,出队过多少个消息。


    image.png

    8. Demo代码

    spring boot有内置activemq

    如果觉得不是很灵活,也可以不用springboot的集成。

    https://gitee.com/wuliaozhiyuan/private.git

    相关文章

      网友评论

          本文标题:Activemq

          本文链接:https://www.haomeiwen.com/subject/tuiftctx.html