美文网首页
activeMQ了解一下(二)

activeMQ了解一下(二)

作者: 夏日橘子冰 | 来源:发表于2018-07-19 16:05 被阅读0次

常用消息中间件有activeMQ,RabbitMQ,Kafka,ZeroMQ,MetaMQ,RocketMQ,本文这里只介绍第一个。

一、linux上安装
下载tar.gz包   解压tar -zxvf 
在MQ目录下bin/linux-x86-64下  ./activemq start   ./activemq stop
http://IP:8161/admin
默认用户名密码 admin/admin 
三、发布和消费消息(这里以P2P为例)
1,写一个发消息服务如下:
@Service("notifyService")
public class NotifyServiceImpl implements INotifyService{
    private static final Logger logger = LoggerFactory.getLogger(NotifyServiceImpl.class);
    
    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;
    
    @Resource(name = "awardMsgDestinationQueue")
    private Destination awardMsgDestinationQueue;
 
    @Override
    public void sendAwardMsg(final String msg) {
        try {
            Destination destination = this.awardMsgDestinationQueue;
            jmsTemplate.send(destination,new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage();
                    textMessage.setText(msg);
                    return textMessage;
                }
            });

        } catch (Exception ex) {
            ex.printStackTrace();
            logger.error("向默认队列发送消息失败");
        }
    }
}
2,写一个消费监听器
public class AwardMsgQueueListener implements MessageListener{  
    @Override
    public void onMessage(Message message) {
        TextMessage tm = (TextMessage) message;
        try {
            msg = tm.getText();
        } catch (JMSException e1) {
            logger.error("从消息队列获取消息出现异常,请检查");
            e1.printStackTrace();
            return;
        }
        ......//根据实际业务拿到msg并处理
    }
}

由上可以看出,我们发布消息注入了jms模板和queue,这就需要我们先进行配置

二、整合spring

基础配置文件如下:

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL"><value>tcp://192.168.28.2:61616?</value></property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--使用缓存可以提升效率-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>
    
    <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
    
    <!-- 定义推送中奖消息队列(Queue) -->
    <bean id="awardMsgDestinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="awardMsgDestinationQueue"/>
    </bean>
    
    <!-- 配置监听者(Queue) -->
    <bean id="awardMsgQueueListener" class="com.latech.notify.consumer.AwardMsgQueueListener" />

    <!-- 配置多个消息监听容器,配置连接工厂,监听的目标是defaultDestinationQueue,监听器是上面定义的监听器 -->
    <bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="awardMsgDestinationQueue" />
        <property name="messageListener" ref="awardMsgQueueListener" />

    </bean>

相关文章

网友评论

      本文标题:activeMQ了解一下(二)

      本文链接:https://www.haomeiwen.com/subject/raytmftx.html