RabbitMQ使用

作者: 飞翔的鲲 | 来源:发表于2018-05-22 09:25 被阅读11次

RabbitMQ入门与使用篇
http://www.cnblogs.com/SFLYQ/p/7358283.html
Rabbitmq 使用小记
https://www.jianshu.com/p/b63196b596be
RabbitMQ使用详解
https://www.cnblogs.com/enjoyall/p/7767462.html?utm_source=debugrun&utm_medium=referral

image.png

绿色的 X 就是 Exchange
红色的是 Queue ,这两者都在 Server 端,又称作 Broker。
蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型。

  • Exchange通常分为四种:

    fanout:该类型路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能
    direct:该类型路由规则会将消息路由到binding key与routing key完全匹配的Queue中
    topic:与direct类型相似,只是规则没有那么严格,可以模糊匹配和多条件匹配
    headers:该类型不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配

消息队列的使用过程


  • 生产者发送消息

    客户端连接到消息队列服务器,打开一个channel。
    客户端声明一个exchange,并设置相关属性。
    客户端声明一个queue,并设置相关属性。
    客户端使用routing key,在exchange和queue之间建立好绑定关系。

    客户端投递消息到exchange。
    exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

      ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("username");
        factory.setPort(5672);//注意这里的端口与管理插件的端口不一样
        factory.setPassword("pwd");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明一个dirent模式的交换机
        channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true);
        //声明一个非持久化自动删除的队列
        channel.queueDeclare("queue_name",false,false,true,null);//如果该队列不在被使用就删除他 zhe
        //将绑定到改交换机
        channel.queueBind("queue_name","exchange_name","route_key");
        //声明一个消息头部
        Map<String,Object> header=new HashMap<>();
        AMQP.BasicProperties.Builder b= new AMQP.BasicProperties.Builder();
        header.put("charset","utf-8");
        b.headers(header);
        AMQP.BasicProperties bp=b.build();
        //将消息发出去
       channel.basicPublish("exchange_name","route_key",false,bp,"test3".getBytes());
  • 消费者
    从队列中取消息
ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  factory.setUsername("username");
  factory.setPort(5672);//注意这里的端口与管理插件的端口不一样
  factory.setPassword("pwd");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  //声明一个dirent模式的交换机
  channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true);
  //声明一个非持久化自动删除的队列
  channel.queueDeclare("queue_name",false,false,true,null);//如果该队列不在被使用就删除他 zhe
  //将绑定到改交换机
  channel.queueBind("queue_name","exchange_name","route_key");
  Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                 byte[] body) throws IOException {
          String message = new String(body, "UTF-8");
          System.out.println(" [x] Received '" + message + "'");
      }
  };
  channel.basicConsume("queue_name", true, consumer);

RabbitMQ消息可靠性


  1. 发送者
    发送这端利用confirm保证消息可以顺利达到rabbitmq,消息开启持久化(Delivery Mode = 2).
    发送者发送一个消息,到达rabbitmq,然后rabbitmq认为此消息需要持久化,经过内存到磁盘的过程,然后把消息返回给发送者端.
    1)队列持久化
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

2)消息持久化

channel.basicPublish("", "task_queue",
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());
  1. 消息队列(消息持久化)
    申明交换机持久化和队列持久化
    rabbitmq接收到消息,会把消息从内存刷到磁盘的存储文件中.
  2. 消费者
    设置消息的ack,当消费者消费一个消息的时候,会返回给rabbitmq对应queue一个ack消息,这样就保证了消息消费完成.

相关文章

网友评论

    本文标题:RabbitMQ使用

    本文链接:https://www.haomeiwen.com/subject/aofuxftx.html