美文网首页
Rocket-MQ源码深度分析

Rocket-MQ源码深度分析

作者: snail_knight | 来源:发表于2017-06-28 16:59 被阅读0次

    这里没有废话只有源码的解读:
    没整理有些地方没有顺序,看到哪里算哪里:
    关于MessageQueueSelector接口:

    长这样,按照

    http://www.jianshu.com/p/453c6e7ff81c ```
    该文的说法,在具备路由信息的基础上,包装一些算法来选择消息队列,在阅读源码之后,就我所见主要是三种,
    

    1 、哈希取模,
    2 、时间戳随机取模,
    3 、机器随机,

    范围是队列的个数
    
    下面代码是选择队列选择接口
    

    //定义的接口
    public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    }

    
    

    //动态实现在executor中
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    int queueIndex = (Integer) arg % mqs.size();
    MessageQueue queue = mqs.get(queueIndex);
    brokerName = queue.getBrokerName();
    queueId = queue.getQueueId();
    return queue;
    }

    //实现类
    public class SelectMessageQueueByHash implements MessageQueueSelector {

    //该方法应该是哈希然后再取模的方法

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }
    
        value = value % mqs.size();
        return mqs.get(value);
    }
    

    }

    import java.util.List;
    import java.util.Set;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;

    //字面意思是机器随机,但查看了源代码,在源码策略中并没有采用该方法
    public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {

    private Set<String> consumeridcs;
    
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }
    
    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }
    
    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
    

    }

    //字面意思是随机法,种子范围应该是时间戳,但是在源码中并没有被采用,源码的单词貌似写错了,天哪,我
    //是不是拉下来的是假源码,还是开源版本就是这么粗糙
    public class SelectMessageQueueByRandoom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt();
        if (value < 0) {
            value = Math.abs(value);
        }
    
        value = value % mqs.size();
        return mqs.get(value);
    }
    

    }

    关于消息重复的一些处理方法:

    1、消费端处理消息的业务逻辑保持幂等性
    2、保证每条消息都是唯一编号且保证消息处理成功与去重表的日志同时出现
    

    所谓幂等性就是要保证,在消费端不管处理多少条重复的消息,消费端的处理结果必须是一样的。
    第二条个人感觉是消费端幂等性实现的一个基础,即在消费端会维护一个能够记录成功处理的消息的ID,如果新到的消息ID已经在日志表中,那么就不在处理这条消息。

    
    
    

    相关文章

      网友评论

          本文标题:Rocket-MQ源码深度分析

          本文链接:https://www.haomeiwen.com/subject/buwucxtx.html