美文网首页spring boot
Springboot集成Ali RocketMq

Springboot集成Ali RocketMq

作者: 作草分茶 | 来源:发表于2018-08-10 09:21 被阅读330次

1.引入阿里RocketMq依赖

<!-- https://mvnrepository.com/artifact/com.aliyun.openservices/ons-client -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>${ons-client.version}</version>
</dependency>

2.创建监听tag

@Getter
public enum PushWayEnum {
    WEIXIN(0, "weixin"),
    SMS(1, "sms");

    int code;
    String tag;

    PushWayEnum(int code, String tag) {
        this.code = code;
        this.tag = tag;
    }

    public static String allTag() {
        StringBuilder builder = new StringBuilder();
        PushWayEnum[] values = PushWayEnum.values();
        for (int i = 0; i < values.length; i++) {
            PushWayEnum pushWayEnum = values[i];
            if (i != 0) {
                builder.append("||");
            }
            builder.append(pushWayEnum.tag);
        }
        return builder.toString();
    }
}

3.创建配置类

@Configuration
@Component
@Slf4j
public class RocketMqConfig {

    private final RocketMQConsumer consumer;

    @Autowired
    public RocketMqConfig(RocketMQConsumer consumer) {
        this.consumer = consumer;
    }

    private Properties properties() {
        Properties properties = new Properties();
        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.AccessKey, "你的AccessKey");
        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.SecretKey, "你的SecretKey");
        // 设置 TCP 接入域名(此处以公共云生产环境为例)
        properties.put(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
        return properties;

    }

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean producer) {
        ProducerBean producerBean = new ProducerBean();
        Properties properties = properties(systemPropertyHelper);
        properties.put(PropertyKeyConst.ProducerId,"你的生产者id");
        producerBean.setProperties(properties);
        return producerBean;
    }

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean smsConsumerBean(SystemPropertyHelper propertyHelper) {
        ConsumerBean consumerBean = new ConsumerBean();
        Properties properties = properties(propertyHelper);
        properties.put(PropertyKeyConst.ConsumerId, "你的消费者id");
        consumerBean.setProperties(properties);
        //绑定监听的topic
        Subscription subscription = new Subscription();
        subscription.setTopic("你的topic");
        //绑定要监听的tag,多个tag用 || 隔开
        subscription.setExpression(PushWayEnum.allTag());
        Map<Subscription, MessageListener> map = new HashMap<>(2);
        map.put(subscription, consumer);
        consumerBean.setSubscriptionTable(map);
        return consumerBean;
    }
}

4.创建消息生产者

@Component
@Slf4j
public class RocketMQProducer {

    private final ProducerBean producer;

    @Autowired
    public RocketMQProducer(ProducerBean producer) {
        this.producer = producer;
    }

    /**
     * 发送消息
     *
     * @param tag   你要发送消息的tag
     * @param key
     * @param body
     */
    public void sendMessage(String tag, String key, byte[] body) {
        Message msg = new Message("你要发送到哪个topic", tag, body);
        try {
            if (StringUtils.isNotBlank(key)) {
                msg.setKey(key);
            }
            SendResult sendResult = producer.send(msg);
            if (sendResult != null) {
                log.info("消息发送成功:{}", sendResult.toString());
            }
        } catch (ONSClientException e) {
            log.info("消息发送失败:{}", e);
        }
    }
}

5.创建消息消费者

@Component
@Slf4j
public class RocketMQConsumer implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        String response = new String(message.getBody());
        String tag = message.getTag();
        try {
            if (StringUtils.equals(tag, PushWayEnum.SMS.getTag())) {
                log.info("收到短信消息,消息内容:{}",response);
            } else if (StringUtils.equals(tag, PushWayEnum.WEIXIN.getTag())) {
                log.info("收到微信消息,消息内容:{}",response);
            }
            //确认消息已经消费
            return Action.CommitMessage;
        } catch (Exception e) {
            //稍后重新消费
            return Action.ReconsumeLater;
        }
    }
}

6.附录

  1. 阿里RocketMq官方文档 https://help.aliyun.com/document_detail/52591.html?spm=a2c4g.11186623.6.595.26ZBBH
  2. Apache RocketMq SpringBoot集成文档 https://github.com/apache/rocketmq-externals/blob/master/rocketmq-spring-boot-starter/README_zh_CN.md

相关文章

网友评论

    本文标题:Springboot集成Ali RocketMq

    本文链接:https://www.haomeiwen.com/subject/sgahbftx.html