添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
这里一定要注意版本问题,我安装的是4.7.1 但 rocketmq-spring-boot-starter 默认是 4.4.0,导致我消费一直不成功。
修改配置
server:
port: 8888
rocketmq:
name-server: 10.240.30.102:9876;10.240.30.100:9876
producer:
group: my-group1
sendMessageTimeout: 300000
logging:
level:
root: error
com.giant.cloud: debug
生产者代码
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
@SpringBootApplication
@Slf4j
public class RocketApplication implements CommandLineRunner {
@Resource
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(RocketApplication.class, args);
}
@Override
public void run(String... args) throws MQClientException {
String topic = "stringRequestTopic";
// 应该叫 renewTopic 找到已有的topic 替换成一个新的topic
// rocketMQTemplate.getProducer().createTopic("aa",topic,4);
// 设置 VipChannel 为不启用
rocketMQTemplate.getProducer().setVipChannelEnabled(false);
rocketMQTemplate.syncSend(topic, "Hello, World!");
}
}
发送成功后会查到这条消息
消费消息
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* @author big uncle
* @date 2021/2/1 11:30
* @module
**/
@Service
@Slf4j
@RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "consumer-my-group1",enableMsgTrace = false)
public class StringMessage implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.debug("接收消息为:{}",message);
}
}
高级消费模式
更多更灵活的消费方式,样例代码如下
package com.giant.cloud;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@SpringBootApplication
@Slf4j
public class RocketApplication implements CommandLineRunner {
@Resource
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(RocketApplication.class, args);
}
String topic = "stringRequestTopic";
String consumerGroup = "consumer-my-group1";
@Override
public void run(String... args) throws Exception {
product();
consumer();
}
private void consumer() throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
// 服务地址
consumer.setNamesrvAddr(rocketMQTemplate.getProducer().getNamesrvAddr());
// 订阅Topics
consumer.subscribe(topic, "*");
consumer.setConsumeMessageBatchMaxSize(1000);
consumer.setPullBatchSize(1000);
// consumer.setPullInterval(5000);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setConsumeThreadMax(2);
consumer.setConsumeThreadMin(2);
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
log.debug("消费线程为 {} , 获取到的消息量为 {}",Thread.currentThread().getName(),messages.size());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
private long product() throws Exception{
rocketMQTemplate.getProducer().setVipChannelEnabled(false);
rocketMQTemplate.getProducer().setMaxMessageSize(4194304*10);
log.debug("生产者组为:{}",rocketMQTemplate.getProducer().getProducerGroup());
List<Message> messages = new ArrayList<>();
for (int i=0;i<10000;i++) {
String msg = "h"+i;
messages.add(new Message(topic, "", "", msg.getBytes()));
}
long startTime = System.currentTimeMillis();
rocketMQTemplate.getProducer().send(messages);
long time = System.currentTimeMillis() - startTime;
log.debug("发送成功,耗时 {} ms",time);
return time;
}
}
注意
1. 2m 模式下,发现消息只发送在 broker-a,没发送到 broker-b
与我想象的不一样是,我查看源码发现会根据 topic 查到 topic的路由信息,里面包含了 topic 所在 brokerName,只有 broker-a,也就是在集群之前给 broker-a 所创建的 topic,不会因为集群模式,而发送到别的 broker,即使我在 broker-b 使用 mqadmin 能看到 broker-a 的 topic。
只有如下创建的 topic 才能在发送消息的时候,发送给两个 broker,但如果你是批量发送的话,只会发送给一个 broker。
[root@node100 bin]# ./mqadmin updateTopic -n 10.240.30.100:9876 -t stringRequestTopic102 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create topic to 10.240.30.102:10911 success.
create topic to 10.240.30.100:10911 success.
替换成 syncSend 发送
private long product() throws Exception{
rocketMQTemplate.getProducer().setVipChannelEnabled(false);
rocketMQTemplate.getProducer().setMaxMessageSize(4194304*10);
log.debug("生产者组为:{}",rocketMQTemplate.getProducer().getProducerGroup());
List<Message> messages = new ArrayList<>();
for (int i=0;i<50;i++) {
String msg = "h"+i;
rocketMQTemplate.syncSend(topic,msg);
}
long startTime = System.currentTimeMillis();
// rocketMQTemplate.getProducer().send(messages);
long time = System.currentTimeMillis() - startTime;
log.debug("发送成功,耗时 {} ms",time);
return time;
}
发送前
发送后
2. 消费 setConsumeMessageBatchMaxSize最大支持[1,1024],但是调到 1000 最高消费却是 32 个,无法让一个线程消费 1000 个。
我尝试查看源码为什么会只有32个,但看到最后发现是body 大小始终只拿到 5664个字节,decode之后 message只有32个。我尝试把 setConsumeMessageBatchMaxSize 和 setPullBatchSize 改到 10,发现 body 的大小是1770个字节。我尝试改了内容大小,但与内容没有关系。
网友评论