美文网首页
rabbitmq(三)事务和Confirm

rabbitmq(三)事务和Confirm

作者: guideEmotion | 来源:发表于2019-05-21 21:31 被阅读0次

一 消息确认机制

问题:生产者将消息发送出去之后,消息到底有没有到达rabbitmq服务器,默认的情况是不知道的
两种解决方式:

  • AMQP实现了事务机制
  • Confirm模式

事务机制

TxSelect
j用户将当前channel设置成transaction模式

TxCommit
用于提交事务

txRollback
回滚事务

生产者

image.png
缺点 因为会增加请求次数,所以会减少吞吐量

Confirm模式

生产者端Confirm模式的实现原理

image.png

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息;

参考:https://blog.csdn.net/hzw19920329/article/details/54340711

普通模式(串行)

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectiionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.confirmSelect();//设置成confirm模式

        String msg = "hello confirm";
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        System.out.println(" 发送完毕 "+msg);

        if(!channel.waitForConfirms()){//单条确认一次
            System.out.println("send failer");
        }else{
            System.out.println("send successed");
        }
        channel.close();
        connection.close();

    }

批量模式

失败的话,也是这一批都没了

        String msg = "hello confirm";
       
        for(int i=0;i<10;i++){
             channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
           System.out.println(" 发送完毕 "+msg);
        }
       

        if(!channel.waitForConfirms()){//发送多条后确认一次
            System.out.println("send failer");
        }else{
            System.out.println("send successed");
        }

从这点我们看出broker端默认情况下是进行批量回复的,并不是针对每条消息都发送一条ack消息;

异步

SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
       public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            if (multiple) {
                  confirmSet.headSet(deliveryTag + 1L).clear();
              } else {
                    confirmSet.remove(deliveryTag);
              }
       }
       public void handleNack(long deliveryTag, boolean multiple) throws IOException {
             System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
             if (multiple) {
                 confirmSet.headSet(deliveryTag + 1L).clear();
              } else {
                  confirmSet.remove(deliveryTag);
              }
        }
});
for(int i=0;i<5;i++){
            long nextSeqNo = channel.getNextPublishSeqNo();
     channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());
            confirmSet.add(nextSeqNo);
}

目前没搞明白这么写的原理

相关文章

网友评论

      本文标题:rabbitmq(三)事务和Confirm

      本文链接:https://www.haomeiwen.com/subject/kqyszqtx.html