美文网首页
消息服务 - RabbitMQ 基础入门

消息服务 - RabbitMQ 基础入门

作者: 我不是李小龙 | 来源:发表于2017-05-18 22:39 被阅读0次

rabbitmq

RabbitMQ官方入门教程

本文算是实现对入门教程的 java版本翻译吧。本文中演示代码地址

1. 安装

  1. 先安装 erlang (安装网上提供的教程安装erlang)

  2. 在安装 rabbitmq-server
    下载rabbitmq的安装包的时候选择 tar.xz 直接解压就可以了

  3. 启动/停止

#启动rabbitmq
#进入安装目录的sbin 目录,执行
./rabbitmq-server -detached

#关闭rabbitmq
./rabbitmqctl stop


2.用户权限

guest/guest 现在只能在localhost使用,不能远程使用

需要添加用户和权限

参考文章 RabbitMQ用户角色及权限控制

例如:

####添加用户:

$sudo rabbitmqctl add_user  user_admin  passwd_admin  

#####修改角色为 administrator:

$sudo rabbitmqctl set_user_tags user_admin administrator

上面的操作有了,可能还是为在日志中提示 user_admin在 vhost '/' 权限不够

则执行下面的操作:

######修改权限
$sudo rabbitmqctl  set_permissions -p /  user_admin '.*' '.*' '.*'

3.rabbitmq 的使用

3.1 工作队列(workqueues) 模式

注意的点:

  1. 工作队列中的消息被所有的消费者共享。
  2. rabbitmq在路由消息到消费者的时候使用轮询(round-robin)的方式,找到第n个消费者来消费消息
  3. ack(ackownledgment)机制确保消息被消费,不出现消息没有消费就从内存中删除的情况

3.1.1 ack机制(Message acknowledgment)

RabbitMQ 支持消息确认,当消费者(Consumer)接受到消息并且处理完成之后,回给RabbitMQ 发送一个确认消息,Rabbitmq这个时候可以随意的删除这个消息

如果消费者die ,但是没有发送确认消息,这个时候RabbitMQ会认为消息没有完全处理,这个时候会这个消息重新放到队列之中,使用轮询的方式分配给其他的消费者消费

RabbitMQ 默认开启了确认机制,使用的时候设置 autoAck = true 来关闭确认机制,设置autoAck = false 则开启

3.1.2 消息持久化(Message durability)

启动消息持久化,需要在申明channel的时候设置持久化属性为true

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

一个确定了是否持久化的队列,不能再修改durable的值

设置队列为持久化队列之后,需要设置下消息的属性,例如设置属性为MessageProperties (实现了 BasicProperties)

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

使用持久化消息并不能完全保证消息的持久化,应为消息可能先保存在缓存,后面保存到硬盘上,不过对于一般的task是完全足够的,如果想要确保完全的持久化,可以结合 publisher confirms 来实现

3.1.3 公平分配(Fair dispatch)

通过设置prefetchCount 的值,控制一个消费者接受的任务数量

int prefetchCount = 1;
channel.basicQos(prefetchCount);

如上面的设置,当一个消费者还有没有处理完的任务的时候,rabbitmq不会分配任务给他

开启管理功能

rabbitmq-plugins enable rabbitmq_management

web管理界面: http://server-name:15672/


3.2 发布订阅(Publish/Subscribe)模式

使用发布订阅模式,一条消息会被发送给多个消费者消费

使用示例说明:

创建一个日志的生产者 EmitLog 来发布日志,使用多个日志接受者ReceiveLogs
创建两个接受者,一个把日志往硬盘上面写,一个把日志打印到显示器。整个过程发布日志
的只有一个。日志被广播到多个接受者

1.1 交换(Exchanges)

在rabbitmq的消息模型中

生产者-->发送消息的一端

队列--> 消息缓冲区

消费者-->消费消息的一端

rabbitmq的核心思想中,生产者通常不直接发送消息给一个队列,事实上,
生产者也不知道把消息发送给哪个队列

实际上,生成这是把消息发送给一个Exchange,这个Exchange一方面从生产者那边接受消息,
一方面推送消息给队列(queue),exchange清楚的知道自己接受的消息要怎么处理——是追加到
一个指定的队列?还是追加到多个队列?还是直接丢弃?具体做法要根据exchange的类型(type)
来定

可以通过下面的指令列出exchange的类型

./rabbitmqctl list_exchanges

exchange的类型:

  1. direct
  2. topic
  3. headers
  4. fanout

之前的例子中没有使用exchange是因为我们使用了默认的exchange 即使用空字符串"" 来定义的
(默认exchange的消息会被发送到指明的routingKey的队列中)

channel.basicPublish("", "hello", null, message.getBytes());

现在创建一个exchange并发布

channel.exchangeDeclare("logs", "fanout");
channel.basicPublish( "logs", "", null, message.getBytes());

1.2 临时队列(Temporary queues)

使用无参的 queueDeclare() 方法可以创建一个非持久化的、独立的、自动删除的、名称随机的一个
队列。例如:产生一个名为 amq.gen-JzTY20BRgKO-HjmUJj0wLg 的队列

String queueName = channel.queueDeclare().getQueue();

1.3 绑定(Bindings)

之前说过,消息分配到哪个队列是有exchange来处理的,那么要确定消息去哪儿,就需要明确队列和exchange
的关系。这个过程称为binding

channel.queueBind(queueName, "logs", "");

3.3 路由(Routing)

之前的发布订阅示例中,我们使用的是广播消息,所有的接受者都能接受。使用routing之后
能够让接收者只接收消息的一个子集。例如之前的示例中,只有错误级别的日志写到硬盘,所有的
的错误全部打印

1.bindings

在绑定的过程中加上 routingKey,为了同 basic_publish 区分,我们称之为 binding key

channel.queueBind(queueName, EXCHANGE_NAME, "black");

2. Direct exchange

fanout exchange :只适合无脑的广播模式

direct exchange :消息会被分配到与消费的routing key 对应的 binding key的队列上,匹配不上的
消息直接丢弃

image

如图:direct 的 exchange X 关联了两个队列 Q1、Q2,Q1的 binding key=orange
,Q2的绑定了两个key black 和 green

在这种情况下:

routing key=orange的消息会被 exchange X 传递给 Q1

routing key=black 或者 routing key=green 的消息会被 exchange X 传递给 Q2

3. 多绑定 (Multiple bindings)

direct-exchange-multiple

如图,对direct exchange 同时使用 binding key =black 绑定Q1、Q2,这个时候
direct的exchange就同之前的fanout的一样了,直接广播消息了

4. 测试

测试时候一个开两个接收者,一个设置级别为 error ,一个设置为 info,error
然后开两个生产者 分别发送 error 级别的消息和info级别的消息

观察接收情况


3.4 主题(Topic)模式

在前面的日志例子中,我们可以通过级别来区分日志,但是我们还想通过日志来源来区分日志
就像unix tool 中的syslog,他能通过 级别(info/warn/error)和 来源(auth/cron/kern)来
路由日志

例如:我们只处理来自“cron” 的 “error”级别的日志,同时打印来自“kern”的所有级别的
日志

要实现这种要,我们就需要使用 topic exchange

1. Topic exchange

topic exchange 的 routing key必须是 一个或者英文单词,中间使用点隔开的方式,
最大长度是255字节

binding key 必须是相同格式,topic exchange 的逻辑和direct 很相似,消息只会分配到匹配的binding key的队列

  1. *(星) 代表一个单词
  2. #(哈希)代表零个或者多个单词

例如:

topic-example

在这个例子中,接收动物消息,消息会被发送到一个拥有三个单词(两个点隔开)的 routing key
格式为 <speed>.<color>.<species>

Q1绑定了 binding key“ *.orange.* ”, Q2 绑定了 “ *.*.rabbit” 和 “lazy.#”

Q1只对颜色为 orange的消息感兴趣

Q2关心所有的rabbit 和 lazy的消息

routing key 为 quick.orange.rabbit既会被发送到Q1 也会被发送到 Q2
lazy.orange.elephant --> Q1,Q2
quick.orange.fox --> Q1
lazy.brown.fox --> Q2
lazy.pink.rabbit --> Q2(有两个binding key 匹配的情况下依然只会被发送一次)
quick.brown.fox --> 匹配不上,直接丢弃
lazy.orange.male.rabbit -->Q2 (匹配上了"lazy.#")

当不使用 * 或者 # 站位符的时候,topic exchange表现的就和direct 是一样的


3.5 RPC

这一节使用 RabbitMQ 建立一个RPC系统:一个客户端和一个可扩展的服务器

本示例中在客户端调用一个call 方法,服务端返回一个Fibonacci 数列的值

关于使用rpc的几点建议:

  1. 本地方法和远程方法要定义明确,一目了然

  2. 系统加注释,是组件之间的依赖清晰可见

  3. 异常处理,当rpc 服务器出现异常的时候,客户端改如何处理

相关文章

网友评论

      本文标题:消息服务 - RabbitMQ 基础入门

      本文链接:https://www.haomeiwen.com/subject/zqsexxtx.html