RabbitMQ:是通过amqp 协议实现的:
AMQP协议可划分为三层:

这种分层架构类似于OSI网络协议,可替换各层实现而不影响与其它层的交互。AMQP定义了合适的服务器端域模型,用于规范服务器的行为(AMQP服务器端可称为broker)。在这里Model层决定这些基本域模型所产生的行为,这种行为在AMQP中用”command”表示,在后文中会着重来分析这些域模型。Session层定义客户端与broker之间的通信(通信双方都是一个peer,可互称做partner),为command的可靠传输提供保障。Transport层专注于数据传送,并与Session保持交互,接受上层的数据,组装成二进制流,传送到receiver后再解析数据,交付给Session层。Session层需要Transport层完成网络异常情况的汇报,顺序传送command等工作。
上面是对AMQP协议的大致说明。下面会以我们对消息服务的需求来理解AMQP所提供的域模型。消息中间件的主要功能是消息的路由(Routing)和缓存(Buffering)。在AMQP中提供类似功能的两种域模型:Exchange 和 Message queue。

Exchange接收消息生产者(Message Producer)发送的消息根据不同的路由算法将消息发送往Message queue。Message queue会在消息不能被正常消费时缓存这些消息,具体的缓存策略由实现者决定,当message queue与消息消费者(Message consumer)之间的连接通畅时,Message queue有将消息转发到consumer的责任。
Message是当前模型中所操纵的基本单位,它由Producer产生,经过Broker被Consumer所消费。它的基本结构有两部分: Header和Body。Header是由Producer添加上的各种属性的集合,这些属性有控制Message是否可被缓存,接收的queue是哪个,优先级是多少等。Body是真正需要传送的数据,它是对Broker不可见的二进制数据流,在传输过程中不应该受到影响。
一个broker中会存在多个Message queue,Exchange怎样知道它要把消息发送到哪个Message queue中去呢? 这就是上图中所展示Binding的作用。Message queue的创建是由client application控制的,在创建Message queue后需要确定它来接收并保存哪个Exchange路由的结果。Binding是用来关联Exchange与Message queue的域模型。Client application控制Exchange与某个特定Message queue关联,并将这个queue接受哪种消息的条件绑定到Exchange,这个条件也叫Binding key或是 Criteria。
rabbitmq的使用RabbitTempalte 发送消息,监听通过2 个注解来实现:@RabbitListener 标记一个类 @RabbitHandler 标记一个方法。
消息的发送和接收安全性保障:
发送:
1 消息到达交换机
在配置文件里面,开启消息的确认机制
confirm-publisher = true
给RabbitTemplate 设置一个ConfirmCallBack 实现类,实现后,若消息到达交换机,则会触发里面的方法
2 消息没有到达队列
return-publisher = true
给RabbitTemplate 设置一个ReturnCallBack 实现类,实现后,若消息没有到达队列,则会触发里面的方法
接收:
在配置文件里面设置消息的确认类型为手动确认 listener.ack = manual
在@RabbitListener 标记额类里面,找到使用@RabbitHandler 标记的方法 ,在该方法里面添加
2 个参数:Message message ,Channel channel
在channel 手动确认该消息 channel.ack(消息的标识)
channel 拒绝消息
channel.recover(); 给该消息者,再次补发消息,该消息只会发给当前的消费者
channel.reject();将该消息放回队列,所有的消费者都可以抢
消息的去重:做业务逻辑,在写数据库时,先判断该数据库里面是否有高消息的标识,若有,放弃操作,没有,继续操作
去重:大数据行业的去重:boolmFilter 对字符串做特征的分解,将该特征存储在位集合里面
一般特征函数是hash函数,数量有很多,所以hash 碰撞的几率很小。
原文链接:https://blog.csdn.net/qq_41532872/article/details/94591137
网友评论