美文网首页
RabbitMQ生产者&消费者模式

RabbitMQ生产者&消费者模式

作者: 阿畅00000 | 来源:发表于2023-01-04 10:58 被阅读0次

    项目上在使用异步处理时,总会想到用RabbitMQ的生产者和消费者模式,下文演示如何使用RabbitMQ Java客户端生产和消费消息。

    maven依赖
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.2.1</version>
     </dependency>
    
    生产者
    package com.nightmare.study2023.study0103;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author: WuChang
     * @Description:
     * @Date: Created in  2023-01-03 5:17 PM
     * @Modified By:
     */
    public class RabbitProducer {
    
        private static final String EXCHANGE_NAME ="exchange_nightmare";
        private static final String ROUTING_KEY="routing_nightmare";
        private static final String QUEUE_NAME="queue_nightmare";
        private static final String IP_ADDRESS="**********";
        private static final int PORT = 5672;
        private static final String USERNAME="guest";
        private static final String PASSWORD ="*****";
    
    
        public static void main(String[] args){
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP_ADDRESS);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("/");
            try {
                // 创建连接
                Connection connection = factory.newConnection(); 
                // 创建信道
                Channel channel = connection.createChannel(); 
                // 创建一个类型为direct、持久化的、非自动删除的交换器
                channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
                // 创建一个持久话、非排他的、非自动删除的队列
                channel.queueDeclare(QUEUE_NAME,true,false,false,null);
                // 将交换器与对垒通过路由键绑定
               channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
                // 发送一条持久化的消息
                String message ="Hello World!";
                channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                // 关闭资源
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    

    生产者代码中 首先和RabbitMQ服务器建立连接,根据连接创建信道,创建交换器和队列,并通过路由键进行绑定;然后发送一条消息;最后关闭信道,关闭连接

    消费者
    package com.nightmare.study2023.study0103;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author: WuChang
     * @Description:
     * @Date: Created in  2023-01-04 2:28 PM
     * @Modified By:
     */
    public class RabbitConsumer {
    
        private static final String QUEUE_NAME="queue_nightmare";
        private static final String IP_ADDRESS="******";
        private static final int PORT = 5672;
        private static final String USERNAME="guest";
        private static final String PASSWORD ="*******";
    
        public static void main(String[] args){
            Address[] addresses = new Address[]{
                    new Address(IP_ADDRESS,PORT)
            };
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("/");
            try{
                Connection connection = factory.newConnection(addresses); // 创建连接
                Channel channel = connection.createChannel();// 创建信道
                channel.basicQos(64); // 设置客户端最多接收未被ack的消息的个数
                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){
                        System.out.println("message" + new String(body));
                        try {
                            TimeUnit.SECONDS.sleep(1);
                            channel.basicAck(envelope.getDeliveryTag(),false);
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }catch (IOException e){
                            e.printStackTrace();
                        }
                    }
                };
                channel.basicConsume(QUEUE_NAME,consumer);
                // 等待毁掉函数执行完毕之后,关闭资源
                TimeUnit.SECONDS.sleep(5);
                channel.close();
                connection.close();
            }catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:RabbitMQ生产者&消费者模式

          本文链接:https://www.haomeiwen.com/subject/oayrcdtx.html