美文网首页
rocket mq consumer MQConsumeConf

RocketMQ consumer MQConsumeConfig

作者: DUJUNHUI | 来源:发表于2020-07-09 10:46 被阅读0次
package com.djh.consumertest.config.consumer;


import com.djh.consumertest.config.consumer.processor.MQConsumeListenerConcurrently;
import com.djh.consumertest.config.consumer.processor.MQConsumeListenerOrderly;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class MQConsumeConfig {

    @Value("${mq.namesrvAddr}")
    String namesrvAddr;
    @Autowired
    private RPCHook aclRPCHook;
    @Autowired
    private MQConsumeListenerConcurrently mqConsumeListenerConcurrently;
    @Autowired
    private MQConsumeListenerOrderly mqConsumeListenerOrderly;

    @Bean
    public DefaultMQPushConsumer getConsumerTopicB() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupB", aclRPCHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setVipChannelEnabled(false);
        //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING); //集群
        //consumer.setMessageModel(MessageModel.BROADCASTING); //广播

        try {
            //订阅
            consumer.subscribe("topicB", "*");
            //注册消费的监听方式
            consumer.registerMessageListener(mqConsumeListenerConcurrently);
            consumer.start();

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("=======消费者 topicB 启动成功=======");
        return consumer;
    }

    @Bean
    public DefaultMQPushConsumer getConsumerTopicOrder() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupC", aclRPCHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setVipChannelEnabled(false);
        //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //consumer.setMessageModel(MessageModel.CLUSTERING); //集群
        //consumer.setMessageModel(MessageModel.BROADCASTING); //广播

        try {
            //订阅
            consumer.subscribe("topicOrder", "*");
            //注册消费的监听方式
            consumer.registerMessageListener(mqConsumeListenerOrderly);
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("=======消费者 topicOrder 启动成功=======");
        return consumer;
    }

    @Bean
    public DefaultMQPushConsumer getConsumerTopicTran() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupTran", aclRPCHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setVipChannelEnabled(false);
        //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING); //集群
        //consumer.setMessageModel(MessageModel.BROADCASTING); //广播
        try {
            //订阅
            consumer.subscribe("topicTran", "*");
            //注册消费的监听方式
            consumer.registerMessageListener(mqConsumeListenerConcurrently);
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("=======消费者 topicTran 启动成功=======");
        return consumer;
    }

    @Bean
    public DefaultMQPullConsumer defaultMQPullConsumer(){
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupD", aclRPCHook);
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        System.out.println("=======消费者 PullConsumer 启动成功=======");
        return consumer;
    }
}

相关文章

网友评论

      本文标题:RocketMQ consumer MQConsumeConfig

      本文链接:https://www.haomeiwen.com/subject/mhsjcktx.html