美文网首页RabbitMQ
RabbitMQ(二)实战

RabbitMQ(二)实战

作者: 右丶羽 | 来源:发表于2017-11-02 11:28 被阅读117次

    准备工作

    RabbitMQ是一个消息传递者,你可以将他看作是一个 邮局,你只需将信息放进"邮筒" 他会帮助你完成信息的传达

    安装

    linux下载页面--- 需要解压
    ubuntu到这--- 自动安装

    配置

    安装完成之后,如果安装方式是解压,请配置环境变量

    RABBITMQ_HOME = #解压目录#

    然后是新建配置文件: 在/etc/rabbitmq/下新建以下两个文件
    rabbitmq-env.conf环境信息配置

    RABBITMQ_NODE_IP_ADDRESS=127.0.0.1
    RABBITMQ_NODE_PORT=5672
    RABBITMQ_NODENAME=node01

    rabbitmq.config 核心配置文件(不要把 . 漏掉了)

    [{rabbit, [{loopback_users, []}]}].

    配置完成之后可以使用以下命令开启RabbitMQ (需要root)

    rabbitmq-server

    如果安装方式是自动安装,那么则需要进行重启,因为安装完成后自动启动了(我是使用暴力方式重启的)
    输入命令 sudo lsof -i :25672 来查找当前运行的 rabbitmq 对应的pid
    然后使用sudo kill-9 #pid#来关闭
    然后就可以使用 rabbitmq-server 来开启服务了

    启动

    第一次启动 可以看到completed with 0 plugins 这样的信息

    RabbitMQ提供了UI管理页面(需要重启)我们可以通过rabbitmq-plugins enable rabbitmq_management 命令来开启

    重启后日志应该如下

    上面可以看到 completed with 6 plugins 说明UI页面已经开启,那么可以访问http://localhost:15672 来访问 rabbitmq的UI页面

    默认使用
    用户名:guset
    密码:guset
    进行登录

    创建用户

    这里使用命令创建:
    rabbitmqctl add_user test test
    rabbitmqctl set_user_tags test administrator
    tag分为四种"management", "policymaker", "monitoring" "administrator" 详见 
    http://www.rabbitmq.com/management.html

    使用 test -- test重新登录

    准备工作到此就结束了,下面开始讲解案例,因为后面会整合Spring Boot所以案例都运行在Spring Boot 包环境下

    Java RabbitMQ Demo

    官方给各种语言的多种应用提供了样例
    本文很多案例中的概念在RabbitMQ(一)都提到过,这里可能会重复讲解,复习一下

    案例一:"Hello World"

    所有的开始都是 "Hello World"
    在这个样例中,我们会使用两个java程序 分别完成

    生产者:完成一条消息的发送
    消费者:接收信息并打印

    P代表生产者,C代表消费者,中间的框是一个队列(queue)——RabbitMQ为消费者保留的消息缓冲区

    RabbitMQ支持多种协议。本教程使用AMQP 0-9-1,这是一种开放的、通用的消息传递协议。

    pom.xml中添加下面代码来引入rabbitMQ的包 (版本号不限制)

    <dependency>            
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.0.2</version>
    </dependency>
    

    还需要引入SL4J包,RabbitMQ对此包有依赖。。。 因为整个项目是基于 Spring Boot包环境下开发,已经引入了SL4J 所以不需要再单独引入

    如果是单独运行的请自行添加

    首先编写一个发送类
    我们创建一个 Send 类来完成发送操作:
    定义(常量)队列名

    main方法中:

    这里我们将会连接到本地的代理上,因此是 localhost , 如果我们想要连接到其他机器上, 可以使用对应的 IP地址
    然后就是创建一个channel,我们大多数操作都会通过channel提供的API来完成

    接着,我们声明一个 队列(queue)以发送消息:

    queueDeclare()声明队列是幂等的,当且仅当其不存在的时候才会创建。不同队列间根据队列名区分

    消息是通过二进制数组的方式传递的(getBytes()

    最后关闭资源:

    完整代码

    接收类(Receiving)

    这是用来发布消息的,我们的消费者(consumer)从RabbitMQ推送消息,不像一般的发布器只发布一条消息,这将一直保持运行以监听和打印多条消息。

    建立连接、通道的过程跟上面的一致,更多的是需要建立一个consumer对象 对接收到的消息进行处理

    运行"消费者"(Recv)

    开启监听类

    此时再运行(Send)

    接收到信息

    同时,当你没有开启接收类的时候,发送类发送的消息不会丢失,而是缓存在队列(queue)里面。当你开启接受类的同时,会自动收到来自队列里面的消息。消息被接收后就会通知RabbitMQ将队列里面的缓存清理,不管消息有没有被处理完

    Here's the whole Recv.java class.

    案例二:Work queues

    接下来是part2

    在这一项中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务。

    Work Queues 的主要思想是,避免立即执行资源密集的任务而不得不等待其执行完成。我们将任务封装为消息并将其发送到队列中,在后台运行的一个工作进程将会弹出任务并最终执行该任务。当你管理许多工作节点时,任务就会在他们之间共享。

    这个概念在web应用程序中尤其有用,因为在一个短HTTP请求窗口中不可能处理复杂的任务。

    下面我们发送一个特殊的String , 用 Thread.sleep() 辅助,来模拟一些耗时的工作。
    用 点 来简单表示一个任务的复杂度,例如 Hello.. 表示此任务需要两秒进行处理

    在Send.java的基础上修改来构造一个新的类 NewTask.java:

    NewTask.java 辅助方法

    在Recv.java的基础上修改,来构造一个新的类 Worker.java:

    Worker.java

    首先执行一个简单的样例:
    设置输入参数为:


    然后分别运行NewTask.java 和 Worker.java

    run NewTask run Worker

    (因为当前是单应用运行)在Worker.java中,需要等待一定的时间才能执行完成

    开启多个Worker应用

    即对Worker.java运行多次

    Worker1 Worker2

    默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环(平均分配)。

    消息答复

    完成一项任务可能需要一定的时间。你可能会想,如果一个消费者开始一项长时间的任务,并且只完成了一部分,那么会发生什么。在我们当前的代码中,一旦RabbitMQ向客户发送一条消息,它立即将其标记为删除。在这种情况下,如果您强制关闭了一个工作节点,我们将丢失它正在处理的消息。我们还将丢失发送给这个特定工作者的所有消息,但是还没有处理。

    通常我们不希望因为一个节点挂掉而丢失任何消息,而希望能将这些消息传递给其他存活节点进行处理。

    确保消息能够不丢失,RabbitMQ支持 message acknowledgments( 消息答复),一条特定的消息被接收后,返回一个ack告诉RabbitMQ可以随意地进行删除

    如果一个消费者挂了(通道关闭,TCP连接关闭)就不能发送ack给RabbitMQ,此时RabbitMQ就会意识到,某条消息没有被处理完成,那么就会将其重新发送到其他的消费者。这种处理流程保证了信息不会丢失,即便偶尔有消费者挂掉。

    没有任何消息超时,RabbitMQ将在某个消费者挂掉后重新传递消息。即使处理消息需要很长时间。

    缺省情况下,手动消息确认将被打开。在前面的例子中,我们通过autoAck=true标记显式地关闭了它们。此时将这个标志设置为false,并在完成任务后向员工发送适当的确认信息。

    修改 Worker.java

    System.out.println(" [x] Done");//下面添加
    channel.basicAck(envelope.getDeliveryTag(), false);

    boolean autoAck = true; // 改为
    boolean autoAck = false;

    重新开始运行任务
    fourth....执行时, 强制关闭Worker

    fourth....任务会自动传递到其他的Worker(或者重新打开Worker时重新执行)

    使用修改后的代码,即便任何时刻关闭节点都不会出现丢失消息的情况(当然,这不能避免 整个RabbitMQ重启的情况下数据的丢失)

    注意,一旦开启手动确认消息答复,就不能忘记发送回执(ack),否则会导致信息堆积(queue中的消息一直不会被删除)

    为了调试这种错误,您可以使用rabbitmqctl来打印messagesun_unacknowledged文件:
    sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

    消息持久化

    我们已经学会了如何确保即使消费者挂掉,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。

    当RabbitMQ退出或崩溃时,它将会忘记队列和消息,除非您告诉它不要这样做。需要有两件事来确保消息不会丢失:我们需要将队列和消息标记为持久的。

    首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了实现这一目的,我们需要将其声明为持久的:

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);
    

    尽管这个命令本身是正确的,但它在我们当前的设置中是无效的。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许您重新定义具有不同参数的现有队列,并将返回任何试图执行此操作的程序的错误。但是有一个快速的解决方法——让我们声明一个有不同名称的队列,例如taskqueue:

    boolean durable = true;
    channel.queueDeclare("task_queue", durable, false, false, null);
    

    这个队列名更改需要重新应用到“生产者”“消费者”

    此时,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。现在,我们需要通过将MessageProperties(implements BasicProperties)设置为PERSISTENT_TEXT_PLAIN来标记开启消息持久化

    import com.rabbitmq.client.MessageProperties;
    
    channel.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
    

    将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是当RabbitMQ接受消息并没有保存它时,仍然有一个很短的时间窗口。另外,RabbitMQ不会为每条消息执行fsync(2)——它可能只是保存到缓存中,而不是真正写到磁盘上。持久性保证并不强大,但对于我们的简单任务队列来说,这已经足够了。

    公平分配

    RabbitMQ在消息进入队列时仅发送一条消息。它不考虑消费者的未确认消息的数量。它只是盲目地将每个m个消息发送给第n个消费者(平均分配)。
    这样可能导致一个消费者一直忙碌(很倒霉地接收到所有费时间的任务),而另一个消费者则一直很空闲(接收到的都是轻任务)
    我们可以通过(consumer中)设置 prefetchCount = 1 来避免这个问题

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    

    即将分配策略设为每次只分配一个任务,下一个任务交给首先完成第一个完成任务的消费者,如此类推

    案例参考地址

    案例三:Publish/Subscribe

    上一个案例中,默认是一个任务只交付给一个消费者进行处理,在本案例中,一个任务会交给多个消费者去处理(即 publish / subscribe 模式)。

    为了说明这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个将发出日志消息,第二个将接收并打印它们。

    在我们的日志系统中,接收程序的每一个运行副本都会得到消息。这样,我们就可以运行两个接收器,一个将日志引导到磁盘;另一个在屏幕上打印日志。

    Exchanges

    上一个案例中,我们使用Queue完成了消息的发送和接收,现在我们来介绍一下Rabbit 的消息模型

    首先再解释一下几个概念
    生产者(producer):发送消息的用应用
    队列(queue):消息的缓冲存储区
    消费者(consumer) :接收消息的用户应用

    RabbitMQ核心思想是,一条消息不会直接从生产者到队列,通常情况下,生产者甚至不知道消息有没有被传递到任何队列中。

    生产者会首先将消息交付给交换器(exchanges)。交换器做的事情很简单--将从生产者(producer)得到的消息传递到队列中(queue)
    至于怎么传递?是传给任意一个还是传递给多个或者是抛弃....根据策略不同有不同做法

    这里我们需要推翻之前的一些误区
    我们之前使用
    channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
    来进行消息发布,其中第二个参数的意义其实并不是queueName,而是routing key 这个我们后面会提到
    这里先解惑,因为可能会有人想:之前不是要使用queueName来指定希望发送到的queue?怎么现在又说消息不会直接从生产者发送到队列

    这里是因为,上面 basicPublish函数的第一个参数(exchangeName)是空字符串,所以消息会先传递给默认exchange,其类型是 direct ,策略是将消息传递到 routing keybinding key完全匹配的队列,由于默认 exchange 无法设置binding key,而是默认使用queueName 来作为 binding key进行配对,所以使用 默认exchange 的时候,routing key 相当于你想发送到的queueName

    下图X为交换器(Exchanges)


    RabbitMQ提供的有四种策略(在教程(一)中提到过)这里再复述一下

    direct :消息路由到那些binding key与routing key完全匹配的Queue中。
    topic:它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,不过有 分隔符 ' . '以及允许通配符'*' -> 匹配一个单词 ; '#'->任意多个单词
    headers :不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。(键值对匹配)
    fanout :将消息发送到与该exchanges绑定的所有queue

    使用:
    channel.exchangeDeclare("logs", "fanout"); 第一个参数是exchange名称,第二个参数是exchange类型(本案例中就需要用这种类型来实现Logger)

    使用命令: sudo rabbitmqctl list_exchanges 用于查看当前RabbitMQ的exchanges列表

    exchange列表

    在本教程的前几部分中,我们对exchange一无所知,但仍然能够向队列发送消息。这是因为我们使用的是默认的交换,我们通过空字符串("")来识别。

    我们之前是怎么发布消息的呢?

    channel.basicPublish("", "hello", null, message.getBytes());
    

    第一个参数就是 exchange的名称,空字符会使用默认的无名 exchange
    现在我们可以使用刚刚定义的exchange来发送消息

    channel.basicPublish( "logs", "", null, message.getBytes())
    
    临时队列

    Temporary queues
    每次连接到RabbitMQ都是一个新的队列,名字由server 随机命名;
    当与消费者断开连接时,队列自动被删除

    我们可以使用以下代码来生成一个 临时,独立的,自动删除的队列

    String queueName = channel.queueDeclare().getQueue();
    

    这时,queueName包含一个随机的队列名。例如,它可能看起来像q-jzty20brgko-hjjjj0wlg

    绑定

    Bindings

    我们可以将队列(queue)与交换器(exchange)之间的关系成为binding
    使用以下代码进行queue与exchange之间的绑定

    channel.queueBind(queueName, "logs", "");
    

    此时,logsexchange就会将消息传递到我们的队列中

    查看绑定(binding列表)

    rabbitmqctl list_bindings
    

    (EmitLog.java source)

    (ReceiveLogs.java source)

    案例参考地址

    案例四:Routing

    此案例中,我们会尝试更多的特性,例如,仅仅将 错误信息传递到 Log 文件中(节省空间),同时仍能将所有信息打印到控制台上

    Binding的同时可以设置一个额外的参数 routingKey。为了避免跟basic_publish的参数混淆,这里称其为 binding key

    channel.queueBind(queueName,EXCHANGE_NAME,"black")

    binding key的作用 还依赖于 exchange 的类型,比如 fanout就会忽略 binding key

    binding keyrouting key 完全匹配

    案例五:Topics

    之前提到过 Topics 的策略,这里就不讲解案例了,列几个图来自行领悟

    案例参考地址

    案例六:RPC

    同上

    案例参考地址

    相关文章

      网友评论

        本文标题:RabbitMQ(二)实战

        本文链接:https://www.haomeiwen.com/subject/oylipxtx.html