Binding in RabbitMQ
exchange: flow
routing-key: user
bind-queue: flow_user
In plain terms: the queue flow_user is bound to the exchange flow under the routing key user.
The sender publishes with the routing key:
// Send the message to the exchange named flow with routing key user; the exchange then delivers it to the bound queue
amqpTemplate.convertAndSend("flow", "user", context);
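To make the exchange, binding, and queue relationship concrete, here is a minimal in-memory sketch. It is a toy model, not the RabbitMQ client: the class ToyExchange and its methods are hypothetical names, and it only handles exact key matches (which is how a topic binding without wildcards behaves).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of exchange -> binding -> queue (hypothetical, not a RabbitMQ API).
final class ToyExchange {
    // binding key -> queues bound under that key
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    /** Binds a queue to this exchange under the given binding key. */
    void bind(Queue<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Delivers the message to every queue bound under exactly this routing key; returns how many queues received it. */
    int publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (Queue<String> q : targets) {
            q.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        ToyExchange flow = new ToyExchange();          // stands in for the exchange "flow"
        Queue<String> flowUser = new ArrayDeque<>();   // stands in for the queue "flow_user"
        flow.bind(flowUser, "user");

        flow.publish("user", "hello");    // routed to flowUser
        flow.publish("order", "dropped"); // no binding for "order", so nothing receives it
        System.out.println(flowUser);     // prints [hello]
    }
}
```

The sender never names the queue: it only knows the exchange and the routing key, and the binding decides where the message ends up.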
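Rabbit configuration
- Add the exchange (its type should be topic; I did not notice this when taking the screenshot)
- Add the queue
- Bind the queue to the exchange (pay attention to the routing-key, here user)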
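Because the exchange type is topic, the binding key may also contain wildcards: words are separated by dots, * stands for exactly one word and # for zero or more words. The sketch below is a simplified model of that matching rule, written for illustration; TopicKeyMatcher is a hypothetical name and this is not RabbitMQ's actual implementation.

```java
// Simplified model of topic-exchange key matching (illustrative only).
final class TopicKeyMatcher {

    /** Returns true if routingKey matches bindingKey under topic rules. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern used up: match only if all words are consumed too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int k = j; k <= w.length; k++) {
                if (match(p, i + 1, w, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == w.length) {
            return false; // pattern still expects a word but none is left
        }
        if (p[i].equals("*") || p[i].equals(w[j])) {
            return match(p, i + 1, w, j + 1); // '*' or a literal word consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("user", "user"));           // true
        System.out.println(matches("flow", "user"));           // false
        System.out.println(matches("user.*", "user.created")); // true
        System.out.println(matches("user.#", "user"));         // true
    }
}
```

With the plain binding key user and no wildcards, only messages published with the routing key user reach the queue, which is why the routing-key entered in the binding matters.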
Integrating RabbitMQ with Spring Boot
- Register the configuration bean
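import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {
    public final static String FLOW = "flow";            // exchange name
    public final static String USER = "user";            // routing key
    public final static String USER_QUEUE = "flow_user"; // queue name

    @Bean
    public Queue queueMessages3() {
        return new Queue(USER_QUEUE);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(FLOW);
    }

    // Bind the queue to the exchange under the routing key "user" (not the exchange name)
    @Bean
    Binding bindingExchangeMessages3(Queue queueMessages3, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages3).to(exchange).with(USER);
    }
}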
- Sender code
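import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Package: pterosaur.account.service.impl
 * @Description: Simulates sending messages; for testing only
 * @author: liuxin
 * @date: 17/4/19 3:17 PM
 */
@Component
public class AccountSentImpl {
    @Autowired
    private AmqpTemplate amqpTemplate;

    private ExecutorService threadPool = Executors.newFixedThreadPool(8);

    public void send() {
        for (int i = 0; i < 10; i++) {
            // The message text is built on the calling thread, before the task is submitted.
            // "当前线程" means "current thread"; DateUtil is a project helper class.
            String context = "hello :" + DateUtil.formatDatetime(System.currentTimeMillis()) + ",当前线程:" + Thread.currentThread().getName();
            System.out.println("Sender : " + context);
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // Publish to exchange "flow" with routing key "user"
                    amqpTemplate.convertAndSend(TopicRabbitConfig.FLOW, TopicRabbitConfig.USER, context);
                }
            });
        }
    }
}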
- Receiver code
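import java.util.concurrent.ExecutorService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Package: pterosaur.account.service.impl
 * @Description: Implementation class that handles MQ messages
 * @author: liuxin
 * @date: 17/4/19 2:55 PM
 */
@Component
public class AccountReceiverImpl implements AccountReceiver {
    private static final Logger logger = LoggerFactory.getLogger(AccountReceiverImpl.class);

    // Requires an ExecutorService bean defined elsewhere in the application context
    @Autowired
    ExecutorService threadPool;

    /**
     * User transaction records
     *
     * @param message the message body received from the flow_user queue
     */
    @RabbitListener(queues = TopicRabbitConfig.USER_QUEUE)
    @RabbitHandler
    public void processUser(String message) {
        // Hand the message off to the thread pool so the listener thread returns quickly
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                // "用户侧流水" means "user-side transaction record"
                logger.info("用户侧流水:{}", message);
            }
        });
    }
}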
- Test output
Sender : hello :2017-04-25 17:44:15,当前线程:main
Sender : hello :2017-04-25 17:44:20,当前线程:main
2017-04-25 17:44:25.754 INFO 67685 --- [pool-1-thread-1] p.a.service.impl.AccountReceiverImpl : 用户侧流水:hello :2017-04-25 17:44:20,当前线程:main
Sender : hello :2017-04-25 17:44:25,当前线程:main
Sender : hello :2017-04-25 17:44:30,当前线程:main
2017-04-25 17:44:32.048 INFO 67685 --- [pool-1-thread-2] p.a.service.impl.AccountReceiverImpl : 用户侧流水:hello :2017-04-25 17:44:30,当前线程:main
Sender : hello :2017-04-25 17:44:32,当前线程:main
Sender : hello :2017-04-25 17:44:33,当前线程:main
2017-04-25 17:44:35.556 INFO 67685 --- [pool-1-thread-3] p.a.service.impl.AccountReceiverImpl : 用户侧流水:hello :2017-04-25 17:44:33,当前线程:main
Sender : hello :2017-04-25 17:44:35,当前线程:main
Sender : hello :2017-04-25 17:44:37,当前线程:main
2017-04-25 17:44:38.797 INFO 67685 --- [pool-1-thread-1] p.a.service.impl.AccountReceiverImpl : 用户侧流水:hello :2017-04-25 17:44:37,当前线程:main
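In this output every message body says 当前线程:main, while the receiver's log prefix shows pool-1-thread-N. The likely reason is that the sender builds the message string on the calling thread before handing the task to its pool, whereas the receiver logs from inside a pool task. The following standalone sketch (ThreadNameDemo is a hypothetical class, not part of the project) shows the difference between reading the thread name before submitting a task and reading it inside the task:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: compares the thread name seen outside and inside a pooled task.
final class ThreadNameDemo {

    /** Returns { name read before submitting, name read inside the task }. */
    static String[] capture() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Read on the calling thread, like the message text built in the sender's loop
            String outside = Thread.currentThread().getName();
            // Read on a worker thread, like the logging done inside the receiver's task
            Callable<String> task = () -> Thread.currentThread().getName();
            String inside = pool.submit(task).get();
            return new String[] { outside, inside };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = capture();
        System.out.println("read before submit: " + names[0]);
        System.out.println("read inside task:   " + names[1]);
    }
}
```

To have the message carry the name of the thread that actually publishes it, the string would need to be built inside run() rather than before the task is submitted.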