美文网首页
RocketMQ学习

RocketMQ学习

作者: 胖嘟嘟洒酒疯 | 来源:发表于2019-06-05 00:46 被阅读0次

RocketMQ 学习

安装部署**

  1. 下载rocketmq解压:下载地址

  2. 启动nameserver,进入bin目录下执行

    # -n 参数指定nameserver的访问地址与端口号,&后台启动
    sh mqnamesrv -n host:port &
    #备注:在启动过程中可能会因为无法分配内存启动失败,打开bin目录下的runserver.sh修改启动参数
    
  3. 启动broker,进入bin目录下执行

    # -n 指定nameserver的访问地址, -c 指定broker的配置文件
    sh mqbroker -n host:port -c ../conf/broker.conf autoCreateTopicEnable=true &
    #备注:修改broker.conf,添加 brokerIP1 = host 否则外网访问可能会报错
    
  4. 开放访问端口号

    开放nameservr访问端口 9876
    开放broker访问端口 10909 10911 10912
    备注:当时因为少开放了一个10912端口导致程序一直报错,错误提示忘了是啥啦
    

DefaultMQProducer属性配置

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    /**
     * 默认生产者,
     */
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

    /**
     * 生产着组别,同组别的所有生产着实例作用相同
     */
    private String producerGroup;

    /**
     * 不指定topic时的默认topic
     */
    private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;

    /**
     * 默认topic队列的大小
     */
    private volatile int defaultTopicQueueNums = 4;

    /**
     * 发送消息超时时间
     */
    private int sendMsgTimeout = 3000;

    /**
     * 对消息进行压缩的阀值
     */
    private int compressMsgBodyOverHowmuch = 1024 * 4;

    /**
     * 同步模式下发送消息失败的重试次数
     */
    private int retryTimesWhenSendFailed = 2;

    /**
     * 异步模式下发送消息失败的重试次数
     */
    private int retryTimesWhenSendAsyncFailed = 2;

    /**
     * 当消息发送失败时用其他的broker进行重试
     */
    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    /**
     * 最大允许的消息大小
     */
    private int maxMessageSize = 1024 * 1024 * 4; // 4M
}

DefaultMQPushConsumer属性配置

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    /**
     * 默认消费者
     */
    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

    /**
     * 消费者组别
     */
    private String consumerGroup;

    /**
     * 消费模式,默认集群消费,rocketmq支持集群消费和广播消费
     */
    private MessageModel messageModel = MessageModel.CLUSTERING;

    /**
     * 消费者开始消费的位置,默认从消费者停止之前的最后一个offset处开始消费
     */
    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

    /**
     * 回溯消费时间,默认回溯消费半小时前的数据
     */
    private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));

    /**
     * 消费者消费消息时的消息队列分配策略
     */
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy;

    /**
     * Subscription relationship
     */
    private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();

    /**
     * 消息监听器
     */
    private MessageListener messageListener;

    /**
     * Offset Storage
     */
    private OffsetStore offsetStore;

    /**
     * 最小消费者线程数量
     */
    private int consumeThreadMin = 20;

    /**
     * 最大消费者线程数量
     */
    private int consumeThreadMax = 64;

    /**
     * 动态调整消费者线程的消息阀值
     */
    private long adjustThreadPoolNumsThreshold = 100000;

    /**
     * Concurrently max span offset.it has no effect on sequential consumption
     */
    private int consumeConcurrentlyMaxSpan = 2000;

    /**
     * 消息队列流量控制阀值
     */
    private int pullThresholdForQueue = 1000;

    private int pullThresholdSizeForQueue = 100;

    private int pullThresholdForTopic = -1;

    private int pullThresholdSizeForTopic = -1;

    /**
     * Message pull Interval
     */
    private long pullInterval = 0;

    /**
     * 批量消费的消息数量
     */
    private int consumeMessageBatchMaxSize = 1;

    /**
     * 批量拉取的消息数量
     */
    private int pullBatchSize = 32;

    private boolean postSubscriptionWhenPull = false;

    private boolean unitMode = false;

    /**
     * 消费者消费消息失败后的最大重试测试,-1表示重试16次
     */
    private int maxReconsumeTimes = -1;

    private long suspendCurrentQueueTimeMillis = 1000;

    /**
     * 消息消费的超时时间
     */
    private long consumeTimeout = 15;

使用案例

// TODO: 2019-06-05  

相关文章

  • RocketMq 实际案例--普通消息的发送

    RocketMq 实际案例--普通消息的发送 @(消息中间件)[RocketMq 实例] 学习rocketMq 最...

  • 2020-07-15 搭建 RocketMQ 服务器最便捷的方法

    最近学习使用 rocketmq,需要搭建 rocketmq 服务端,本文主要记录 rocketmq 搭建过程以及这...

  • 史上最便捷搭建RocketMQ服务器的方法

    最近学习使用 rocketmq,需要搭建 rocketmq 服务端,本文主要记录 rocketmq 搭建过程以及这...

  • RockectMQ教程(一)

    RocketMQ学习 RocketMQ原理解析 RocketMQ单机支持1万以上的持久化队列,前提是足够的内存、硬...

  • RocketMQ学习

    RocketMQ 学习 安装部署** 下载rocketmq解压:下载地址 启动nameserver,进入bin目录...

  • RocketMQ学习

    RocketMQ深度解析RocketMQ之一:RocketMQ整体介绍RocketMQ之二:分布式开放消息系统Ro...

  • RocketMQ学习

    简介 官方简介:RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:能够保证严格的消息顺序提供丰富的...

  • RocketMQ学习

    RocketMQ架构图 NameServer NameServer是相当于zookeeper,用来管理Broker...

  • 使用RocketMQ消费消息

    RocketMQ消费端 今天要来跟大家学习怎样使用RocketMQ来进行消息的消费 先简单创建个Maven项目使用...

  • RocketMQ-基础使用(一)

    零、本文纲要 一、RocketMQ基础 MQ特点 RocketMQ安装 测试RocketMQ 二、RocketMQ...

网友评论

      本文标题:RocketMQ学习

      本文链接:https://www.haomeiwen.com/subject/mhbdxctx.html