美文网首页
RabbitMq 消息发布/确认 以及对确认失败的消息的处理

RabbitMq 消息发布/确认 以及对确认失败的消息的处理

作者: 闲置的Programmer | 来源:发表于2022-05-10 17:31 被阅读0次

    这部分没有涉及到交换机,所以一个消息只能被消费一次,多个消费者之间是竞争关系


    image.png

    1、连接rabbitMq

    pom文件

    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
        <dependencies>
            <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.14.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.11.0</version>
            </dependency>
    
    
        </dependencies>
    
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    /**
     * @Author: yokipang
     * @Date: 2022/5/10
     * 连接工厂创建信道
     */
    public class RabbitMqUtils {
        public static Channel getChannel() throws  Exception{
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.1.xx");
            factory.setUsername("xxx");
            factory.setPassword("xxxxx");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            return  channel;
        }
    
    }
    
    

    2、具体方法

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmCallback;
    import utils.RabbitMqUtils;
    
    import java.util.UUID;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    
    public class ConfirmMessage {
    
        public static final int message_count = 1000;
    
    
        public static void main(String[] args) throws Exception {
            //单个确认 690ms
            //publishMessage1();
    
            //批量确认 160ms
            //publishMessage2();
    
            //异步批量确认
            publishMessage3();
        }
    
        //单个确认
        public static  void publishMessage1() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            String queueName = UUID.randomUUID().toString();
            /**
             * 声明队列
             * 参数1 队列名称
             * 参数2 消息是否持久化
             * 参数3 是否可以由多个消费者消费
             * 参数4 是否自动删除
             * 参数5 其他
             */
            channel.queueDeclare(queueName,true,false,false,null);
    
            //开启发布、确认
            channel.confirmSelect();
    
            //开始时间
            long begin = System.currentTimeMillis();
    
            for (int i = 0 ; i<message_count;i++){
                String message = i+"";
                channel.basicPublish("",queueName,null,message.getBytes());
                boolean flag = channel.waitForConfirms();
                if(flag){
                    System.out.println("消息发送成功");
                }
            }
    
            long end = System.currentTimeMillis();
    
            System.out.println("所需时间:"+(end-begin)+"ms");
    
        }
    
    
        //批量发布确认
        public static  void publishMessage2() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            String queueName = UUID.randomUUID().toString();
            /**
             * 声明队列
             * 参数1 队列名称
             * 参数2 消息是否持久化
             * 参数3 是否可以由多个消费者消费
             * 参数4 是否自动删除
             * 参数5 其他
             */
            channel.queueDeclare(queueName,true,false,false,null);
    
            //开启发布、确认
            channel.confirmSelect();
    
            //开始时间
            long begin = System.currentTimeMillis();
    
            //批量确认数量
            int batchSize = 100;
    
    
            for (int i = 0 ; i<message_count;i++){
                String message = i+"";
                channel.basicPublish("",queueName,null,message.getBytes());
    
                if( i % batchSize == 0){
                    channel.waitForConfirms();
                }
            }
    
            long end = System.currentTimeMillis();
            System.out.println("所需时间:"+(end-begin)+"ms");
    
        }
    
    
    
        //异步批量发布确认
        public static  void publishMessage3() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            String queueName = UUID.randomUUID().toString();
            /**
             * 声明队列
             * 参数1 队列名称
             * 参数2 消息是否持久化
             * 参数3 是否可以由多个消费者消费
             * 参数4 是否自动删除
             * 参数5 其他
             */
            channel.queueDeclare(queueName,true,false,false,null);
            //开启发布、确认
            channel.confirmSelect();
    
            /**
             * 线程安全有序的哈希表 适用于高并发情况
             * 1、轻松地将序号与消息进行关联
             * 2、轻松的批量删除条目 只要给到序号
             * 3、支持高并发(多线程)
             */
            ConcurrentSkipListMap<Long,String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
    
            //消息发送前准备监听器 监听消息发送状态
            /**
             * 消息确认成功函数
             * 1、消息标记
             * 2、是否批量
             */
            ConfirmCallback ackConfirmCallback =(deliveryTag,multiple)->{
                //是否批量处理
                if(multiple){
                    //删除掉已确认的消息 剩下的就是未成功发送的消息
                    ConcurrentNavigableMap<Long,String> confirmd = concurrentSkipListMap.headMap(deliveryTag);
                    confirmd.clear();
                }else{
                    concurrentSkipListMap.remove(deliveryTag);
                }
                System.out.println("确认成功的消息:"+ deliveryTag);
            };
    
            /**
             * 消息确认失败函数
             * 1、消息标记
             * 2、是否批量
             */
            ConfirmCallback notAckConfirmCallback =(deliveryTag,multiple)->{
                String message = concurrentSkipListMap.get(deliveryTag);
                System.out.println("确认失败的消息:"+message +"   消息标记" + deliveryTag);
            };
    
            //开始时间
            long begin = System.currentTimeMillis();
            //监听器
            channel.addConfirmListener(ackConfirmCallback,notAckConfirmCallback);
    
            //发送消息
            for (int i = 0 ; i<message_count;i++){
                String message = i+"";
                channel.basicPublish("",queueName,null,message.getBytes());
                //记录所有要发送的消息
                concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
            }
    
            long end = System.currentTimeMillis();
            System.out.println("所需时间:"+(end-begin)+"ms");
    
        }
    }
    
    
    

    相关文章

      网友评论

          本文标题:RabbitMq 消息发布/确认 以及对确认失败的消息的处理

          本文链接:https://www.haomeiwen.com/subject/khalurtx.html