美文网首页
RabbitMQ生产者&消费者模式

RabbitMQ生产者&消费者模式

作者: 阿畅00000 | 来源:发表于2023-01-04 10:58 被阅读0次

项目上在使用异步处理时,总会想到用RabbitMQ的生产者和消费者模式,下文演示如何使用RabbitMQ Java客户端生产和消费消息。

maven依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.2.1</version>
 </dependency>
生产者
package com.nightmare.study2023.study0103;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Minimal RabbitMQ producer demo.
 *
 * <p>Connects to the broker, declares a durable direct exchange and a durable queue,
 * binds them with a routing key, publishes one persistent text message and then
 * releases the channel and the connection.
 *
 * @Author: WuChang
 * @Date: Created in  2023-01-03 5:17 PM
 */
public class RabbitProducer {

    private static final String EXCHANGE_NAME ="exchange_nightmare";
    private static final String ROUTING_KEY="routing_nightmare";
    private static final String QUEUE_NAME="queue_nightmare";
    private static final String IP_ADDRESS="**********";
    private static final int PORT = 5672;
    private static final String USERNAME="guest";
    private static final String PASSWORD ="*****";


    /**
     * Publishes a single "Hello World!" message to {@code EXCHANGE_NAME} using {@code ROUTING_KEY}.
     *
     * <p>I/O and timeout failures are reported on stderr; they are not rethrown.
     *
     * @param args unused
     */
    public static void main(String[] args){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost("/");

        // try-with-resources closes the connection even when a declare/bind/publish call throws,
        // so a failure half-way through no longer leaks the connection.
        try (Connection connection = factory.newConnection()) {
            // Open a channel on the connection
            Channel channel = connection.createChannel();
            // Declare a direct exchange: durable, not auto-deleted
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // Declare a queue: durable, non-exclusive, not auto-deleted
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // Bind the queue to the exchange with the routing key
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            // Publish one persistent message; encode explicitly as UTF-8 instead of
            // relying on the platform default charset
            String message = "Hello World!";
            channel.basicPublish(
                    EXCHANGE_NAME,
                    ROUTING_KEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Close the channel on the normal path; the connection is closed by try-with-resources
            channel.close();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

生产者代码中,首先和RabbitMQ服务器建立连接,并根据连接创建信道;接着创建交换器和队列,并通过路由键将二者绑定;然后发送一条消息;最后关闭信道和连接。

消费者
package com.nightmare.study2023.study0103;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Minimal RabbitMQ consumer demo.
 *
 * <p>Connects to the broker, registers a consumer on {@code QUEUE_NAME} that prints each
 * message and acknowledges it manually, lets it run for a few seconds and then
 * releases the channel and the connection.
 *
 * @Author: WuChang
 * @Date: Created in  2023-01-04 2:28 PM
 */
public class RabbitConsumer {

    private static final String QUEUE_NAME="queue_nightmare";
    private static final String IP_ADDRESS="******";
    private static final int PORT = 5672;
    private static final String USERNAME="guest";
    private static final String PASSWORD ="*******";

    /**
     * Consumes messages from {@code QUEUE_NAME} for about five seconds.
     *
     * <p>I/O, timeout and interruption failures are reported on stderr; they are not rethrown.
     *
     * @param args unused
     */
    public static void main(String[] args){
        Address[] addresses = new Address[]{
                new Address(IP_ADDRESS,PORT)
        };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost("/");

        // try-with-resources closes the connection even when setup, consuming or the
        // wait below throws, so a failure no longer leaks the connection.
        try (Connection connection = factory.newConnection(addresses)) {
            // Open a channel on the connection
            final Channel channel = connection.createChannel();
            // Limit the number of unacknowledged messages delivered to this client
            channel.basicQos(64);
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){
                    // Decode explicitly as UTF-8 instead of relying on the platform default charset
                    System.out.println("message" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                    try {
                        // Simulated processing time before the message is acknowledged
                        TimeUnit.SECONDS.sleep(1);
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }catch (InterruptedException e){
                        // Restore the interrupt flag so the calling thread can observe it;
                        // the message is left unacknowledged, as before
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }catch (IOException e){
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME,consumer);
            // Give the callback some time to run before releasing resources
            TimeUnit.SECONDS.sleep(5);
            // Close the channel on the normal path; the connection is closed by try-with-resources
            channel.close();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

相关文章

网友评论

      本文标题:RabbitMQ生产者&消费者模式

      本文链接:https://www.haomeiwen.com/subject/oayrcdtx.html