package com.okdeer.base.framework.mq;import java.util.List;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import com.alibaba.rocketmq.client.exception.MQBrokerException;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.MessageQueueSelector;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.client.producer.SendStatus;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageQueue;import com.alibaba.rocketmq.remoting.exception.RemotingException;import com.okdeer.base.framework.mq.codec.MQCodecUtils;import com.okdeer.base.framework.mq.message.MQMessage;/** * 娑堟伅鐢熶骇鑰� * * @pr yschome-base * @author guocp * @date 2015骞�11鏈�19鏃� 涓嬪崍4:09:29 */public class RocketMQProducer implements IMQProducer, InitializingBean {/** 鏃ュ織瀵硅薄 */private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);/** 瀹炰緥鍚嶇О */private static final String INSTANCE_NAME = "rocketmq_instance";/** 娑堟伅鐢熶骇鑰� */private DefaultMQProducer producer;/** rocketMq閰嶇疆 */@Autowiredprivate RocketMQConfigurer rocketMQconfig;/** * @desc 鍒濆鍖� * * @throws Exception */@Overridepublic void afterPropertiesSet() throws Exception {// 鍒濆鍖栨秷鎭敓浜ц��initProducer();// 鍚姩鏈嶅姟this.start();// 鍚姩搴旂敤鏈嶅姟鍣ㄥ叧闂挬瀛�shutdownHook();}/** * 鍒濆鍖栨秷鎭敓浜ц�� * * @param config 閰嶇疆 */private void initProducer() {producer = new DefaultMQProducer(rocketMQconfig.getProducerGroupName());producer.setInstanceName(INSTANCE_NAME);// name Server鍦板潃producer.setNamesrvAddr(rocketMQconfig.getNamesrvAddr());producer.setDefaultTopicQueueNums(rocketMQconfig.getDefaultTopicQueueNums());producer.setRetryTimesWhenSendFailed(rocketMQconfig.getRetryTimesWhenSendFailed());producer.setSendMsgTimeout(rocketMQconfig.getSendMsgTimeout());producer.setMaxMessageSize(rocketMQconfig.getMaxMessageSize());}@Overridepublic SendResult send(final Message msg) throws Exception {try {SendResult sendResult = producer.send(msg);if (sendResult.getSendStatus() != SendStatus.SEND_OK) {logger.warn("娑堟伅涓棿浠跺彂閫佹秷鎭紓甯革細{},鐘舵�佷负锛歿}", msg.getKeys(), sendResult.getSendStatus());}return sendResult;} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {logger.error("娑堟伅涓棿浠跺彂閫佹秷鎭紓甯革紝娑堟伅key涓猴細{}", msg.getKeys(), e);throw e;}}@Overridepublic SendResult send(final Message msg, final String orderCode) throws Exception {try {SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List mqs, Message msg, Object arg) {
int index = Math.abs(arg.hashCode() % mqs.size());
return mqs.get(index);
}
}, orderCode);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
logger.warn("娑堟伅涓棿浠跺彂閫侀『搴忔秷鎭紓甯革細{},鐘舵�佷负锛歿}", msg.getKeys(), sendResult.getSendStatus());
}
return sendResult;
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
logger.error("娑堟伅涓棿浠跺彂閫侀『搴忔秷鎭紓甯革紝娑堟伅key涓猴細{}", msg.getKeys(), e);
throw e;
}
}
/**
* @Description: 鍙戦�佹秷鎭�
* @param anMessage 鍖呰娑堟伅浣�
* @return
* @throws Exception
* @author guocp
* @date 2016骞�10鏈�28鏃�
*/
public SendResult sendMessage(final MQMessage anMessage) throws Exception {
final Message message = MQCodecUtils.wrap(anMessage);
if (message == null) {
throw new Exception("鍖呰娑堟伅鍑洪敊锛�");
}
if (anMessage.getKey() != null) {
message.setKeys(anMessage.getKey());
}
if (anMessage.getDelayTimeLevel() != null) {
message.setDelayTimeLevel(anMessage.getDelayTimeLevel());
}
SendResult sendResult = this.send(message);
return sendResult;
}
/**
* @Description: 鍙戦�侀『搴忔秷鎭�
* @param anMessage 鍖呰娑堟伅浣�
* @param orderCode
* @return
* @throws Exception
* @author guocp
* @date 2016骞�10鏈�28鏃�
*/
public SendResult sendMessage(final MQMessage anMessage, final String orderCode) throws Exception {
final Message message = MQCodecUtils.wrap(anMessage);
if (message == null) {
throw new Exception("鍖呰娑堟伅鍑洪敊锛�");
}
if (anMessage.getKey() != null) {
message.setKeys(anMessage.getKey());
}
if (anMessage.getDelayTimeLevel() != null) {
message.setDelayTimeLevel(anMessage.getDelayTimeLevel());
}
SendResult sendResult = this.send(message, orderCode);
return sendResult;
}
@Override
public void start() throws MQClientException {
producer.start();
}
/**
* 搴旂敤閫�鍑烘椂锛岃璋冪敤shutdown鏉ユ竻鐞嗚祫婧愶紝鍏抽棴缃戠粶杩炴帴锛屼粠rocketMQ鏈嶅姟鍣ㄤ笂娉ㄩ攢鑷繁 娉ㄦ剰锛氭垜浠缓璁簲鐢ㄥ湪JBOSS銆乀omcat绛夊鍣ㄧ殑閫�鍑洪挬瀛愰噷璋冪敤shutdown鏂规硶
*/
public void shutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
shutdown();
}
}));
}
@Override
public void shutdown() {
producer.shutdown();
}
}
网友评论