美文网首页
12 Spring boot message--activemq

12 Spring boot message--activemq

作者: 中_中_ | 来源:发表于2019-05-07 10:40 被阅读0次

    Spring Framework框架为集成消息系统提供了扩展(extensive) 支持:从使
    用 JmsTemplate 简化JMS API,到实现一个能够异步接收消息的完整的底层设
    施。Spring AMQP提供一个相似的用于'高级消息队列协议'的特征集,并且Spring
    Boot也为 RabbitTemplate 和RabbitMQ提供了自动配置选项。Spring Websocket
    提供原生的STOMP消息支持,并且Spring Boot也提供了starters和自动配置支持。

    JMS(Java Message Service)

    JMS大致可以分为两种功能区域,消息的生产和消费。JmsTemplate用于消息的生产和异步消息接收。Spring提供了许多消息监听器容器,您可以使用它们来创建消息驱动的POJO(MDP)。 Spring还提供了一种创建消息监听器的声明方式。

    org.springframework.jms.core包提供了使用JMS的核心功能。JmsTemplate对资源的使用和释放就像JbdcTemplate一样。JmsTemplate提供了多种方便的方法用于发送消息,异步消费消息,向用户暴露JMS session和消息生产者。

    org.springframework.jms.support包提供了JMSException异常的翻译功能。
    org.springframework.jms.support.converter包提供了Java对应和JMS 消息之间的转化。
    org.springframework.jms.support.destination包提供了多种管理JMS目的端的策略。
    org.springframework.jms.annotation包提供了必要的基础框架以支持使用@JmsListener注解驱动。
    org.springframework.jms.config包提供jms命名空间的解析器实现以及配置侦听器容器和创建侦听器端点的java配置支持。
    org.springframework.jms.connection包提供了合适的ConnectionFacotry的实现用于独立应用程序。

    JmsTemplate使用

    JmsTemplate是JMS核心包中的中心类,为在发送和异步接受消息的对资源的创建和释放简化了操作。
    编码使用JmsTemplate仅仅需要实现一个明确定义高级别的回调接口。MessageCreator回调接口用JmsTemplate中提供的Session来创建消息;SessionCallback提供JMS session;ProducerCalback暴露一对一的Session和MessageProducer。
    Jms接口提供两种发送消息的方法,一种使用传输模式,优先级和time-to-live作为服务质量参数(QOS),另一种不是QOS参数而是默认值。
    JmsTemplate是线程安全的类,所以可以配置一个并在多个地方引用。JmsTemplate是稳定的,在内部有一个到ConnectionFactory的引用,但是这种状态不是构建在会话之上。在Spring framework4.1中,JmsMessagingTemplate构建在JmsTemplate之上提供org.springframework.messaging.Message消息抽象的集成。

    连接

    JmsTemplate需要引用ConnectionFactory。 ConnectionFactory是JMS规范的一部分,并作为使用JMS的入口点。 客户端应用程序将其用作工厂以创建与JMS提供程序的连接,并封装各种配置参数,其中许多是特定于供应商的,例如SSL配置选项。

    使用SingleConnectionFactory
    spring提供了connectionFactory接口的实现,SingleConnectionFactory,通过调用createConnection方法返回同一个connection并忽略close的调用。

    使用cachingConnectionFactory
    cachingConnectionFactory继承自SingleConnectionFactory并添加了session,Messageproducer和MessageConsumer实例的缓存。初始缓存大小设置为1.

    消息监听容器

    消息监听容器用来从消息队列中接收消息并驱动MessageListener注入,负责所有消息的接收和分发对应的linstener处理。两种类型的消息监听容器:
    SimpleMessageListenerContainer:在启动时创建固定数量的JMS session和消费者,使用MessageConsumer.setMessageListener()标准方法注册linstener,并由这个linstener处理回调。
    DefaultMessageListenerContainer:对比于SimpleMessageListenerContainer,该容器允许在运行期间动态调整也能参与到外部事物管理。当时用JtaTransactionManager配置时每个接收的消息就会注册为XA事务。

    1,在测试环境安装activemq,/usr/activemq/bin/activemq start 启动
    2,使用admin/admin登陆,并创建两个用于测试的队列,firstQueue,secondQueue
    3,添加Spring boot对activemq的支持

     <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    

    4,注入队列

        @Bean
        public Queue firstQueue(){
    
            return new ActiveMQQueue("firstQueue");
        }
    
        @Bean
        public Queue secondQueue(){
    
            return new ActiveMQQueue("secondQueue");
        }
    

    5,配置MessageConverter

        @Bean
        public MessageConverter getMessageConverter(){
    
            return new SimpleMessageConverter();
        }
    

    6,配置connectionFactory

        @Bean
        public ConnectionFactory getConnectionFactory(){
    
            return new CachingConnectionFactory(getActiveMQConnectionFactory());
        }
    
        public ActiveMQConnectionFactory getActiveMQConnectionFactory(){
    
            return new ActiveMQConnectionFactory("admin","admin","tcp://ip:61616");
        }
    

    7,配置JmsTemplate

        @Bean
        public JmsTemplate jmsTemplate(){
    
            JmsTemplate jmsTemplate =  new JmsTemplate(getConnectionFactory());
            jmsTemplate.setMessageConverter(getMessageConverter());
            return jmsTemplate;
        }
    

    8,定义生产者

    @Component
    public class Producer implements CommandLineRunner {
    
        @Autowired
        private Queue firstQueue;
    
        @Autowired
        private Queue secondQueue;
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
    
        @Override
        public void run(String... args) throws Exception {
    
            for (int i = 0; i < 20; i++) {
    
                if(i%2 == 0){
    
                    this.jmsTemplate.convertAndSend(this.firstQueue,"activemq firstQueue"+i);
                }else{
    
                    this.jmsTemplate.convertAndSend(this.secondQueue,"activemq secondQueue"+i);
                }
    
                Thread.sleep(2000);
            }
    
        }
    }
    

    9,定义消费者

    @Component
    public class Consumer {
    
        @JmsListener(destination = "firstQueue",containerFactory = "myFactory")
        public void receiveMessage(String text){
    
            System.err.println("received firstQueue "+text);
        }
    
        @JmsListener(destination = "secondQueue")
        public void receiveSecondQueue(String message){
    
            System.err.println("received second queue "+ message);
        }
    }
    

    运行程序可看到如下输出:

    received firstQueue activemq firstQueue4
    received second queue activemq secondQueue5
    received firstQueue activemq firstQueue6
    received second queue activemq secondQueue7
    received firstQueue activemq firstQueue8
    received second queue activemq secondQueue9
    received firstQueue activemq firstQueue10
    received second queue activemq secondQueue11
    received firstQueue activemq firstQueue12
    received second queue activemq secondQueue13
    received firstQueue activemq firstQueue14
    received second queue activemq secondQueue15
    received firstQueue activemq firstQueue16
    received second queue activemq secondQueue17
    received firstQueue activemq firstQueue18
    received second queue activemq secondQueue19
    
    

    activemq队列有如下信息:


    相关文章

      网友评论

          本文标题:12 Spring boot message--activemq

          本文链接:https://www.haomeiwen.com/subject/ylhfqqtx.html