美文网首页
springboot实现rabbitmq延期队列

springboot实现rabbitmq延期队列

作者: 禅兜 | 来源:发表于2018-11-26 17:08 被阅读0次

1.配置mq相关信息,在配置application.properties

spring.application.name=rabbitmq-test
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

2.定义常量类

/**
 * @author 
 * @Title: MyConstant
 * @ProjectName demo
 * @Description: TODO
 * @date 2018/9/2015:25
 */
public class MyConstant {

    public static final String EXCHANGE_NAME = "test.message";

    public final static String QUEUE_NAME = "my.timeout.message";

    public static final String ROUTKEY="my.routing.key";

    public static final String RECV_EXCHANGE_NAME = "timeout-exchange";
    public final static String RECV_QUEUE_NAME = "my.timeout.message2";
    public static final String RECV_ROUTKEY="test.timeout.message";
}

3.mq配置信息

import com.chain.rabbitmq.hello.MyChannelAwareMessageListener;
import com.rabbitmq.client.AMQP;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class RabbitConfig {


    @Bean
    public Queue recvQueue() {
        return new Queue(MyConstant.RECV_QUEUE_NAME);
    }
    @Bean
    public FanoutExchange recvFanoutExchange(){
        return new FanoutExchange(MyConstant.RECV_EXCHANGE_NAME);
    }
    @Bean
    public Binding toBindingRecvFanoutExchange(){
        return BindingBuilder.bind(recvQueue()).to(recvFanoutExchange());
    }


    /**
     * 配置队列信息
     * @return
     */
    @Bean
    public Queue queueMessage() {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("x-dead-letter-exchange", "timeout-exchange");//过期消息转向路由
        map.put("x-dead-letter-routing-key", "test.timeout.message");//过期消息转向路由相匹配routingkey
        Queue queue = new Queue(MyConstant.QUEUE_NAME, true, false, false, map);
        System.out.println("arguments :" + queue.getArguments());
        return queue;
    }

    @Bean
    HeadersExchange headersExchange() {
        return new HeadersExchange(MyConstant.EXCHANGE_NAME,true,false);
    }

    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(queueMessage()).to(headersExchange()).where("key").matches("123456");
    }


    /**
     * 配置连接工厂
     * @return
     */
    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.56.129", 5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
//        connectionFactory.setVirtualHost(vhost);
        connectionFactory.setPublisherConfirms(true); // enable confirm mode
        return connectionFactory;
    }

    @Bean
    SimpleMessageListenerContainer container() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(MyConstant.QUEUE_NAME);
        container.setConcurrentConsumers(1);//设置多个并发消费者一起消费,并支持运行时动态修改
        container.setMaxConcurrentConsumers(10);//设置最多的并发消费者
       // container.setMessageListener(listenerAdapter);
        container.setChannelAwareMessageListener(new MyChannelAwareMessageListener()); //配置消费监听者
        container.setPrefetchCount(1); //会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
        return container;
    }
    
}

4.定义消息生产者

import com.chain.rabbitmq.config.MyConstant;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * 生产者
 * @author lian.ran
 * @Title: DelayedSender
 * @ProjectName demo
 * @Description: TODO
 * @date 2018/9/2015:08
 */
@Component
public class DelayedSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "发送延时消息数据 " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.out.println("发送了消息");
                message.getMessageProperties().setDelay(2);
                message.getMessageProperties().setExpiration("12000");
                Map<String, Object> headers=new HashMap<String, Object>();
                message.getMessageProperties().setHeader("key", "123456");
                message.getMessageProperties().setHeader("token", "654321");
                return message;
            }
        };
        this.rabbitTemplate.convertAndSend(MyConstant.EXCHANGE_NAME,MyConstant.ROUTKEY,context,messagePostProcessor);
    }

}

5.定义消费者
5.1 自动确定消息ack的消费者

import com.chain.rabbitmq.config.MyConstant;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author lian.ran
 * @Title: DelayedConsumer
 * @ProjectName demo
 * @Description: TODO
 * @date 2018/9/2016:50
 */
@Component
@RabbitListener(queues = MyConstant.RECV_QUEUE_NAME)
public class DelayedConsumer {

    @RabbitHandler
    public void process(String message) {
        System.out.println("消费到延期数据:fanout Receiver C: " + message);
    }
}

5.2 手动确定消息的消费者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author lian.ran
 * @Title: MyChannelAwareMessageListener
 * @ProjectName demo
 * @Description: 可以自己手动ack的消息监听器
 * @date 2018/9/2020:53
 */
@Component
public class MyChannelAwareMessageListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel)  {
        try {
            System.out.println("监听到了消息:"+message);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {

        }
    }
}

6.启动springboot工程
7.测试模拟发送消息

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {

    @Autowired
    private DelayedSender delayedSender;

    @Test
    public void testDelayedSender() throws Exception {
        delayedSender.send();
    }


}
~~

相关文章

网友评论

      本文标题:springboot实现rabbitmq延期队列

      本文链接:https://www.haomeiwen.com/subject/bojbqqtx.html