2018-04-21 DefaultMQProducer

作者: 楼亭樵客 | 来源:发表于2018-05-16 09:26 被阅读0次

    package com.okdeer.base.framework.mq;

    import java.util.List;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;

    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.client.producer.SendStatus;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    import com.okdeer.base.framework.mq.codec.MQCodecUtils;
    import com.okdeer.base.framework.mq.message.MQMessage;

    /**
     * Message producer that delegates to a RocketMQ {@link DefaultMQProducer}.
     *
     * <p>The producer is created, started and hooked into JVM shutdown from
     * {@link #afterPropertiesSet()}.
     *
     * @pr yschome-base
     * @author guocp
     * @date 2015-11-19 16:09:29
     */
    public class RocketMQProducer implements IMQProducer, InitializingBean {

        /** Logger. */
        private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);

        /** Instance name assigned to the underlying producer. */
        private static final String INSTANCE_NAME = "rocketmq_instance";

        /** Underlying RocketMQ producer; created by {@link #initProducer()}. */
        private DefaultMQProducer producer;

        /** RocketMQ configuration. */
        @Autowired
        private RocketMQConfigurer rocketMQconfig;

        /**
         * Initializes the producer, starts it, and registers the shutdown hook.
         *
         * @throws Exception if starting the producer fails
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            // Initialize the message producer.
            initProducer();
            // Start the service.
            this.start();
            // Register the application-server shutdown hook.
            shutdownHook();
        }

        /**
         * Creates the underlying producer and copies the settings from
         * {@code rocketMQconfig} onto it.
         */
        private void initProducer() {
            producer = new DefaultMQProducer(rocketMQconfig.getProducerGroupName());
            producer.setInstanceName(INSTANCE_NAME);
            // Name server address.
            producer.setNamesrvAddr(rocketMQconfig.getNamesrvAddr());
            producer.setDefaultTopicQueueNums(rocketMQconfig.getDefaultTopicQueueNums());
            producer.setRetryTimesWhenSendFailed(rocketMQconfig.getRetryTimesWhenSendFailed());
            producer.setSendMsgTimeout(rocketMQconfig.getSendMsgTimeout());
            producer.setMaxMessageSize(rocketMQconfig.getMaxMessageSize());
        }

        /**
         * Sends a message through the underlying producer.
         *
         * <p>A result whose status is not {@link SendStatus#SEND_OK} is logged as a
         * warning and still returned to the caller.
         *
         * @param msg message to send
         * @return the send result reported by the producer
         * @throws Exception the client, remoting, broker or interruption failure raised
         *         by the producer, rethrown after being logged
         */
        @Override
        public SendResult send(final Message msg) throws Exception {
            try {
                SendResult sendResult = producer.send(msg);
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    logger.warn("消息中间件发送消息异常:{},状态为:{}", msg.getKeys(), sendResult.getSendStatus());
                }
                return sendResult;
            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                logger.error("消息中间件发送消息异常,消息key为:{}", msg.getKeys(), e);
                throw e;
            }
        }

        /**
         * Sends a message to a queue chosen from the hash of {@code orderCode}.
         *
         * <p>For a given number of queues, equal order codes select the same queue
         * index. A result whose status is not {@link SendStatus#SEND_OK} is logged as
         * a warning and still returned to the caller.
         *
         * @param msg message to send
         * @param orderCode value whose {@code hashCode()} selects the queue; must not be
         *        null because the selector dereferences it
         * @return the send result reported by the producer
         * @throws Exception the client, remoting, broker or interruption failure raised
         *         by the producer, rethrown after being logged
         */
        @Override
        public SendResult send(final Message msg, final String orderCode) throws Exception {
            try {
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // abs() is applied after the modulo, so the index stays within
                        // [0, size) even when hashCode() is Integer.MIN_VALUE.
                        int index = Math.abs(arg.hashCode() % mqs.size());
                        return mqs.get(index);
                    }
                }, orderCode);
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    logger.warn("消息中间件发送顺序消息异常:{},状态为:{}", msg.getKeys(), sendResult.getSendStatus());
                }
                return sendResult;
            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                logger.error("消息中间件发送顺序消息异常,消息key为:{}", msg.getKeys(), e);
                throw e;
            }
        }

    /**
     * Wraps an {@link MQMessage} into a RocketMQ {@link Message} and sends it.
     *
     * <p>The key and the delay time level are copied onto the wrapped message only
     * when they are non-null.
     *
     * @param anMessage message envelope to wrap and send
     * @return the result of {@link #send(Message)}
     * @throws Exception if {@code MQCodecUtils.wrap} returns null, or if sending fails
     * @author guocp
     * @date 2016-10-28
     */
    public SendResult sendMessage(final MQMessage anMessage) throws Exception {
        final Message message = MQCodecUtils.wrap(anMessage);
        if (message == null) {
            throw new Exception("包装消息出错!");
        }
        if (anMessage.getKey() != null) {
            message.setKeys(anMessage.getKey());
        }
        if (anMessage.getDelayTimeLevel() != null) {
            message.setDelayTimeLevel(anMessage.getDelayTimeLevel());
        }
        return this.send(message);
    }

    /**
     * Wraps an {@link MQMessage} into a RocketMQ {@link Message} and sends it as an
     * ordered message.
     *
     * <p>The key and the delay time level are copied onto the wrapped message only
     * when they are non-null.
     *
     * @param anMessage message envelope to wrap and send
     * @param orderCode ordering code forwarded to {@link #send(Message, String)}
     * @return the result of {@link #send(Message, String)}
     * @throws Exception if {@code MQCodecUtils.wrap} returns null, or if sending fails
     * @author guocp
     * @date 2016-10-28
     */
    public SendResult sendMessage(final MQMessage anMessage, final String orderCode) throws Exception {
        final Message message = MQCodecUtils.wrap(anMessage);
        if (message == null) {
            throw new Exception("包装消息出错!");
        }
        if (anMessage.getKey() != null) {
            message.setKeys(anMessage.getKey());
        }
        if (anMessage.getDelayTimeLevel() != null) {
            message.setDelayTimeLevel(anMessage.getDelayTimeLevel());
        }
        return this.send(message, orderCode);
    }

    /**
     * Starts the underlying producer.
     *
     * @throws MQClientException propagated from the producer's {@code start()} call
     */
    @Override
    public void start() throws MQClientException {
        this.producer.start();
    }

    /**
     * Registers a JVM shutdown hook that calls {@link #shutdown()}.
     *
     * <p>When the application exits, shutdown should be called to release resources,
     * close network connections and deregister from the RocketMQ server. Note: it is
     * recommended to call shutdown from the exit hook of containers such as JBoss or
     * Tomcat.
     */
    public void shutdownHook() {
        final Runnable shutdownTask = new Runnable() {
            @Override
            public void run() {
                shutdown();
            }
        };
        final Thread hookThread = new Thread(shutdownTask);
        Runtime.getRuntime().addShutdownHook(hookThread);
    }

    /**
     * Shuts down the underlying producer.
     */
    @Override
    public void shutdown() {
        this.producer.shutdown();
    }

    }

    相关文章

      网友评论

        本文标题:2018-04-21 DefaultMQProducer

        本文链接:https://www.haomeiwen.com/subject/cbatlftx.html