业务需求是通过rabbitmq进行限流,google一堆资源也没找到满意的。
最后找到一个留作记录模型使用的生产者消费者模式。代码如下
RabbitMQConf 生产者配置文件
@Configuration
public class RabbitMQConf {
public static final String queueName = "chdeck.phone.number";
@Bean
public Queue helloQueue() {
return new Queue(queueName);
}
}
Producer
@Component
public class Producer {
@Autowired
RabbitTemplate rabbitTemplate;
public void send() {
for (int i = 0; i < 500; i++) {
System.out.println("发了"+i+"条消息");
rabbitTemplate.convertAndSend(RabbitMQConf.queueName, "hello" + i);
}
}
}
生产者很简单,无需多言,一次产生500条消息。下面看消费者
消费者的坑很多,找了很多资料最后成功的这个
consumer
@Component
public class Consumer {
private static final String queueName = "chdeck.phone.number";
private static final int BATCH_SIZE = 100;
Channel channel;
@PostConstruct
public void init() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("*******");
factory.setPort(*****);
factory.setUsername("*****");
factory.setPassword("******");
factory.setVirtualHost("/");
Connection conn = factory.newConnection();
channel = conn.createChannel();
}
public void receive() throws IOException {
String queue001 = "chdeck.phone.number";
// channel.exchangeDeclare(exchange001, "direct", true, false, null);
channel.queueDeclare(queue001, true, false, false, null);
// 设置限流策略
// channel.basicQos(获取消息最大数[0-无限制], 依次获取数量, 作用域[true作用于整个channel,false作用于具体消费者]);
channel.basicQos(0, 100, false);
// 自定义消费者
Executer myExecuter = new Executer(channel);
// 进行消费,签收模式一定要为手动签收
channel.basicConsume(queue001, false, myExecuter);
}
}
Executer
public class Executer extends DefaultConsumer {
private Channel channel;
public Executer(Channel channel) {
super(channel);
// TODO Auto-generated constructor stub
this.channel=channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("body:==="+new String(body)+" t:" + System.currentTimeMillis());
// 手动签收,一定要有消费者签收,如果没有如下代码,则限流模式下,仅能打印出来channel.basicQos(0, 2, false);第二参数的2条信息
channel.basicAck(envelope.getDeliveryTag(), false);
try {
//模拟程序执行时间
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
注意如果想要启动消费者进行消费,切记不可使用单元测试。因为单元测试有固定执行时间,只会消费几条消息就会自动停止。正确方法应该是通过条件注解,@Conditional***。来执行,实在不行写一个controller手动执行也可以,只需要启动一次即可。
网友评论