rocketmq的消息过滤在consumer端和broker端都有过滤,更高级一点的还有通过filterServer来进行高级过滤。
broker端的过滤:在拉取消息的时候会先去遍历ConsumeQueue的,然后根据tag的hashCode来进行比对看是否符合要求,这里会出现哈希冲突,但是没关系在consumer端会根据tag的字符串进行去比对了,保证了正确性。这里在consumeQueue里面进行比对有个好处,就是避免了对commitLog的访问,不需要再去访问磁盘里的消息了。
consumer端的过滤:把从服务端获取到的消息根据tag进行字符串比对,这里确保了broker由于哈希冲突导致的过滤不干净的问题。
更高级些的消息过滤是使用FilterServer进行过滤,在rocketMq的架构中有一个filtersrv模块,他是用来进行消息过滤的,他通过consumer端上传上来的过滤代码进行过滤。
采用FilterServer过滤的步骤:
- 在broker的配置中添加filterServerNumbers=3配置,这样在broker启动的时候会启动3个filter server进程用来进行broker端的高级消息过滤。它是在broker端和consumer端之间的一层,把从broker端获取的消息按consumer端上传的过滤逻辑进行过滤,然后才返回给consumer端。
- 实现MessageFilter接口,编写自己的消息过滤逻辑。
- 通过consumer.subscribe方法上传实现了MessageFilter接口的类,用于filter server进行过滤。
这里把本地的类上传到rocketMq远端进行执行,这里是危险的操作,如果本地代码大量创建对象和线程就会很消耗broker机器的资源,必须严格确认自己本地的代码才确认上传到远端broker执行。
网友评论