美文网首页
RocketMQ实战

RocketMQ实战

作者: JayMeWangGL | 来源:发表于2021-06-16 14:17 被阅读0次

    PS:我这里使用的是自定义的RocketMQ进行消息的发送和消费的,原理都差不多,万变不离其宗。


    创建配置文件类

    • 首先创建RocketMqConfig、RocketMqProducerConfig、RocketMqConsumerConfig类,里面包含RocketMQ所需的所有配置,等到创建Consumer和Producer的时候可以一键配置,文中所有代码块均省略getter/setter方法

      /**
       * Top-level RocketMQ client settings: the name-server address together with
       * the producer-side and consumer-side configuration objects.
       *
       * <p>Getters and setters are omitted, as for every snippet in this article.
       */
      public class RocketMqConfig {
        /** Initial name-server address, in {@code host:port} form. */
        private static final String DEFAULT_NAMESRV_ADDR = "127.0.0.1:9876";

        /** Name-server address handed to the RocketMQ producer and consumer. */
        private String namesrvAddr = DEFAULT_NAMESRV_ADDR;

        /** Producer settings; starts out with the producer defaults. */
        private RocketMqProducerConfig producerConfig = new RocketMqProducerConfig();

        /** Consumer settings; starts out with the consumer defaults. */
        private RocketMqConsumerConfig consumerConfig = new RocketMqConsumerConfig();
      }
      
      /**
       * Producer-side settings: group name, client instance name and the topic
       * that messages are published to.
       *
       * <p>Getters and setters are omitted, as for every snippet in this article.
       */
      public class RocketMqProducerConfig {
        private static final String DEFAULT_GROUP_NAME = "producer";
        private static final String DEFAULT_INSTANCE_NAME = "producer_instance";
        private static final String DEFAULT_TOPIC = "topic";

        /** Producer group name. */
        private String groupName = DEFAULT_GROUP_NAME;

        /** Client instance name. */
        private String instanceName = DEFAULT_INSTANCE_NAME;

        /** Topic that outgoing messages are sent to. */
        private String topic = DEFAULT_TOPIC;
      }
      
      /**
       * Consumer-side settings: group name, client instance name, the topic/tag
       * subscriptions and the batch size used when consuming.
       *
       * <p>Getters and setters are omitted, as for every snippet in this article.
       */
      public class RocketMqConsumerConfig {
        private static final String DEFAULT_GROUP_NAME = "consumer";
        private static final String DEFAULT_INSTANCE_NAME = "consumer_instance";
        private static final int DEFAULT_CONSUME_BATCH_MAX_SIZE = 1;

        /** Consumer group name. */
        private String groupName = DEFAULT_GROUP_NAME;

        /** Client instance name. */
        private String instanceName = DEFAULT_INSTANCE_NAME;

        /** Subscriptions, keyed by topic with the tag expression as the value. */
        private Map<String, String> subscriptions = new HashMap<String, String>();

        /** Batch-consumption size; raise it to improve consume throughput (default 1). */
        private int consumeMessageBatchMaxSize = DEFAULT_CONSUME_BATCH_MAX_SIZE;
      }
      

    创建Consumer和Producer

    • 创建RocketMqProducer、RocketMqConsumer类,里面包括构造方法(有参、无参)、start、stop等方法

      /**
       * {@code MqProducer} implementation backed by a RocketMQ {@code DefaultMQProducer}.
       *
       * <p>The underlying producer is configured in the constructor from the supplied
       * {@code RocketMqConfig}. Call {@link #start()} before sending and
       * {@link #stop()} when finished.
       */
      public class RocketMqProducer implements MqProducer {

        private DefaultMQProducer producer;
        private RocketMqConfig config;
        private boolean isStarted = false;

        /**
         * Builds the underlying producer from the given configuration.
         *
         * @param config supplies the producer group name, instance name and
         *               name-server address
         */
        public RocketMqProducer(RocketMqConfig config) {
            this.config = config;
            producer = new DefaultMQProducer(config.getProducerConfig().getGroupName());
            producer.setInstanceName(config.getProducerConfig().getInstanceName());
            producer.setVipChannelEnabled(false);
            producer.setNamesrvAddr(config.getNamesrvAddr());
        }

        /** @return {@code true} after a successful {@link #start()} and until {@link #stop()} */
        public boolean isStarted() {
            return isStarted;
        }

        /**
         * Starts the underlying producer.
         *
         * @throws IllegalStateException if the RocketMQ client fails to start
         */
        public void start() {
            try {
                // DefaultMQProducer.start() declares a checked exception; wrap it so
                // this method's signature stays free of a throws clause.
                producer.start();
            } catch (Exception ex) {
                isStarted = false;
                throw new IllegalStateException("Failed to start RocketMQ producer", ex);
            }
            isStarted = true;
        }

        /** Shuts the underlying producer down and marks this instance as stopped. */
        public void stop() {
            producer.shutdown();
            isStarted = false;
        }

        /**
         * Sends a message to the configured topic under the given tag.
         *
         * <p>The topic and tag are written onto {@code t} before sending, and the
         * broker-assigned message id is written onto it afterwards.
         *
         * @param tag tag to attach to the message
         * @param t   message to send
         * @return result whose {@code success} flag is set only when the broker
         *         reports {@code SEND_OK}; {@code code} carries the actual send status
         * @throws IllegalStateException if the send fails or the thread is interrupted
         */
        public MqSendResult send(String tag, AbstractMessage t) {
            t.setTopic(config.getProducerConfig().getTopic());
            t.setTag(tag);
            // The last constructor argument is RocketMQ's waitStoreMsgOK flag.
            Message msg = new Message(t.getTopic(),
                                      t.getTag(),
                                      t.getKey(),
                                      t.getMessageType().getType(),
                                      t.getBody(),
                                      true);

            SendResult sendResult;
            try {
                sendResult = producer.send(msg);
            } catch (InterruptedException ex) {
                // Restore the interrupt flag so callers further up can react to it.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Interrupted while sending message to topic " + t.getTopic(), ex);
            } catch (Exception ex) {
                throw new IllegalStateException(
                        "Failed to send message to topic " + t.getTopic(), ex);
            }
            t.setMessageId(sendResult.getMsgId());

            // A send can return without throwing yet still report a non-OK status
            // (e.g. flush or slave problems), so derive success from the status
            // instead of hard-coding true. Compared by name to avoid needing an
            // additional import in this snippet.
            String status = sendResult.getSendStatus().toString();
            MqSendResult mqSendResult = new MqSendResult();
            mqSendResult.setSuccess("SEND_OK".equals(status));
            mqSendResult.setCode(status);
            mqSendResult.setMsgId(sendResult.getMsgId());
            return mqSendResult;
        }
      }
      
      /**
       * {@code MqConsumer} implementation backed by a RocketMQ
       * {@code DefaultMQPushConsumer}.
       *
       * <p>The underlying consumer is configured in the constructor; {@link #start()}
       * registers the listener, subscribes to every configured topic and starts
       * consuming. An optional {@code MqMessageHandler} receives the messages.
       */
      public class RocketMqConsumer implements MqConsumer {
        private MqMessageHandler handler = null;
        private DefaultMQPushConsumer consumer;
        private RocketMqConfig config;
        private boolean isStarted = false;

        /**
         * Builds the underlying push consumer from the given configuration.
         *
         * @param config supplies the consumer group name, instance name, batch size
         *               and name-server address
         */
        public RocketMqConsumer(RocketMqConfig config) {
            this.config = config;
            consumer = new DefaultMQPushConsumer(config.getConsumerConfig().getGroupName());
            consumer.setInstanceName(config.getConsumerConfig().getInstanceName());
            consumer.setVipChannelEnabled(false);
            consumer.setConsumeMessageBatchMaxSize(config.getConsumerConfig().getConsumeMessageBatchMaxSize());
            consumer.setNamesrvAddr(config.getNamesrvAddr());
            // Start consuming from the head of the queue.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        }

        /** @return {@code true} after a successful {@link #start()} and until {@link #stop()} */
        @Override
        public boolean isStarted() {
            return isStarted;
        }

        /**
         * Registers the message listener, subscribes to every configured
         * topic/tag pair and starts the underlying consumer.
         */
        @Override
        public void start() {
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        // Decode the payload before printing; printing the byte[] itself
                        // would only show the array's identity.
                        // NOTE(review): assumes the body is UTF-8 text - confirm against the producer.
                        System.out.println(new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                        if (handler != null) {
                            // NOTE(review): the original called handler.handle(t) with an
                            // undefined variable t. Passing the raw MessageExt assumes
                            // MqMessageHandler.handle accepts it; if the handler expects an
                            // AbstractMessage, convert msg first.
                            handler.handle(msg);
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            try {
                for (Entry<String, String> sub : config.getConsumerConfig().getSubscriptions().entrySet()) {
                    // key = topic, value = tag expression
                    consumer.subscribe(sub.getKey(), sub.getValue());
                }
                consumer.start();
                isStarted = true;
            } catch (Exception ex) {
                isStarted = false;
                // NOTE(review): java.lang.Exception has no (String/constant, enum, cause)
                // constructor, so Exception here is presumably a project-specific type -
                // confirm which class is intended and whether it is checked.
                throw new Exception(MqLibConstants.LIB_MQ_ROCKETMQ, MqLibExceptionEnum.MQ_CLIENT_EXCEPTION, ex);
            }
        }

        /** Unsubscribes from every configured topic, then shuts the consumer down. */
        @Override
        public void stop() {
            for (Entry<String, String> sub : config.getConsumerConfig().getSubscriptions().entrySet()) {
                consumer.unsubscribe(sub.getKey());
            }
            consumer.shutdown();
            isStarted = false;
        }

        /**
         * Sets the handler invoked for consumed messages.
         *
         * @param handler handler to invoke; when {@code null}, messages are only printed
         */
        @Override
        public void setHandler(MqMessageHandler handler) {
            this.handler = handler;
        }
      }
      

    消息发送与消费

    • 创建Producer类,在其中创建RocketMqConfig、RocketMqProducer对象;

      /**
       * Demo entry point: starts a {@code RocketMqProducer} with the default
       * configuration, sends one JSON message tagged {@code "json"} and then
       * stops the producer.
       */
      public class Producer {
          public static void main(String[] args) {
              RocketMqConfig config = new RocketMqConfig();
              // Use the RocketMqProducer class defined earlier in this article.
              RocketMqProducer producer = new RocketMqProducer(config);
              producer.start(); // start the producer before sending
              try {
                  JsonMessage jsonMessage = new JsonMessage(); // custom message class
                  jsonMessage.setData("JsonMessage!");
                  producer.send("json", jsonMessage);
              } finally {
                  // Always shut the client down, even if the send fails.
                  producer.stop();
              }
          }
      }
      
    • 创建Consumer类,在其中创建RocketMqConfig、RocketMqConsumer对象;

      /**
       * Demo entry point: subscribes a {@code RocketMqConsumer} to the producer's
       * topic with the {@code "json"} tag and starts consuming.
       */
      public class Consumer {
          public static void main(String[] args) {
              RocketMqConfig config = new RocketMqConfig();
              // Subscribe to the same topic the producer publishes to, filtered by tag.
              Map<String, String> subscriptions = new HashMap<>();
              subscriptions.put(config.getProducerConfig().getTopic(), "json");
              config.getConsumerConfig().setSubscriptions(subscriptions);
              // Use the RocketMqConsumer class defined earlier in this article.
              RocketMqConsumer consumer = new RocketMqConsumer(config);
              consumer.start();
          }
      }
      

    启动两个类,同时别忘了启动RocketMQ的服务。可以在RocketMQ可视化界面看到生产者、消费者以及消息等信息

    相关文章

      网友评论

          本文标题:RocketMQ实战

          本文链接:https://www.haomeiwen.com/subject/tdoxyltx.html