1.配置mq相关信息,在application.properties中添加如下配置
# Name of this Spring application.
spring.application.name=rabbitmq-test
# Address and port of the RabbitMQ broker.
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
# Credentials used to log in to the broker.
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Virtual host to connect to.
spring.rabbitmq.virtual-host=/
2.定义常量类
/**
 * Names of the RabbitMQ exchanges, queues and routing keys used by the delayed-message demo.
 *
 * <p>The first group names the exchange and queue that messages are published to. The
 * {@code RECV_*} group names the exchange and queue that messages are forwarded to once
 * they are dead-lettered from {@link #QUEUE_NAME}.
 *
 * <p>Created 2018/9/20 15:25.
 */
public final class MyConstant {

    /** Exchange that the producer publishes to. */
    public static final String EXCHANGE_NAME = "test.message";

    /** Queue bound to {@link #EXCHANGE_NAME}; it is declared with dead-letter arguments. */
    public static final String QUEUE_NAME = "my.timeout.message";

    /** Routing key the producer passes when publishing to {@link #EXCHANGE_NAME}. */
    public static final String ROUTKEY = "my.routing.key";

    /** Exchange that dead-lettered messages are forwarded to. */
    public static final String RECV_EXCHANGE_NAME = "timeout-exchange";

    /** Queue bound to {@link #RECV_EXCHANGE_NAME}. */
    public static final String RECV_QUEUE_NAME = "my.timeout.message2";

    /** Routing key attached to messages when they are dead-lettered. */
    public static final String RECV_ROUTKEY = "test.timeout.message";

    /** Constants holder; not meant to be instantiated. */
    private MyConstant() {
    }
}
3.mq配置信息
import com.chain.rabbitmq.hello.MyChannelAwareMessageListener;
import com.rabbitmq.client.AMQP;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * Declares the RabbitMQ topology and the listener container for the delayed-message demo.
 *
 * <p>Two groups of beans are declared:
 * <ul>
 *   <li>a headers exchange with a queue whose expired messages are dead-lettered, and</li>
 *   <li>a fanout exchange with a queue that receives those dead-lettered messages.</li>
 * </ul>
 */
@Configuration
public class RabbitConfig {

    /**
     * Queue that receives the dead-lettered messages.
     *
     * @return the receiving queue
     */
    @Bean
    public Queue recvQueue() {
        return new Queue(MyConstant.RECV_QUEUE_NAME);
    }

    /**
     * Fanout exchange that dead-lettered messages are forwarded to.
     *
     * @return the receiving exchange
     */
    @Bean
    public FanoutExchange recvFanoutExchange() {
        return new FanoutExchange(MyConstant.RECV_EXCHANGE_NAME);
    }

    /**
     * Binds {@link #recvQueue()} to {@link #recvFanoutExchange()}.
     *
     * @return the binding
     */
    @Bean
    public Binding toBindingRecvFanoutExchange() {
        return BindingBuilder.bind(recvQueue()).to(recvFanoutExchange());
    }

    /**
     * Declares the queue that messages are published to, with dead-letter arguments so that
     * expired messages are forwarded to the receiving exchange.
     *
     * @return the queue carrying the dead-letter arguments
     */
    @Bean
    public Queue queueMessage() {
        Map<String, Object> map = new HashMap<String, Object>();
        // Exchange that expired messages are forwarded to.
        map.put("x-dead-letter-exchange", MyConstant.RECV_EXCHANGE_NAME);
        // Routing key attached to expired messages when they are forwarded.
        map.put("x-dead-letter-routing-key", MyConstant.RECV_ROUTKEY);
        // durable = true, exclusive = false, autoDelete = false
        Queue queue = new Queue(MyConstant.QUEUE_NAME, true, false, false, map);
        System.out.println("arguments :" + queue.getArguments());
        return queue;
    }

    /**
     * Headers exchange the producer publishes to (durable, not auto-deleted).
     *
     * @return the headers exchange
     */
    @Bean
    HeadersExchange headersExchange() {
        return new HeadersExchange(MyConstant.EXCHANGE_NAME, true, false);
    }

    /**
     * Binds {@link #queueMessage()} to {@link #headersExchange()} for messages whose
     * {@code key} header equals {@code 123456}.
     *
     * @return the header-matching binding
     */
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(queueMessage()).to(headersExchange()).where("key").matches("123456");
    }

    /**
     * Creates the connection factory used by the listener container.
     *
     * <p>NOTE(review): host and credentials are hard-coded here and the host differs from the
     * {@code spring.rabbitmq.host} value shown for application.properties; presumably those
     * properties are not applied to this factory - confirm which broker address is intended.
     *
     * @return a caching connection factory with publisher confirms enabled
     */
    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.56.129", 5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPublisherConfirms(true); // enable confirm mode
        return connectionFactory;
    }

    /**
     * Creates the listener container that delivers messages from {@link MyConstant#QUEUE_NAME}
     * to a {@link MyChannelAwareMessageListener}.
     *
     * <p>NOTE(review): this container consumes from the same queue that carries the
     * dead-letter arguments, so messages consumed here do not wait in the queue to expire -
     * confirm this is intended for the delay scenario.
     *
     * @return the configured listener container
     */
    @Bean
    SimpleMessageListenerContainer container() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(MyConstant.QUEUE_NAME);
        // Initial number of concurrent consumers; can be changed at runtime.
        container.setConcurrentConsumers(1);
        // Upper bound on the number of concurrent consumers.
        container.setMaxConcurrentConsumers(10);
        // The listener acks every delivery itself through Channel.basicAck, so the container
        // must not ack as well: a second ack for the same delivery tag makes the broker
        // close the channel.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Listener that receives the messages.
        container.setChannelAwareMessageListener(new MyChannelAwareMessageListener());
        // Tells RabbitMQ not to push more than N unacknowledged messages to one consumer;
        // once N messages are un-acked the consumer gets nothing more until one is acked.
        container.setPrefetchCount(1);
        return container;
    }
}
4.定义消息生产者
import com.chain.rabbitmq.config.MyConstant;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * Producer for the delayed-message demo.
 *
 * <p>Created 2018/9/20 15:08.
 *
 * @author lian.ran
 */
@Component
public class DelayedSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a timestamped text payload and publishes it twice: once with routing key
     * {@code hello}, and once to {@link MyConstant#EXCHANGE_NAME} with an expiration and
     * the headers set by the post-processor below.
     */
    public void send() {
        String context = "发送延时消息数据 " + new Date();
        System.out.println("Sender : " + context);
        // NOTE(review): no "hello" queue is declared in the visible configuration - confirm
        // this first send is still wanted.
        this.rabbitTemplate.convertAndSend("hello", context);
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.out.println("发送了消息");
                // NOTE(review): presumably only meaningful with a delayed-message exchange;
                // confirm whether this delay value is needed alongside the expiration.
                message.getMessageProperties().setDelay(2);
                // Per-message expiration; the value is a string of milliseconds.
                message.getMessageProperties().setExpiration("12000");
                // Headers carried by the message; "key" is the one the headers-exchange
                // binding matches on.
                message.getMessageProperties().setHeader("key", "123456");
                message.getMessageProperties().setHeader("token", "654321");
                return message;
            }
        };
        this.rabbitTemplate.convertAndSend(
                MyConstant.EXCHANGE_NAME, MyConstant.ROUTKEY, context, messagePostProcessor);
    }
}
5.定义消费者
5.1 自动确认消息(ack)的消费者
import com.chain.rabbitmq.config.MyConstant;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener on {@link MyConstant#RECV_QUEUE_NAME} that prints every String payload it
 * receives. No explicit acknowledgement is performed in this class.
 *
 * <p>Created 2018/9/20 16:50.
 *
 * @author lian.ran
 */
@Component
@RabbitListener(queues = MyConstant.RECV_QUEUE_NAME)
public class DelayedConsumer {

    /**
     * Prints the received payload to standard output behind a fixed prefix.
     *
     * @param message the message body as text
     */
    @RabbitHandler
    public void process(String message) {
        final StringBuilder line = new StringBuilder("消费到延期数据:fanout Receiver C: ");
        line.append(message);
        System.out.println(line);
    }
}
5.2 手动确认消息(ack)的消费者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * Message listener that acknowledges each delivery itself on the channel.
 *
 * <p>Created 2018/9/20 20:53.
 *
 * @author lian.ran
 */
@Component
public class MyChannelAwareMessageListener implements ChannelAwareMessageListener {

    /**
     * Prints the received message, then acks that single delivery on the given channel.
     * An {@link IOException} raised by the ack is printed and not propagated.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on
     */
    @Override
    public void onMessage(Message message, Channel channel) {
        System.out.println("监听到了消息:" + message);
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // multiple = false: acknowledge only this delivery tag.
            channel.basicAck(deliveryTag, false);
        } catch (IOException ackFailure) {
            ackFailure.printStackTrace();
        }
    }
}
6.启动springboot工程
7.测试模拟发送消息
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that triggers one send through {@link DelayedSender}.
 *
 * <p>The test only invokes the producer; it makes no assertions about delivery.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {

    /** Producer under test, injected from the application context. */
    @Autowired
    private DelayedSender sender;

    /**
     * Calls {@link DelayedSender#send()} once.
     *
     * @throws Exception if the send fails
     */
    @Test
    public void testDelayedSender() throws Exception {
        sender.send();
    }
}
~~
网友评论