美文网首页rabbitmq
rabbitmq生产消费示例代码

rabbitmq生产消费示例代码

作者: 于情于你 | 来源:发表于2020-11-13 08:01 被阅读0次
    /**
     * Minimal RabbitMQ round trip: publishes one persistent text message to a
     * direct exchange and then consumes from the bound queue for a few seconds.
     *
     * <p>{@link #publish(String)} declares the exchange, the queue and the binding;
     * {@link #consume()} does not declare anything and therefore relies on the
     * queue already existing on the broker.
     */
    public class RabbitmqTest {
        private static final String EXCHANGE_NAME = "test_exchange";
        private static final String ROUTING_KEY = "test_routing_key";
        private static final String QUEUE_NAME = "test_queue";
        private static final String USER_NAME = "rabbit";
        // NOTE(review): credentials are hard-coded in source; move them to external
        // configuration before using this outside of a throwaway demo.
        private static final String PASSWORD = "rbDMmPQHFq";
        private static final String IP_ADDRESS = "xxx";
        // NOTE(review): the conventional AMQP port is 5672 - confirm that 5662 is intentional.
        private static final int PORT = 5662;

        /**
         * Publishes a single message and then runs the consumer.
         *
         * @param args unused
         * @throws IOException          if a broker operation fails
         * @throws TimeoutException     if connecting or closing times out
         * @throws InterruptedException if the consumer's wait is interrupted
         */
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

            // Produce a message.
            publish("hello_world");

            // Consume messages.
            consume();

        }

        /**
         * Builds a connection factory pointed at the broker described by the class constants.
         *
         * @return a factory configured with host, port, user name and password
         */
        private static ConnectionFactory newConnectionFactory() {
            ConnectionFactory connectionFactory = new ConnectionFactory();

            connectionFactory.setHost(IP_ADDRESS);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername(USER_NAME);
            connectionFactory.setPassword(PASSWORD);

            return connectionFactory;
        }

        /**
         * Consumes from {@code QUEUE_NAME} for five seconds, printing and acknowledging
         * every delivery, then closes the channel and the connection.
         *
         * @throws IOException          if a broker operation fails
         * @throws TimeoutException     if connecting or closing times out
         * @throws InterruptedException if the five-second wait is interrupted
         */
        private static void consume() throws IOException, TimeoutException, InterruptedException {
            ConnectionFactory connectionFactory = newConnectionFactory();

            // try-with-resources closes the channel first and the connection second,
            // including when consuming fails or the sleep below is interrupted.
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {

                // Cap the number of unacknowledged messages this client may hold.
                channel.basicQos(64);

                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                        // Decode with the same fixed charset the publisher encodes with.
                        System.out.println("recv message: " + new String(body, java.nio.charset.StandardCharsets.UTF_8));

                        // Acknowledge only this delivery (multiple = false).
                        channel.basicAck(envelope.getDeliveryTag(), false);

                    }
                };

                // autoAck = false: deliveries are acknowledged manually in handleDelivery.
                channel.basicConsume(QUEUE_NAME, false, consumer);

                // Give the callback time to run before the resources are closed.
                TimeUnit.SECONDS.sleep(5);
            }
        }

        /**
         * Declares the exchange, queue and binding, then publishes {@code message}
         * as a persistent plain-text message encoded in UTF-8.
         *
         * @param message the text payload to send
         * @throws IOException      if a broker operation fails
         * @throws TimeoutException if connecting or closing times out
         */
        private static void publish(String message) throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = newConnectionFactory();

            // try-with-resources closes the channel first and the connection second,
            // including when a declare/bind/publish call throws.
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {

                // Declare the exchange: direct, durable, not auto-deleted.
                channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

                // Declare the queue: durable, non-exclusive, not auto-deleted.
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                // Bind the queue to the exchange.
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

                // Publish the message; the charset is fixed so the bytes do not depend
                // on the JVM's platform default.
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        }

    }
    

    相关文章

      网友评论

        本文标题:rabbitmq生产消费示例代码

        本文链接:https://www.haomeiwen.com/subject/kdtvbktx.html