美文网首页rabbitmq
Springboot整合RabbitMQ生产者

Springboot整合RabbitMQ生产者

作者: Ever_zh | 来源:发表于2018-09-28 13:46 被阅读0次

    暂时只介绍Work Queue。

    其他的工作模式其实都差不多,可以试着自己往里面加。

    这只是个简单架子。 

    使用springboot 2.x 版本.

    pom文件.

    <!-- Inherit dependency and plugin management from Spring Boot 2.0.3. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Build-wide settings: UTF-8 everywhere, Java 8. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: provides the MVC controller used to trigger a send. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Test starter, test scope only. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- AMQP starter: Spring AMQP / RabbitMQ client support. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Checkstyle, configured from xml/google_checks.xml. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-checkstyle-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <configLocation>xml/google_checks.xml</configLocation>
                </configuration>
            </plugin>

            <!-- Spring Boot Maven plugin. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    配置文件 application.properties 

    spring.rabbitmq.host=

    spring.rabbitmq.port=

    spring.rabbitmq.username=

    spring.rabbitmq.password=

    spring.rabbitmq.virtual-host=

    spring.rabbitmq.connection-timeout=

    spring.rabbitmq.publisher-confirms=true

    spring.rabbitmq.publisher-returns=true

    # 实际使用必须手动 Ack,原因自查。(properties 文件不支持行尾 // 注释,注释必须单独成行并以 # 开头)
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    写入一个java类用来配置rabbitmq

    import org.springframework.amqp.core.*;   

    import org.springframework.amqp.rabbit.core.RabbitTemplate; 

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 

    import org.springframework.beans.factory.annotation.Autowired; 

    import org.springframework.context.annotation.Bean; 

    import org.springframework.context.annotation.Configuration; 

     import java.text.SimpleDateFormat; 

    import java.util.Date;

    /**

    *rabbitmq 配置文件

    */

    @Configuration

    public class RabbitConfig {

    //自动注入RabbitTemplate模板类

        @Autowired

        private RabbitTemplaterabbitTemplate;

    /**

        * 模版类定义

        * Jackson消息转换器

        * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调  即消息发送到exchange  ack

        * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调  即消息发送不到任何一个队列中  ack

        * @return  amqp template

    */

        @Bean

        public AmqpTemplate amqpTemplate() {

    // 使用jackson 消息转换器

            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

    rabbitTemplate.setEncoding("UTF-8");

    // 开启returncallback    properties 需要 配置publisher-returns: true

            rabbitTemplate.setMandatory(true);

    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

    String correlationId = message.getMessageProperties().getCorrelationId();

    });

    //  消息确认  properties 需要配置publisher-returns: true

            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

    if (ack) {

    SimpleDateFormat df =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

    // 输出字符串

                    System.out.println("时间"+df.format(new Date()));

    System.out.println("消息发送到exchange成功,id: "+correlationData.getId());

    }else {

    System.out.println("消息发送到exchange失败,原因: "+ cause);

    }

    });

    return rabbitTemplate;

    }

    /**

        * 声明Direct交换机 支持持久化.

    *

        * @return the exchange

    */

        @Bean

        public Exchange directExchange() {

    return ExchangeBuilder.directExchange("exchange-1").durable(true).build();

    }

    /**

        * 声明一个队列 支持持久化.

        * @return the queue

    */

        @Bean

        public Queue directQueue() {

    return QueueBuilder.durable("queue-1").build();

    }

    /**

        * 通过绑定键 将指定队列绑定到一个指定的交换机 .

        * @param queue    the queue

        * @param exchange the exchange

        * @return the binding

    */

        @Bean

        public Binding directBindingA( Queue queue, Exchange exchange) {

    return BindingBuilder.bind(queue).to(exchange).with("send").noargs();

    }

    }

    生产者代码(Controller)

    import java.util.UUID;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.ResponseBody;

    @Controller

    public class RabbitSender {

    //自动注入RabbitTemplate模板类

      @Autowired

      private RabbitTemplaterabbitTemplate;

    @GetMapping("/send")

    public void send()throws Exception {

    String s ="123456";

    //id + 时间戳 全局唯一

          CorrelationData correlationData =new CorrelationData("1234567890");

    rabbitTemplate.convertAndSend("exchange-1","send", s, correlationData);

    }

    }

    将注解 @EnableRabbit 加在 @SpringBootApplication 之后!

    下一篇 Springboot整合RabbitMQ消费者

    本文仅限本人小白学习参考,不足之处请大佬指正。

    相关文章

      网友评论

        本文标题:Springboot整合RabbitMQ生产者

        本文链接:https://www.haomeiwen.com/subject/zanuoftx.html