本文使用 Spring Boot 作为框架,使用 Spring Framework 的项目也可以参考。
前提
安装部署并启动rabbitmq,参见 http://www.jianshu.com/p/9a32dca0c6aa
配置连接
constant.properties:
# RabbitMQ connection settings read by RabbitConfig.connectionFactory().
# Comma-separated list of broker addresses in host:port form.
spring.rabbitmq.addresses=192.168.253.133:5672,192.168.253.134:5672
# Credentials used to open the broker connection.
spring.rabbitmq.username=rabbitmq
spring.rabbitmq.password=rabbitmq
# Number of channels cached per connection
spring.rabbitmq.cachesize=25
配置maven依赖
pom.xml:
<!-- Repackages the build output through the Spring Boot Maven plugin's "repackage" goal -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencyManagement>
<dependencies>
</dependencies>
</dependencyManagement>
<!-- The dependencies below declare no version; presumably they are managed by this parent POM -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
</parent>
<dependencies>
<!-- Spring AMQP (RabbitMQ client support) -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
<!-- Spring Boot web starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot unit-test starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置
RabbitConfig.java:
/**
 * Declares the RabbitMQ infrastructure used by the sample: the connection factory,
 * the admin and template helpers, three exchanges ({@code shark.direct},
 * {@code shark.fanout}, {@code shark.topic}), two durable queues ({@code shark.info},
 * {@code shark.error}) and the bindings between them.
 */
@Configuration
public class RabbitConfig {

    /**
     * Channel cache size used when {@code spring.rabbitmq.cachesize} is not configured;
     * same value as the one shipped in constant.properties.
     */
    private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;

    @Autowired
    private Environment env;

    /**
     * Connection factory built from the {@code spring.rabbitmq.*} properties.
     *
     * @return a caching connection factory pointing at the configured broker addresses
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(env.getProperty("spring.rabbitmq.addresses"));
        connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
        connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
        // connectionFactory.setCacheMode(CacheMode.CHANNEL); // CHANNEL is the default
        // Typed lookup with a default: a missing key no longer fails with
        // NumberFormatException as Integer.parseInt(null) did.
        connectionFactory.setChannelCacheSize(env.getProperty(
                "spring.rabbitmq.cachesize", Integer.class, DEFAULT_CHANNEL_CACHE_SIZE));
        // Enable publisher confirms:
        // connectionFactory.setPublisherConfirms(true);
        // Enable publisher returns:
        // connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Admin helper responsible for declaring queues, exchanges and bindings on the broker.
     *
     * @return a {@link RabbitAdmin} bound to {@link #connectionFactory()}
     */
    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Template used to send and (synchronously) receive messages.
     *
     * @return a {@link RabbitTemplate} bound to {@link #connectionFactory()}
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    /**
     * Durable direct exchange {@code shark.direct}. A message is delivered to every
     * queue whose binding key exactly equals the message's routing key.
     */
    @Bean
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("shark.direct").durable(true).build();
    }

    /**
     * Durable fanout exchange {@code shark.fanout}. Works like a broadcast: every bound
     * queue receives every message, regardless of routing key.
     */
    @Bean
    public Exchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange("shark.fanout").durable(true).build();
    }

    /**
     * Durable topic exchange {@code shark.topic}. Like a direct exchange, but binding
     * keys may contain wildcards: {@code *} matches exactly one word and {@code #}
     * matches zero or more words.
     */
    @Bean
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange("shark.topic").durable(true).build();
    }

    /**
     * Queue 1: the durable queue {@code shark.info}.
     */
    @Bean
    public Queue infoQueue() {
        return QueueBuilder.durable("shark.info").build();
    }

    /**
     * Queue 2: the durable queue {@code shark.error}.
     */
    @Bean
    public Queue errorQueue() {
        return QueueBuilder.durable("shark.error").build();
    }

    /**
     * Binds queue 2 ({@code shark.error}) to the fanout exchange with an empty routing key.
     */
    @Bean
    public Binding bindingFanoutAnony() {
        return BindingBuilder.bind(errorQueue()).to(fanoutExchange()).with("").noargs();
    }

    /**
     * Binds queue 1 ({@code shark.info}) to the fanout exchange with an empty routing key.
     */
    @Bean
    public Binding bindingFanoutAnony2() {
        return BindingBuilder.bind(infoQueue()).to(fanoutExchange()).with("").noargs();
    }

    /**
     * Binds queue 1 ({@code shark.info}) to the direct exchange with routing key
     * {@code shark.info}.
     *
     * <p>{@code @Primary} makes this the default {@link Binding} bean: it is the one
     * injected by {@code @Autowired}, or by {@code @Resource} without an explicit name.
     */
    @Primary
    @Bean
    public Binding bindingDirect1() {
        return BindingBuilder.bind(infoQueue()).to(directExchange()).with("shark.info").noargs();
    }

    /**
     * Binds queue 2 ({@code shark.error}) to the direct exchange with routing key
     * {@code shark.error}.
     */
    @Bean
    public Binding bindingDirect2() {
        return BindingBuilder.bind(errorQueue()).to(directExchange()).with("shark.error").noargs();
    }

    /**
     * Binds queue 2 ({@code shark.error}) to the topic exchange with pattern {@code shark.*}.
     */
    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(errorQueue()).to(topicExchange()).with("shark.*").noargs();
    }

    /**
     * Binds queue 1 ({@code shark.info}) to the topic exchange with pattern {@code shark.*}.
     */
    @Bean
    public Binding bindingTopic2() {
        return BindingBuilder.bind(infoQueue()).to(topicExchange()).with("shark.*").noargs();
    }
}
启动Spring boot应用
Application.java
/**
 * Spring Boot entry point for the RabbitMQ sample. Loads constant.properties from the
 * classpath and starts the application with the banner routed to the log.
 */
@SpringBootApplication(exclude = RabbitAutoConfiguration.class) // exclude Spring Boot's RabbitMQ auto-configuration
@EnableAspectJAutoProxy(proxyTargetClass = true) // enable AspectJ auto-proxying
@PropertySource({ "classpath:/cn/com/ut/config/properties/constant.properties" })
public class Application {

    /**
     * Boots the application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(Application.class);
        // Write the startup banner to the log.
        app.setBannerMode(Banner.Mode.LOG);
        app.run(args);
    }
}
测试
TestSendController.java
/**
 * Test endpoints that publish batches of messages to the fanout, direct and topic
 * exchanges, plus a synchronous receive endpoint.
 */
@RestController
@RequestMapping(value = "/test")
public class TestSendController {

    /** Number of messages published by each send endpoint. */
    private static final int MESSAGE_COUNT = 10;

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Resource(name = "fanoutExchange")
    private Exchange fanoutExchange;
    @Resource(name = "bindingDirect1")
    private Binding bindingDirect1;
    @Resource(name = "bindingDirect2")
    private Binding bindingDirect2;
    @Resource(name = "topicExchange")
    private Exchange topicExchange;

    /**
     * Broadcasts 10 messages, one per second, through the fanout exchange; every queue
     * bound to it receives all of them.
     *
     * @param msg optional text appended to each message
     * @return a confirmation string, returned immediately while sending continues in the background
     */
    @GetMapping("/sendFanout")
    public String sendFanout(@RequestParam(value = "msg", required = false) final String msg) {
        // The three-argument overload is required here: convertAndSend(String, Object)
        // interprets its first argument as a ROUTING KEY, not as an exchange name, so the
        // messages would never reach the fanout exchange.
        publishInBackground(fanoutExchange.getName(), "", msg, 1000L);
        return "success send 10 msg to fanout Exchange!";
    }

    /**
     * Sends 10 messages, one every 100 ms, to the direct exchange using the exchange and
     * routing key of {@code bindingDirect1}; only queues bound with exactly that key
     * receive them.
     *
     * @param msg optional text appended to each message
     * @return a confirmation string, returned immediately while sending continues in the background
     */
    @GetMapping("/sendDirect")
    public String sendDirect(@RequestParam(value = "msg", required = false) final String msg) {
        publishInBackground(bindingDirect1.getExchange(), bindingDirect1.getRoutingKey(), msg, 100L);
        return "success send 10 msg to direct Exchange!";
    }

    /**
     * Sends 10 messages, one per second, to the topic exchange with routing key
     * {@code shark.info}; every queue whose binding pattern matches that key receives them.
     *
     * @param msg optional text appended to each message
     * @return a confirmation string, returned immediately while sending continues in the background
     */
    @GetMapping("/sendTopic")
    public String sendTopic(@RequestParam(value = "msg", required = false) final String msg) {
        publishInBackground(topicExchange.getName(), "shark.info", msg, 1000L);
        return "success send 10 msg to topic Exchange!";
    }

    /******************** Receive (synchronous) ****************** */

    /**
     * Synchronously fetches one message from the given queue.
     *
     * @param queueName name of the queue to read from
     * @return a text containing the queue name and the received payload; when the template
     *         returns {@code null} the concatenation renders it as the text "null"
     */
    @GetMapping("/receive")
    public String receive(@RequestParam(value = "queue") String queueName) {
        String msg = " 收到來自队列 " + queueName + "的消息:"
                + (String) rabbitTemplate.receiveAndConvert(queueName);
        return msg;
    }

    /**
     * Publishes {@link #MESSAGE_COUNT} numbered messages on a new thread so the HTTP
     * request returns at once.
     *
     * @param exchange    name of the target exchange
     * @param routingKey  routing key for every message
     * @param msg         text appended to each message
     * @param pauseMillis pause between two consecutive messages, in milliseconds
     */
    private void publishInBackground(final String exchange, final String routingKey,
            final String msg, final long pauseMillis) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    rabbitTemplate.convertAndSend(exchange, routingKey, "msg" + i + " : " + msg);
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop instead of swallowing the interrupt.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }).start();
    }
}
监听器接收
- RabbitConfig.java 添加 @EnableRabbit 注解和 SimpleRabbitListenerContainerFactory 的 bean
@Configuration
@EnableRabbit
public class RabbitConfig {

    /**
     * Container factory for message listeners; requires the {@code @EnableRabbit}
     * annotation on the configuration class. Containers start with 3 concurrent
     * consumers and may use up to 10.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory =
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConcurrentConsumers(3);
        containerFactory.setMaxConcurrentConsumers(10);
        containerFactory.setConnectionFactory(connectionFactory());
        return containerFactory;
    }
}
- TestSendController.java 添加监听器
/******************** 接收监听器****************** */
/**
 * Listener on the queue named by the {@code infoQueue} bean; prints each received payload.
 *
 * @param reciveMsg the message payload
 */
@RabbitListener(queues = "#{infoQueue.name}")
public void receive3(String reciveMsg) {
    String line = "receive1(订阅了infoQueue) 收到信息:" + reciveMsg;
    System.out.println(line);
}
/**
 * Listener on the queue named by the {@code errorQueue} bean; prints each received payload.
 *
 * @param reciveMsg the message payload
 */
@RabbitListener(queues = "#{errorQueue.name}")
public void receive4(String reciveMsg) {
    String line = "receive2(订阅了errorQueue) 收到信息:" + reciveMsg;
    System.out.println(line);
}
/**
 * Additional listener on the queue named by the {@code infoQueue} bean; prints each
 * received payload.
 *
 * @param reciveMsg the message payload
 */
@RabbitListener(queues = "#{infoQueue.name}")
public void receive5(String reciveMsg) {
    String line = "receive3(订阅了infoQueue) 收到信息:" + reciveMsg;
    System.out.println(line);
}
网友评论