package com.djh.consumertest.config.consumer;
import com.djh.consumertest.config.consumer.processor.MQConsumeListenerConcurrently;
import com.djh.consumertest.config.consumer.processor.MQConsumeListenerOrderly;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class MQConsumeConfig {
@Value("${mq.namesrvAddr}")
String namesrvAddr;
@Autowired
private RPCHook aclRPCHook;
@Autowired
private MQConsumeListenerConcurrently mqConsumeListenerConcurrently;
@Autowired
private MQConsumeListenerOrderly mqConsumeListenerOrderly;
@Bean
public DefaultMQPushConsumer getConsumerTopicB() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupB", aclRPCHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(namesrvAddr);
consumer.setVipChannelEnabled(false);
//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.CLUSTERING); //集群
//consumer.setMessageModel(MessageModel.BROADCASTING); //广播
try {
//订阅
consumer.subscribe("topicB", "*");
//注册消费的监听方式
consumer.registerMessageListener(mqConsumeListenerConcurrently);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("=======消费者 topicB 启动成功=======");
return consumer;
}
@Bean
public DefaultMQPushConsumer getConsumerTopicOrder() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupC", aclRPCHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(namesrvAddr);
consumer.setVipChannelEnabled(false);
//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//consumer.setMessageModel(MessageModel.CLUSTERING); //集群
//consumer.setMessageModel(MessageModel.BROADCASTING); //广播
try {
//订阅
consumer.subscribe("topicOrder", "*");
//注册消费的监听方式
consumer.registerMessageListener(mqConsumeListenerOrderly);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("=======消费者 topicOrder 启动成功=======");
return consumer;
}
@Bean
public DefaultMQPushConsumer getConsumerTopicTran() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupTran", aclRPCHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(namesrvAddr);
consumer.setVipChannelEnabled(false);
//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.CLUSTERING); //集群
//consumer.setMessageModel(MessageModel.BROADCASTING); //广播
try {
//订阅
consumer.subscribe("topicTran", "*");
//注册消费的监听方式
consumer.registerMessageListener(mqConsumeListenerConcurrently);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("=======消费者 topicTran 启动成功=======");
return consumer;
}
@Bean
public DefaultMQPullConsumer defaultMQPullConsumer(){
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupD", aclRPCHook);
consumer.setNamesrvAddr(namesrvAddr);
try {
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
System.out.println("=======消费者 PullConsumer 启动成功=======");
return consumer;
}
}
网友评论