美文网首页
rabbitmq调用

rabbitmq调用

作者: 我心悠然 | 来源:发表于2023-07-05 15:52 被阅读0次

rabbitmq分为发送端和接收端

发送端代码
  • 首先,引入依赖jar
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.4.0.RELEASE</version>
</dependency>
  • 先在application.properties里面配置rabbitmq地址和topic
spring.rabbitmq.addresses=IP
spring.rabbitmq.port=5672
spring.rabbitmq.username=canace(账号)
spring.rabbitmq.password=123456(密码)
spring.rabbitmq.connection-timeout=2000
rabbit_topickey=wsQueue
  • 新建一个类,发送消息
@Autowired
private RabbitTemplate rabbitTemplate;

@Value("${rabbit_topickey}")
private String rabbitmqtopic;

@GetMapping("/test")
public String toRabbitmq(String username) {
    System.out.println("To Rabbitmq>>>>>>>>" + username);
    // 第一个参数为刚刚定义的队列名称
    this.rabbitTemplate.convertAndSend(rabbitmqtopic, username);
    return "add User to rabbitmq Successful!!";
}
    • 第二种发送,引入jar包
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.4.3</version>
</dependency>
    • 创建RabbitmqConfig,链接rabbitmq
@Configuration
public class RabbitmqConfig {
    private static String host = "IP";
    private static String userName = "canace";
    private static String passWord = "123456";
    private static int port = 5672;

    public static Channel getChannelInstance(String connectionDescription) {
        try {
            ConnectionFactory connectionFactory = getConnectionFactory();
            Connection connection = connectionFactory.newConnection(connectionDescription);
            return connection.createChannel();
        } catch (Exception e) {
            throw new RuntimeException("获取Channel连接失败", e);
        }
    }

    @Bean
    private static ConnectionFactory getConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 配置连接信息
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(passWord);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setConnectionTimeout(30000);
        connectionFactory.setHandshakeTimeout(30000);
        // 网络异常自动连接恢复
        connectionFactory.setAutomaticRecoveryEnabled(true);
        // 每10秒尝试重试连接一次
        connectionFactory.setNetworkRecoveryInterval(10000);
        return connectionFactory;
    }
}
    • 发送消息代码
public String send(String exchange, String routingKey, String message) throws IOException {
    if (channel == null || !channel.isOpen()) {
        channel = RabbitmqConfig.getChannelInstance("队列消息生产者");
    }
    message = URLEncoder.encode(message, "utf-8");
    AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType(
            "UTF-8").build();
    channel.basicPublish(exchange, routingKey, false, basicProperties, message.getBytes());
    RedisUtil.set(message, message);
    return "send ok";
}
接收端代码
  • 首先,引入依赖jar
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.4.0.RELEASE</version>
</dependency>
  • 先在application.properties里面配置一样的rabbitmq地址,然后queues与发送端的key配置成一样,group消费组可自行配置。
@Component
@RabbitListener(queues = "wsQueue", group = "wushaungRabit")
public class rabbitMQConsumer {
    /**
     * 消息消费
     * @RabbitHandler 代表此方法为接受到消息后的处理方法
     */
    @RabbitHandler
    public void receive(String msg) {
        System.out.println("[Rabbitmq] recieved message: " + msg);
    }
}
    • 第二种,引入jar包spring-rabbit和spring-amqp
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.3.2</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.3.2</version>
</dependency>
    • 配置rabbitmq.properties
rabbit.hosts=192.168.1.95
rabbit.username=canace
rabbit.password=123456
rabbit.port=5672
rabbit.virtualHost=/
    • 配置amqp-share.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-4.3.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
     http://www.springframework.org/schema/util
     http://www.springframework.org/schema/util/spring-util-4.3.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <util:properties id="appConfig" location="classpath:rabbitmq.properties"></util:properties>
    <rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}"
                               port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
                               virtual-host="${rabbit.virtualHost}"
                               channel-cache-size="10"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义消息队列-->
    <rabbit:queue name="spittle.alert.queue.3" durable="true" auto-delete="false"/>

    <!--交换机定义,绑定队列-->
    <rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="spittle.alert.queue.3"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

</beans>

    • 配置amqp-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <import resource="amqp-share.xml"/>
    <!-- 配置监听器 -->
    <rabbit:listener-container connection-factory="connectionFactory" type="simple">
        <rabbit:listener ref="spittleListener" method="onMessage"
                         queues="spittle.alert.queue.3"/>
    </rabbit:listener-container>
    <bean id="spittleListener" class="com.pamirs.agent.rabbitmq2xc.demo.handler.SpittleAlertHandler"/>
</beans>
    • 消费端接收端
public class SpittleAlertHandler implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            String body = new String(message.getBody(), "UTF-8").replace("\"", "");
            System.out.println("body>>>>>>>>>>>>>>>" + body);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}
      • 第三种,引入jar包amqp-client
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.5.0</version>
</dependency>
      • 消费端代码
@Component
public class rabbitMGconsumer {
    @PostConstruct
    public void init() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.1.95");
            factory.setPort(5672);
            factory.setUsername("canace");
            factory.setPassword("123456");
            factory.setVirtualHost("/");
            //// 网络异常自动连接恢复
            //factory.setAutomaticRecoveryEnabled(true);
            //// 每10秒尝试重试连接一次
            //factory.setNetworkRecoveryInterval(10000);
            
            Connection connect = factory.newConnection();
            Channel channel = connect.createChannel();
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume("test", false, tag, new DeliverCallback() {
              @Override
              public void handle(String consumerTag, Delivery message) throws IOException {
                  System.out.printf("exchange : %s routingKey : %s consumer tag : %s thread : %s 消息内容 : %s%n", message.getEnvelope().getExchange(), message.getEnvelope().getRoutingKey(), consumerTag, Thread.currentThread().getName(), new String(message.getBody()));
                  channel.basicAck(message.getEnvelope().getDeliveryTag(), true);
              }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    System.out.println("cancel" + consumerTag);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

相关文章

网友评论

      本文标题:rabbitmq调用

      本文链接:https://www.haomeiwen.com/subject/tzjkudtx.html