美文网首页
Rocketmq 搭建 以及spring整合rocketmq发送

Rocketmq 搭建 以及spring整合rocketmq发送

作者: 苏小函 | 来源:发表于2019-01-24 23:14 被阅读0次

    1: lunix系统安装rocketmq

    Rocketmq是基于java的,需要下载源码后自行编译后才能使用,rocketmq的github地址:https://github.com/rocketmq/rocketmq ,

    下载后,在更目录使用maven命令 mvn clean install -Dmaven.test.skip=true 编译,在distribution目录下target中会生成apache-rocketmq.tar.gz文件,上传此文件到服务器,解压,进入解压后文件夹的bin目录,执行2个命令分别启动mqnamesrv,mqbroker。

    nohup ./mqnamesrv -c ../conf/broker.conf &
    nohup ./mqbroker -n localhost:9876 -c ../conf/broker.conf &
    

    如果启动失败,原因可能是

    mqnamesrv mqbroker启动时提示内存不够,vim runserver.sh 修改启动内存大小 image1.png

    同理,修改vim runbroker.sh

    重新启动后,查看nohup.out的日志, image2.png
    image3.png
    出现进程说明启动成功了。

    2: spring 整合rocketmq

    我这边搭建的是微服务框架,写成了spring-rocketmq-producer.xml 和spring-rocketmq-consumer.xml

    spring-rocketmq-producer.xml如下

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
    
    <bean id="rocketmqProduct" class="org.apache.rocketmq.client.producer.DefaultMQProducer" init-method="start" destroy-method="shutdown">
    <property name="producerGroup" value="${rocketmq.produce.group}"/>
    <property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>
    <property name="createTopicKey" value="AUTO_CREATE_TOPIC_KEY"/>
    </bean>
    </beans>
    

    spring-rocketmq-consumer.xml如下

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="rocketMqConsumerListener" class="com.yfancy.app.notify.service.impl.RocketMqConsumerListener" />
    <bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown">
    <property name="consumerGroup" value="${rocketmq.consumer.group}"/>
    <property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>
    <property name="messageListener" ref="rocketMqConsumerListener"/>
    <property name="subscription">
    <map>
    <entry key="test_topic">
    <value>*</value>
    </entry>
    </map>
    </property>
    </bean>
    </beans>
    

    然后测试发送消息,

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    public class App {
    
    private final static Logger log = LoggerFactory.getLogger(App.class);
    
    public static void main( String[] args ){
    
    try {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "spring/spring-rocketmq-producer.xml" });
    context.start();
    log.info("NotifyService Dubbo Service == context start");
    DefaultMQProducer producer = (DefaultMQProducer) context.getBean("rocketmqProduct");
    Message msg = new Message("test_topic" ,
    "Tag1" ,
    ("你好,社会人").getBytes(RemotingHelper.DEFAULT_CHARSET) );
    producer.getDefaultMQProducerImpl().send(msg);
    } catch (Exception e) {
    log.error("[hy-blog-service-notify] == application start error:", e);
    return;
    }
    
    synchronized (App.class) {
    while (true) {
    try {
    App.class.wait();
    } catch (InterruptedException e) {
    log.error("== synchronized error:", e);
    }
    }
    }
    }
    }
    

    启动后,如果出现超时的异常,说明端口没开放,需要开放端口,我用的是阿里云的服务器,

    image4.png
    查看服务器中使用的端口,把9876,10909,10912,10911端口开放外网访问。如果你是本地虚拟机测试,直接关闭防火墙就行。如果开放这些端口还是提示超时异常,进入服务器rocketmq/conf目前修改broker.conf image5.png

    指定你的brokerIP1 的ip为外网ip,重新启动即可。

    3: 搭建rocketmq的管控台

    下载https://github.com/apache/incubator-rocketmq-externals
    进入rocketmq-console ,
    修改其中application.properties的配置文件,设置rocketmq.config.namesrvAddr=你自己的外网ip:9876

    image6.png
    执行 mvn clean package -Dmaven.test.skip=true 编译,或生成一个jar包文件(我当时怎么编译都不成功,只好去拉了分支上的代码,才能编译成功)。
    ,然后编译后就能安心执行那个熟悉的java -jar命令了。

    PS:自己在无法发送消息那踩了坑,只要是提示超时,基本是broker没连上namesrv,基本配置成外网ip,把对外的几个端口开放就行。

    相关文章

      网友评论

          本文标题:Rocketmq 搭建 以及spring整合rocketmq发送

          本文链接:https://www.haomeiwen.com/subject/zikpjqtx.html