美文网首页
RocketMQ学习

RocketMQ学习

作者: 胖嘟嘟洒酒疯 | 来源:发表于2019-06-05 00:46 被阅读0次

    RocketMQ 学习

    安装部署**

    1. 下载rocketmq解压:下载地址

    2. 启动nameserver,进入bin目录下执行

      # -n 参数指定nameserver的访问地址与端口号,&后台启动
      sh mqnamesrv -n host:port &
      #备注:在启动过程中可能会因为无法分配内存启动失败,打开bin目录下的runserver.sh修改启动参数
      
    3. 启动broker,进入bin目录下执行

      # -n 指定nameserver的访问地址, -c 指定broker的配置文件
      sh mqbroker -n host:port -c ../conf/broker.conf autoCreateTopicEnable=true &
      #备注:修改broker.conf,添加 brokerIP1 = host 否则外网访问可能会报错
      
    4. 开放访问端口号

      开放nameservr访问端口 9876
      开放broker访问端口 10909 10911 10912
      备注:当时因为少开放了一个10912端口导致程序一直报错,错误提示忘了是啥啦
      

    DefaultMQProducer属性配置

    public class DefaultMQProducer extends ClientConfig implements MQProducer {
    
        /**
         * 默认生产者,
         */
        protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
    
        /**
         * 生产着组别,同组别的所有生产着实例作用相同
         */
        private String producerGroup;
    
        /**
         * 不指定topic时的默认topic
         */
        private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
    
        /**
         * 默认topic队列的大小
         */
        private volatile int defaultTopicQueueNums = 4;
    
        /**
         * 发送消息超时时间
         */
        private int sendMsgTimeout = 3000;
    
        /**
         * 对消息进行压缩的阀值
         */
        private int compressMsgBodyOverHowmuch = 1024 * 4;
    
        /**
         * 同步模式下发送消息失败的重试次数
         */
        private int retryTimesWhenSendFailed = 2;
    
        /**
         * 异步模式下发送消息失败的重试次数
         */
        private int retryTimesWhenSendAsyncFailed = 2;
    
        /**
         * 当消息发送失败时用其他的broker进行重试
         */
        private boolean retryAnotherBrokerWhenNotStoreOK = false;
    
        /**
         * 最大允许的消息大小
         */
        private int maxMessageSize = 1024 * 1024 * 4; // 4M
    }
    

    DefaultMQPushConsumer属性配置

    public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    
        /**
         * 默认消费者
         */
        protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    
        /**
         * 消费者组别
         */
        private String consumerGroup;
    
        /**
         * 消费模式,默认集群消费,rocketmq支持集群消费和广播消费
         */
        private MessageModel messageModel = MessageModel.CLUSTERING;
    
        /**
         * 消费者开始消费的位置,默认从消费者停止之前的最后一个offset处开始消费
         */
        private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    
        /**
         * 回溯消费时间,默认回溯消费半小时前的数据
         */
        private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
    
        /**
         * 消费者消费消息时的消息队列分配策略
         */
        private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    
        /**
         * Subscription relationship
         */
        private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
    
        /**
         * 消息监听器
         */
        private MessageListener messageListener;
    
        /**
         * Offset Storage
         */
        private OffsetStore offsetStore;
    
        /**
         * 最小消费者线程数量
         */
        private int consumeThreadMin = 20;
    
        /**
         * 最大消费者线程数量
         */
        private int consumeThreadMax = 64;
    
        /**
         * 动态调整消费者线程的消息阀值
         */
        private long adjustThreadPoolNumsThreshold = 100000;
    
        /**
         * Concurrently max span offset.it has no effect on sequential consumption
         */
        private int consumeConcurrentlyMaxSpan = 2000;
    
        /**
         * 消息队列流量控制阀值
         */
        private int pullThresholdForQueue = 1000;
    
        private int pullThresholdSizeForQueue = 100;
    
        private int pullThresholdForTopic = -1;
    
        private int pullThresholdSizeForTopic = -1;
    
        /**
         * Message pull Interval
         */
        private long pullInterval = 0;
    
        /**
         * 批量消费的消息数量
         */
        private int consumeMessageBatchMaxSize = 1;
    
        /**
         * 批量拉取的消息数量
         */
        private int pullBatchSize = 32;
    
        private boolean postSubscriptionWhenPull = false;
    
        private boolean unitMode = false;
    
        /**
         * 消费者消费消息失败后的最大重试测试,-1表示重试16次
         */
        private int maxReconsumeTimes = -1;
    
        private long suspendCurrentQueueTimeMillis = 1000;
    
        /**
         * 消息消费的超时时间
         */
        private long consumeTimeout = 15;
    
    

    使用案例

    // TODO: 2019-06-05  
    

    相关文章

      网友评论

          本文标题:RocketMQ学习

          本文链接:https://www.haomeiwen.com/subject/mhbdxctx.html