- 引入pom.xml
<!-- https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/
https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-amqp
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- application.yml 配置部分
#rabbitmq
rabbitmq:
host: 10.0.0.2
port: 5672
username: springboot
password: password
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
#https://github.com/spring-projects/spring-boot/blob/v2.0.5.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java
listener:
concurrency: 2
#最小消息监听线程数
max-concurrency: 2
#最大消息监听线程数
mybatis:
mapper-locations: classpath:mapping/*.xml
···
- 创建配置文件
package com.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
//错误的 import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
*
* Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
* Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息的载体,每个消息都会被投到一个或多个队列。
*
* Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
*
* Routing Key:路由关键字,
*
* exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
*
* Producer:消息生产者,就是投递消息的程序. Consumer:消息消费者,就是接受消息的程序.
* Channel:消息通道,在客户端的每个连接里,可建立多个channel.
*
*
*/
@Configuration
public class RabbitConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
public static final String EXCHANGE_A = "my-mq-exchange_A";
public static final String QUEUE_A = "QUEUE_A";
public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// 必须是prototype类型
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
/**
* * 针对消费者配置 *
* 1. 设置交换机类型 *
* 2. 将队列绑定到交换机
*
* FanoutExchange:
* 将消息分发到所有的绑定队列,无routingkey的概念
*
* HeadersExchange :通过添加属性key-value匹配
* DirectExchange:按照routingkey分发到指定队列
* TopicExchange:多关键字匹配
*
* 如果需要使用的其他的交换器类型,spring中都已提供实现,所有的交换器均实现org.springframework.amqp.core.AbstractExchange接口。
常用交换器类型如下:
Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送".
即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。
Topic(TopicExchange):按规则转发消息(最灵活)。
Headers(HeadersExchange):设置header attribute参数类型的交换机。
Fanout(FanoutExchange):转发消息到所有绑定队列。
*
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE_A);
}
/**
* 获取队列A
*
* @return
*/
@Bean
public Queue queueA() {
return new Queue(QUEUE_A, true); // 队列持久
}
/**
* 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。
*
* @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
}
}
- Producer
package com.redis;
import java.util.UUID;
import org.springframework.amqp.core.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback , ReturnCallback{
private final Logger logger = LoggerFactory.getLogger(this.getClass());
// 由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
private RabbitTemplate rabbitTemplate;
/** * 构造方法注入rabbitTemplate */
@Autowired
public MsgProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
// rabbitTemplate如果为单例的话,那回调就是最后设置的内容
/**
* ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。
* ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
*/
rabbitTemplate.setReturnCallback(this);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败");
System.out.println("消息主体 message : "+message);
System.out.println("消息主体 message : "+replyCode);
System.out.println("描述:"+replyText);
System.out.println("消息使用的交换器 exchange : "+exchange);
System.out.println("消息使用的路由键 routing : "+routingKey);
}
/**
* rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message
rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送
rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。
* @param content
*/
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("开始发送消息c : " + content.toLowerCase() + " ,correlationId= " + correlationId);
String response = rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId).toString();
System.out.println("结束发送消息c : " + content.toLowerCase());
System.out.println("消费者响应c : " + response + " 消息处理完成");
//logger.info(" 发送消息TO A:" + content);
// 把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
//rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
}
/** * 回调 */
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info(" 回调id:" + correlationData);
if (ack) {
logger.info("消息成功消费");
} else {
logger.info("消息消费失败:" + cause);
}
}
}
- Receiver
package com.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(String content) {
System.out.println("接收处理队列A当中的消息: " + content);
}
}
- unit test
import java.util.Date;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.redis.MsgProducer;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqTest {
@Autowired
private MsgProducer sender;
@Test
public void sendTest() throws Exception {
//while(true){
String msg = new Date().toString();
sender.sendMsg(msg);
Thread.sleep(6000);
//}
}
}
- 可以测试通过
image.png
- 高并发的访问量, 除了使用nginx/haproxy本身实现外, 尝试了google guava 简单好用, 来控制前端用户请求量, 范例
package com.test.ratelimit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.RateLimiter;
public class ComplexDemo {
private static RateLimiter rateLimiter = RateLimiter.create(10);
private static AtomicInteger suc = new AtomicInteger(0), fail = new AtomicInteger(0);
public static void main(String[] args) {
// TODO Auto-generated method stub
List<Runnable> tasks = new ArrayList<Runnable>();
for (int i = 0; i < 100; i++) {
tasks.add(new UserRequest(i));
}
ExecutorService threadPool = Executors.newCachedThreadPool();
for (Runnable runnable : tasks) {
threadPool.execute(runnable);
}
}
private static boolean startGo(int i) {
//基于令牌桶算法的限流实现类
/**
* 一秒出10个令牌,0.1秒出一个,100个请求进来,假如100个是同时到达, 那么最终只能成交10个,90个都会因为超时而失败。
*
*/
/**
* tryAcquire(long timeout, TimeUnit unit)
* 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,
* 或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待)
*/
//判断能否在1秒内得到令牌,如果不能则立即返回false,不会阻塞程序
if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) {
System.out.println("暂时无法获取令牌, 排队失败" + i);
fail.getAndIncrement();
System.out.println("SUC/FAIL=" + suc.get() + "/" + fail.get());
return false;
}
if (update() > 0) {
System.out.println("成功" + i);
suc.getAndIncrement();
System.out.println("FAIL/SUC=" + fail.get() + "/" + suc.get());
return true;
}
System.out.println("数据不足,失败");
return false;
}
private static int update() {
return 1;
}
private static class UserRequest implements Runnable {
private int id;
public UserRequest(int id) {
this.id = id;
}
public void run() {
startGo(id) ;
}
}
}
测试结果:
...
成功8
FAIL/SUC=89/10
成功7
FAIL/SUC=89/11
总结, rabbit mq, 这里只用Direct。 Topic匹配灵活, 可以用到其他场景。
网友评论