RocketMQ 学习
安装部署**
-
下载rocketmq解压:下载地址
-
启动nameserver,进入bin目录下执行
# -n 参数指定nameserver的访问地址与端口号,&后台启动 sh mqnamesrv -n host:port & #备注:在启动过程中可能会因为无法分配内存启动失败,打开bin目录下的runserver.sh修改启动参数
-
启动broker,进入bin目录下执行
# -n 指定nameserver的访问地址, -c 指定broker的配置文件 sh mqbroker -n host:port -c ../conf/broker.conf autoCreateTopicEnable=true & #备注:修改broker.conf,添加 brokerIP1 = host 否则外网访问可能会报错
-
开放访问端口号
开放nameservr访问端口 9876 开放broker访问端口 10909 10911 10912 备注:当时因为少开放了一个10912端口导致程序一直报错,错误提示忘了是啥啦
DefaultMQProducer属性配置
public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* 默认生产者,
*/
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
/**
* 生产着组别,同组别的所有生产着实例作用相同
*/
private String producerGroup;
/**
* 不指定topic时的默认topic
*/
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
/**
* 默认topic队列的大小
*/
private volatile int defaultTopicQueueNums = 4;
/**
* 发送消息超时时间
*/
private int sendMsgTimeout = 3000;
/**
* 对消息进行压缩的阀值
*/
private int compressMsgBodyOverHowmuch = 1024 * 4;
/**
* 同步模式下发送消息失败的重试次数
*/
private int retryTimesWhenSendFailed = 2;
/**
* 异步模式下发送消息失败的重试次数
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* 当消息发送失败时用其他的broker进行重试
*/
private boolean retryAnotherBrokerWhenNotStoreOK = false;
/**
* 最大允许的消息大小
*/
private int maxMessageSize = 1024 * 1024 * 4; // 4M
}
DefaultMQPushConsumer属性配置
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
/**
* 默认消费者
*/
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
/**
* 消费者组别
*/
private String consumerGroup;
/**
* 消费模式,默认集群消费,rocketmq支持集群消费和广播消费
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* 消费者开始消费的位置,默认从消费者停止之前的最后一个offset处开始消费
*/
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
* 回溯消费时间,默认回溯消费半小时前的数据
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
/**
* 消费者消费消息时的消息队列分配策略
*/
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
/**
* Subscription relationship
*/
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
/**
* 消息监听器
*/
private MessageListener messageListener;
/**
* Offset Storage
*/
private OffsetStore offsetStore;
/**
* 最小消费者线程数量
*/
private int consumeThreadMin = 20;
/**
* 最大消费者线程数量
*/
private int consumeThreadMax = 64;
/**
* 动态调整消费者线程的消息阀值
*/
private long adjustThreadPoolNumsThreshold = 100000;
/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
private int consumeConcurrentlyMaxSpan = 2000;
/**
* 消息队列流量控制阀值
*/
private int pullThresholdForQueue = 1000;
private int pullThresholdSizeForQueue = 100;
private int pullThresholdForTopic = -1;
private int pullThresholdSizeForTopic = -1;
/**
* Message pull Interval
*/
private long pullInterval = 0;
/**
* 批量消费的消息数量
*/
private int consumeMessageBatchMaxSize = 1;
/**
* 批量拉取的消息数量
*/
private int pullBatchSize = 32;
private boolean postSubscriptionWhenPull = false;
private boolean unitMode = false;
/**
* 消费者消费消息失败后的最大重试测试,-1表示重试16次
*/
private int maxReconsumeTimes = -1;
private long suspendCurrentQueueTimeMillis = 1000;
/**
* 消息消费的超时时间
*/
private long consumeTimeout = 15;
使用案例
// TODO: 2019-06-05
网友评论