美文网首页消息中间件踩坑
消息中间件踩坑之旅(四)——RabbitMq运行流程及多点注意

消息中间件踩坑之旅(四)——RabbitMq运行流程及多点注意

作者: 简单DI年华 | 来源:发表于2019-01-26 21:35 被阅读0次

    这里博主推荐大家阅读由朱忠华先生编写的<<RabbitMq实战指南>>,这里有详细的客户端开发接口的说明,例如com.rabbotmq.client包的使用

    运行流程

    生产者
    1. 生产者连接到RabbitMq Broker,建立一个连接即程序里的Connection,开启一个信道

      ConnectionFactory factory = new ConnectionFactory();
      // "guest"/"guest" by default, limited to localhost connections
      factory.setUsername(userName);
      factory.setPassword(password);
      factory.setVirtualHost(virtualHost);
      factory.setHost(hostName);
      factory.setPort(portNumber);
      
      Connection conn = factory.newConnection();
      
      Channel channel = conn.createChannel();
      
    2. 生产者声明一个交换器,并设置相关属性,这里声明交换器的类型,是否持久化等,很重要哦,可以自行查看api(交换器也可以转发消息给别的交换器)

      channel.exchangeDeclare(exchangeName, "direct", true);
      
    1. 生产者声明一个队列并设置属性,比如排他性,是否持久化,是否过期删除等

      String queueName = channel.queueDeclare().getQueue();
      or
      channel.queueBind(queueName, exchangeName, routingKey);
      
    2. 生产者通过路由键将交换器和队列进行绑定起来

      channel.queueBind(queueName, exchangeName, routingKey)
      
    3. 生产者发送信息到RabbitMq Broker,其中包含路由键、交换器信息

      byte[] messageBodyBytes = "Hello, world!".getBytes();
      channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
      
    4. 对应交换器根据收到的路由键匹配发送的队列`(重点type: topic、direct)

    5. 找到后,把消息存入队列

    6. 找不到,会退给生产者

    7. 关闭信道,关闭连接

      channel.close();
      conn.close();
      
    消费者
    1. 消费者连接到RabbitMq Broker,建立一个连接即程序里的Connection,开启一个信道

    2. 消费者向对应队列请求消费消息,

    3. 等待RabbitMq Broker回应并投递相应的队列消息给消费者,消费者然后进行回调处理(回调在轮询或者是阻塞里)

    4. 消费者完成回调,确认消息已经被成功消费(这里确认消息可以是自动应答,也可手动,建议是手动)

      boolean autoAck = false;
      channel.basicConsume(queueName, autoAck, "myConsumerTag",
           new DefaultConsumer(channel) {
               @Override
               public void handleDelivery(String consumerTag,
                                          Envelope envelope,
                                          AMQP.BasicProperties properties,
                                          byte[] body)
                   throws IOException
               {
                   String routingKey = envelope.getRoutingKey();
                   String contentType = properties.getContentType();
                   long deliveryTag = envelope.getDeliveryTag();
                   //。。。。。
                   // 手动应答 acknowledge receipt of the message
                   //Delivery Tag 用来标识信道中投递的消息
                   //第二个参数是requeue 如果是true表示通知RabbitMq把这个消息重新存入队列并发送给下一个消费者,反之删除
                   channel.basicAck(deliveryTag, false);
               }
           });
      
    5. RabbitMq 删除对应已经被消费者确认消费的消息

    6. 关闭信道

    7. 关闭连接

    重点

    • 以上过程不是绝对的,已经表明
    • Connection连接就是一条TCP连接,在实际项目中不要多次创建Connection,他是可以复用的,类似NIO。因为建立TCP连接开销很大。
    • Channel信道是建立在Connection上虚拟连接,RabbitMq处理的每条AMQP指令都是通过信道完成。
    • 每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMq确保了每个线程的私密性,就像独立的连接一样。
    • Channel不建议在线程间贡献,不然很酸爽
    • Connection可以复用不代表项目中只能创建一个,可以根据实际情况创建多个Connection,并让信道均摊到这些Connection上。

    相关文章

      网友评论

        本文标题:消息中间件踩坑之旅(四)——RabbitMq运行流程及多点注意

        本文链接:https://www.haomeiwen.com/subject/vmdwjqtx.html