springboot整合rabbitmq
maven依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>````
##配置:
配置队列、交换器、队列绑定、消息监听容器等
```/**
* Created by Torres on 17/1/21.
*/
@Configuration
public class RabbitmqConfig {
@Autowired
Environment environment;
private final String name = "helloworld";
@Bean
Queue queue() {
return new Queue(name, false);
}
@Bean
DirectExchange exchange() {
return new DirectExchange("hello-exchange");
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).withQueueName();
}
@Bean
SimpleMessageListenerContainer simpleMessageListenerContainer(MessageListenerAdapter listenerAdapter,
@Qualifier("simpleConnectionFactory") ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setMessageListener(listenerAdapter);
container.setConnectionFactory(connectionFactory);
container.setQueueNames(name);
return container;
}
@Bean(name = "simpleConnectionFactory")
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(environment.getProperty("rabbitmq.addresses"));
connectionFactory.setUsername(environment.getProperty("rabbitmq.username"));
connectionFactory.setPassword(environment.getProperty("rabbitmq.password"));
connectionFactory.setVirtualHost(environment.getProperty("rabbitmq.vhost"));
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
Receiver receiver() {
return new Receiver();
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "processMessage");
}
@Bean(name = "myRabbitTemplate")
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}```
##消息接受处理
**
-
Created by Torres on 17/1/21.
*/
public class Receiver {
private CountDownLatch countDownLatch = new CountDownLatch(1);public void processMessage(String message) {
System.out.println(new Date() + "receive message is:" + message);
countDownLatch.countDown();
}public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
}```
消息发送:
这里使用定时任务每个五秒发送message.
/**
* Created by Torres on 17/1/21.
*/
@EnableScheduling
//任务调度
@Component
public class Worker {
@Autowired
@Qualifier("myRabbitTemplate")
RabbitTemplate rabbitTemplate;
@Scheduled(fixedDelay = 5000)
public void sendMessage() {
rabbitTemplate.convertAndSend("helloworld", "hello");
}
}```
##结果:
Sat Jan 21 16:18:00 CST 2017:hello
Sat Jan 21 16:18:03 CST 2017receive message is:hello
Sat Jan 21 16:18:03 CST 2017:hello
Sat Jan 21 16:18:06 CST 2017:hello
Sat Jan 21 16:18:08 CST 2017receive message is:hello
Sat Jan 21 16:18:09 CST 2017:hello
Sat Jan 21 16:18:12 CST 2017:hello
Sat Jan 21 16:18:13 CST 2017receive message is:hello
网友评论