这部分没有涉及到交换机,所以一个消息只能被消费一次,多个消费者之间是竞争关系
image.png
1、连接rabbitMq
pom文件
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
</dependencies>
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author: yokipang
* @Date: 2022/5/10
* 连接工厂创建信道
*/
public class RabbitMqUtils {
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.xx");
factory.setUsername("xxx");
factory.setPassword("xxxxx");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
2、具体方法
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import utils.RabbitMqUtils;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class ConfirmMessage {
public static final int message_count = 1000;
public static void main(String[] args) throws Exception {
//单个确认 690ms
//publishMessage1();
//批量确认 160ms
//publishMessage2();
//异步批量确认
publishMessage3();
}
//单个确认
public static void publishMessage1() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
/**
* 声明队列
* 参数1 队列名称
* 参数2 消息是否持久化
* 参数3 是否可以由多个消费者消费
* 参数4 是否自动删除
* 参数5 其他
*/
channel.queueDeclare(queueName,true,false,false,null);
//开启发布、确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
for (int i = 0 ; i<message_count;i++){
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("所需时间:"+(end-begin)+"ms");
}
//批量发布确认
public static void publishMessage2() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
/**
* 声明队列
* 参数1 队列名称
* 参数2 消息是否持久化
* 参数3 是否可以由多个消费者消费
* 参数4 是否自动删除
* 参数5 其他
*/
channel.queueDeclare(queueName,true,false,false,null);
//开启发布、确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量确认数量
int batchSize = 100;
for (int i = 0 ; i<message_count;i++){
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
if( i % batchSize == 0){
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("所需时间:"+(end-begin)+"ms");
}
//异步批量发布确认
public static void publishMessage3() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
/**
* 声明队列
* 参数1 队列名称
* 参数2 消息是否持久化
* 参数3 是否可以由多个消费者消费
* 参数4 是否自动删除
* 参数5 其他
*/
channel.queueDeclare(queueName,true,false,false,null);
//开启发布、确认
channel.confirmSelect();
/**
* 线程安全有序的哈希表 适用于高并发情况
* 1、轻松地将序号与消息进行关联
* 2、轻松的批量删除条目 只要给到序号
* 3、支持高并发(多线程)
*/
ConcurrentSkipListMap<Long,String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
//消息发送前准备监听器 监听消息发送状态
/**
* 消息确认成功函数
* 1、消息标记
* 2、是否批量
*/
ConfirmCallback ackConfirmCallback =(deliveryTag,multiple)->{
//是否批量处理
if(multiple){
//删除掉已确认的消息 剩下的就是未成功发送的消息
ConcurrentNavigableMap<Long,String> confirmd = concurrentSkipListMap.headMap(deliveryTag);
confirmd.clear();
}else{
concurrentSkipListMap.remove(deliveryTag);
}
System.out.println("确认成功的消息:"+ deliveryTag);
};
/**
* 消息确认失败函数
* 1、消息标记
* 2、是否批量
*/
ConfirmCallback notAckConfirmCallback =(deliveryTag,multiple)->{
String message = concurrentSkipListMap.get(deliveryTag);
System.out.println("确认失败的消息:"+message +" 消息标记" + deliveryTag);
};
//开始时间
long begin = System.currentTimeMillis();
//监听器
channel.addConfirmListener(ackConfirmCallback,notAckConfirmCallback);
//发送消息
for (int i = 0 ; i<message_count;i++){
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
//记录所有要发送的消息
concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("所需时间:"+(end-begin)+"ms");
}
}
网友评论