美文网首页
第二章3:入门

第二章3:入门

作者: asfafjwefhfuer | 来源:发表于2019-03-10 20:35 被阅读0次

消息生产和消费

  1. ConnectionFactory : 获取连接工厂
  2. Connection :通过连接工厂获取一个连接
  3. Channel : 通过连接创建 数据通信信道,可发送和接收消息
  4. Queue: 具体的消息存储队列
  5. Producer 和 Consumer 生产者和消费者
image

生产端



/**
 * 生产者
 */
public class Producer {


    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        //3、通过connection创建一个Channel  :网络信道,几乎所有操作都在Channel中进行,是进行消息读写的通道。客户端可建立多个Channel,每个代表一个会话任务。
        Channel channel = connection.createChannel();
        //4、通过channel发送数据
        String message = "hello rabbitMq!";
        //发送5条
        for (int i = 5; i > 0; i--) {
            //exchange  交换机 |routingKey 路由key |props 配置文件
            //不指定Exchange时,交换机默认是AMQP default,此时就看RoutingKey,RoutingKey要等于队列名才能被路由,否则消息会被删除。
            channel.basicPublish("", "test001", null, message.getBytes());
        }

        //5、关闭连接
        channel.close();
        connection.close();

    }
}

*消费者


/**
 * 消费者
 *
 */
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1、创建一个ConnectionFactory并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPort(5672);
        connectionFactory.setHost("");
        connectionFactory.setVirtualHost("/");

        //2、 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过connection创建一个Channel  :网络信道,几乎所有操作都在Channel中进行,是进行消息读写的通道。客户端可建立多个Channel,每个代表一个会话任务。
        Channel channel = connection.createChannel();

        //4、创建一个队列 (声明)
         /* 
        queueName: 队列名称
        durable:  持久化。true 即使服务重启也不会删除这个队列
        exclusive: 独占、 true 队列只能使用一个连接。连接断开,队列删除
        autoDelete: 自动删除: true 脱离了Exchange(连接断开) 即队列没有Exchange关联时。自动删除
        arguments : 扩展参数
        */
        String queueName = "test001";
        channel.queueDeclare(queueName, true, false, false, null);

        //5、创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        //6、设置channel
        channel.basicConsume(queueName, true, queueingConsumer);

        //7、获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消费端:" + msg);
            Envelope envelope = delivery.getEnvelope();
//            long deliveryTag = envelope.getDeliveryTag();
        }
    }

}


Exchange 交换机

exchange :接收消息,并根据路由键转发消息所绑定的队列

image.png

交换机的属性

  • name: 交换机名称
  • type: 交换机类型。 direct,topic、fanout、headers
  • durability: 是否需要持久化,true为持久化
  • autoDelete: 当最后一个绑定到Exchange上的队列删除后。自动删除该Exchange
  • internal: 当前Exchange是否用于RabbitMQ内部使用。 默认false
  • arguments: 扩展参数。 用于扩展AMQP协议。自己定制使用

direct : 直接

direct Exchange

所有发送到Direct Exchange 的消息被转发到RouteKey中指定的Queue

注意 : Direct模式可以使用RabbitMQ自带的Exchange :default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收、否则该消息会被抛弃。

image.png

Direct Exchange 生产者


/**
 * 直接 交换机
 * direct Exchange模式生产者
 *
 */
public class Producer4DirectExchange {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
        //2 创建连接
        Connection connection = connectionFactory.newConnection();
        //3 创建通道 channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test-direct-exchange";
        String routingKey = "test.direct";
        //5 发送
        String msg = "hello word RabbitMQ For  Direct Exchange Message .... ";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

        channel.close();
        connection.close();
    }
}

Direct Exchange 消费者


/**
 * 直接 交换机
 * direct Exchange  消费者
 *
 * @author yangHX
 * createTime  2019/3/10 22:02
 */
public class Consumer4DirectExchange {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
        //支持重连
        connectionFactory.setAutomaticRecoveryEnabled(true);
        //3秒
        connectionFactory.setNetworkRecoveryInterval(3000);

        //2 创建连接
        Connection connection = connectionFactory.newConnection();
        //3 创建通信信道
        Channel channel = connection.createChannel();

        //4 声明
        String exchangeName = "test-direct-exchange";
        String exchangeType = "direct";
        String queueName = "test-direct-queue";
        String routingKey = "test.direct";

        //声明exchange(交换机)   declare (宣布。声明)
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系。
        channel.queueBind(queueName, exchangeName, routingKey);

        //durable 是否持久化。
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //参数。队列名称 是否自动ACK Consumer
        //ACK 自动签收
        channel.basicConsume(queueName, true, queueingConsumer);
        //循环获取消息
        while (true) {
            //获取消息。如果没有消息。这一步将会阻塞
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String s = new String(delivery.getBody());
            System.out.println("收到消息:" + s);

        }


    }
}



Topic Exchange

所有转发到Topic Exchange的消息被转发到所有关心RokeKey中指定Topic的Queue

Exchange 将 RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic


image.png
image.png

Topic Exchange 生产者


/**
 * topic exchange 生产者
 *
 * @author yangHX
 * createTime  2019/3/10 23:13
 */
public class Producer4TopicExchange {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
        //2 创建连接
        Connection connection = connectionFactory.newConnection();
        //3 创建通道 channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 发送
        String msg = "hello word RabbitMQ For  Topic Exchange Message .... ";
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());

        channel.close();
        connection.close();

    }
}



Topic Exchange 消费者


/**
 * topic exchange 消费者
 *
 * @author yangHX
 * createTime  2019/3/10 23:13
 */
public class Consumer4TopicExchange {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //声明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey="user.*";
//        String routingKey = "user.#";
        //声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //声明队列
        //durable 是否持久化
        channel.queueDeclare(queueName, false, false, false, null);
        //建立交换机和队列的绑定关系
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //参数。队列名称。 是否自动ACK consumer
        channel.basicConsume(queueName, true, queueingConsumer);

        //循环获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String s = new String(delivery.getBody());
            System.out.println("收到 topic 消息:  " + s);
        }


    }
}

Fanout Exchange

  • 不处理路由键。只需要简单的将队列绑定到交换机上
  • f发送到交换机的消息都会被转发到于该交换机绑定的所有队列上。
  • Fanout 交换机转发消息是最快的
image.png

Fanout Exchange 生产者


/**
 * fanout 交换机 生产者
 *
 * @author yangHX
 * createTime  2019/3/10 23:45
 */
public class Producer4FanoutExchange {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
        //2 创建连接
        Connection connection = connectionFactory.newConnection();
        //3 创建通道 channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_fanout_exchange";
        //5 发送
        String msg = "hello word RabbitMQ For  Fanout Exchange Message .... ";
        for (int i = 10; i > 0; i--) {
            channel.basicPublish(exchangeName, "", null, msg.getBytes());
        }

        channel.close();
        connection.close();

    }
}


Fanout Exchange 消费者


/**
 * Fanout Exchange 消费者
 *
 * @author yangHX
 * createTime  2019/3/10 23:46
 */
public class Consumer4FanoutExchange {


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //声明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "";
        //声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //声明队列
        //durable 是否持久化
        channel.queueDeclare(queueName, false, false, false, null);
        //建立交换机和队列的绑定关系
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //参数。队列名称。 是否自动ACK consumer
        channel.basicConsume(queueName, true, queueingConsumer);

        //循环获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String s = new String(delivery.getBody());
            System.out.println("收到 fanout 消息:  " + s);
        }


    }


}


Binding-绑定

ExchangeExchangeQueue之间的连接关系
Binding中可以包含RoutingKey或者参数

*Queue-消息队列

消息队列,实际存储消息数据
Durability: 是否持久化,Durablity :是 Transient: 否
Auto delete 如选yes,代表当最后一个监听被移除后,该Queue会自动被删除

*Message-消息

服务器和应用程序之间传送的数据
本质上就是一段数据,由Properties , Payload(Body) 组成
常用属性: delivery mode。 headers(自定义属性)

Message-其他属性

content_type 、content_encoding 、priprity
correlation_id、 reply_to、expriation、 message_id
timestamp、type、user_id、app_id、cluster_id

设置消息属性

image.png

读取消息属性

image.png

Virtual Host -虚拟主机

虚拟地址,用于逻辑隔离,最上层的消息路由
一个Virtual Host 里面可以有若干个Exchange 和Queue
同一个Virtual Host里面不能有相同名称的Exchange或Queue

总结

image.png

相关文章

  • 第二章3:入门

    消息生产和消费 ConnectionFactory : 获取连接工厂 Connection :通过连接工厂获取一个...

  • 《算法竞赛入门经典》第二章习题

    《算法竞赛入门经典》(第二版)第二章习题 水仙花数(daffodil) 输出100~999中的所有水仙花数。若3位...

  • 入门级知识

    入门知识_1 入门知识_2 入门知识_3 入门知识4

  • 无标题文章

    第二章:推荐系统入门 原文:http://guidetodatamining.com/chapter-2/ 内容:...

  • 20171031

    【python编程:从入门到实践】第二章 be interest ,so mental can memory pl...

  • 思维训练书单

    1.清醒思考的艺术—德【3星入门】 2.笨蛋!重要的是逻辑!【3星入门】 3.理性动物【3星入门】? 4.隐性动机...

  • 入门

    这本书的入门简直令人发指,我翻到第二章的时候感觉比入门简单多了。但是这样也确实有一些好处,把入门的那些内容和例题真...

  • AI教程:童年の回忆—可爱的神奇宝贝mbe图标绘制

    1、AI软件入门; 2、图标设计入门; 3、UI设计入门。

  • 2018-07-23 Chapter 2.1 Ablout St

    《Python编程从入门到实践》第二章 字符串print("Hello Python world!") msg1 ...

  • 《岳响河》目录 第二章

    第二章1 第二章2 第二章3 第二章4 第二章5 第二章6 第二章7 第二章8 第二章9 第二章10 第二章11 ...

网友评论

      本文标题:第二章3:入门

      本文链接:https://www.haomeiwen.com/subject/dpkppqtx.html