美文网首页
RocketMQ实战

RocketMQ实战

作者: JayMeWangGL | 来源:发表于2021-06-16 14:17 被阅读0次

PS:我这里使用的是自定义封装的RocketMQ客户端进行消息的发送和消费,原理都差不多,万变不离其宗。


创建配置文件类

  • 首先创建RocketMqConfig、RocketMqProducerConfig、RocketMqConsumerConfig类,里面包含RocketMQ所需的所有配置,等到创建Consumer和Producer的时候可以一键配置。文中所有代码块均省略getter/setter方法。

    /**
     * Top-level RocketMQ settings: the name-server address plus the nested
     * producer and consumer configurations. Getters and setters are omitted.
     */
    public class RocketMqConfig {
      // Name-server address handed to the producer/consumer via setNamesrvAddr.
      private String namesrvAddr = "127.0.0.1:9876";
      // Producer-side settings, created with their own defaults.
      private RocketMqProducerConfig producerConfig = new RocketMqProducerConfig();
      // Consumer-side settings, created with their own defaults.
      private RocketMqConsumerConfig consumerConfig = new RocketMqConsumerConfig();
    }
    
    /**
     * Producer-side settings. Getters and setters are omitted.
     */
    public class RocketMqProducerConfig {
      // Producer group name, passed to the DefaultMQProducer constructor.
      private String groupName = "producer";
      // Instance name, applied via setInstanceName.
      private String instanceName = "producer_instance";
      // Topic that RocketMqProducer.send stamps on every outgoing message.
      private String topic = "topic";
    }
    
    /**
     * Consumer-side settings. Getters and setters are omitted.
     */
    public class RocketMqConsumerConfig {
      // Consumer group name
      private String groupName = "consumer";
      // Instance name
      private String instanceName = "consumer_instance";
      // Map of subscribed topic -> tag expression; each entry is passed to consumer.subscribe
      private Map<String, String> subscriptions = new HashMap<>();
      // Batch size for consumption, to raise consume throughput; the default is 1
      private int consumeMessageBatchMaxSize = 1;
    }
    

创建Consumer和Producer

  • 创建RocketMqProducer、RocketMqConsumer类,里面包括构造方法(有参、无参)、start、stop等方法

    /**
     * Wraps a RocketMQ {@code DefaultMQProducer} configured from a {@code RocketMqConfig}.
     *
     * <p>The underlying client declares checked exceptions on {@code start()} and
     * {@code send(...)}. They are rethrown here as {@link IllegalStateException} with the
     * original exception as the cause, so the methods need no {@code throws} clauses.
     */
    public class RocketMqProducer implements MqProducer {

        private DefaultMQProducer producer;
        private RocketMqConfig config;
        private boolean isStarted = false;

        /**
         * Builds the underlying producer from the group name, instance name and
         * name-server address in {@code config}. The producer is not started here.
         *
         * @param config connection and producer settings
         */
        public RocketMqProducer(RocketMqConfig config) {
            this.config = config;
            producer = new DefaultMQProducer(config.getProducerConfig().getGroupName());
            producer.setInstanceName(config.getProducerConfig().getInstanceName());
            producer.setVipChannelEnabled(false);
            producer.setNamesrvAddr(config.getNamesrvAddr());
        }

        /** Returns whether {@link #start()} has completed and {@link #stop()} has not been called since. */
        public boolean isStarted() {
            return isStarted;
        }

        /**
         * Starts the underlying producer.
         *
         * @throws IllegalStateException if the underlying producer fails to start
         */
        public void start() {
            try {
                producer.start();
                isStarted = true;
            } catch (Exception ex) {
                isStarted = false;
                throw new IllegalStateException("Failed to start RocketMQ producer", ex);
            }
        }

        /** Shuts the underlying producer down and marks this wrapper as stopped. */
        public void stop() {
            producer.shutdown();
            isStarted = false;
        }

        /**
         * Sends {@code t} to the configured topic with the given tag.
         *
         * <p>The topic, tag and (after sending) message id are written back onto {@code t}.
         *
         * @param tag tag to attach to the message
         * @param t   message to send
         * @return a result carrying the send status (as {@code code}) and the message id;
         *         {@code success} is set to {@code true} whenever the send returns without
         *         throwing, regardless of the reported status
         * @throws IllegalStateException if the send fails or the thread is interrupted
         */
        public MqSendResult send(String tag, AbstractMessage t) {
            t.setTopic(config.getProducerConfig().getTopic());
            t.setTag(tag);
            Message msg = new Message(t.getTopic(),
                                      t.getTag(),
                                      t.getKey(),
                                      t.getMessageType().getType(),
                                      t.getBody(),
                                      true);

            SendResult sendResult;
            try {
                sendResult = producer.send(msg);
            } catch (InterruptedException ex) {
                // Restore the interrupt flag so callers up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Interrupted while sending message to topic " + t.getTopic(), ex);
            } catch (Exception ex) {
                throw new IllegalStateException(
                        "Failed to send message to topic " + t.getTopic(), ex);
            }
            t.setMessageId(sendResult.getMsgId());

            // Build the result returned to the caller.
            MqSendResult mqSendResult = new MqSendResult();
            mqSendResult.setSuccess(true);
            mqSendResult.setCode(sendResult.getSendStatus().toString());
            mqSendResult.setMsgId(sendResult.getMsgId());
            return mqSendResult;
        }
    }
    
    /**
     * Wraps a RocketMQ {@code DefaultMQPushConsumer} configured from a {@code RocketMqConfig}.
     *
     * <p>On {@link #start()} a concurrent message listener is registered, every subscription
     * from the consumer configuration is applied, and the underlying consumer is started.
     */
    public class RocketMqConsumer implements MqConsumer {
        private MqMessageHandler handler = null;
        private DefaultMQPushConsumer consumer;
        private RocketMqConfig config;
        private boolean isStarted = false;

        /**
         * Builds the underlying push consumer from the group name, instance name, batch size
         * and name-server address in {@code config}. The consumer is not started here.
         *
         * @param config connection and consumer settings
         */
        public RocketMqConsumer(RocketMqConfig config) {
            this.config = config;
            consumer = new DefaultMQPushConsumer(config.getConsumerConfig().getGroupName());
            consumer.setInstanceName(config.getConsumerConfig().getInstanceName());
            consumer.setVipChannelEnabled(false);
            consumer.setConsumeMessageBatchMaxSize(config.getConsumerConfig().getConsumeMessageBatchMaxSize());
            consumer.setNamesrvAddr(config.getNamesrvAddr());
            // Start consuming from the head of the queue.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        }

        /** Returns whether {@link #start()} has completed and {@link #stop()} has not been called since. */
        @Override
        public boolean isStarted() {
            return isStarted;
        }

        /**
         * Registers the message listener, applies all configured subscriptions and starts
         * the underlying consumer. On failure the started flag is reset and the error is
         * rethrown wrapped.
         */
        @Override
        public void start() {
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        // Decode the payload before printing; println on a byte[] would only
                        // print the array identity. Assumes the body is UTF-8 text.
                        System.out.println(new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                    }
                    if (handler != null) {
                        // NOTE(review): `t` is not defined anywhere in this snippet; the step that
                        // converts the received messages into the handler's argument appears to
                        // have been omitted. Restore it before compiling.
                        handler.handle(t);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            try {
                for (Entry<String, String> sub : config.getConsumerConfig().getSubscriptions().entrySet()) {
                    // key = topic, value = tag expression
                    consumer.subscribe(sub.getKey(), sub.getValue());
                }
                consumer.start();
                isStarted = true;
            } catch (Exception ex) {
                isStarted = false;
                // NOTE(review): this three-argument constructor is not one of java.lang.Exception's;
                // presumably a project-defined unchecked exception type was intended - confirm.
                throw new Exception(MqLibConstants.LIB_MQ_ROCKETMQ, MqLibExceptionEnum.MQ_CLIENT_EXCEPTION, ex);
            }
        }

        /** Unsubscribes from every configured topic, then shuts the underlying consumer down. */
        @Override
        public void stop() {
            for (Entry<String, String> sub : config.getConsumerConfig().getSubscriptions().entrySet()) {
                consumer.unsubscribe(sub.getKey());
            }
            consumer.shutdown();
            isStarted = false;
        }

        /**
         * Sets the handler invoked by the message listener after a batch has been printed.
         *
         * @param handler callback to invoke; when {@code null} no callback is made
         */
        @Override
        public void setHandler(MqMessageHandler handler) {
            this.handler = handler;
        }
    }
    

消息发送与消费

  • 创建Producer类,在其中创建RocketMqConfig、RocketMqProducer对象;

    /**
     * Demo entry point: starts a producer, sends a single JSON message tagged
     * {@code "json"}, and stops the producer again.
     */
    public class Producer {
        public static void main(String[] args) {
            RocketMqConfig config = new RocketMqConfig();
            RocketMqProducer producer = new RocketMqProducer(config);
            producer.start(); // start the producer
            try {
                JsonMessage jsonMessage = new JsonMessage(); // custom message class
                jsonMessage.setData("JsonMessage!");
                producer.send("json", jsonMessage);
            } finally {
                // Always release the client, even when the send fails.
                producer.stop();
            }
        }
    }
    
  • 创建Consumer类,在其中创建RocketMqConfig、RocketMqConsumer对象;

    /**
     * Demo entry point: subscribes to the producer's topic with tag {@code "json"}
     * and starts the consumer.
     */
    public class Consumer {
        public static void main(String[] args) {
            RocketMqConfig config = new RocketMqConfig();
            // topic -> tag expression; reuses the producer's topic so both demos line up
            Map<String, String> subscriptions = new HashMap<>();
            subscriptions.put(config.getProducerConfig().getTopic(), "json");
            config.getConsumerConfig().setSubscriptions(subscriptions);
            RocketMqConsumer consumer = new RocketMqConsumer(config);
            consumer.start();
        }
    }
    

启动两个类,同时别忘了启动RocketMQ的服务。可以在RocketMQ可视化界面看到生产者、消费者以及消息等信息

相关文章

网友评论

      本文标题:RocketMQ实战

      本文链接:https://www.haomeiwen.com/subject/tdoxyltx.html