美文网首页
Rocketmq 搭建 以及spring整合rocketmq发送

Rocketmq 搭建 以及spring整合rocketmq发送

作者: 苏小函 | 来源:发表于2019-01-24 23:14 被阅读0次

1: lunix系统安装rocketmq

Rocketmq是基于java的,需要下载源码后自行编译后才能使用,rocketmq的github地址:https://github.com/rocketmq/rocketmq ,

下载后,在更目录使用maven命令 mvn clean install -Dmaven.test.skip=true 编译,在distribution目录下target中会生成apache-rocketmq.tar.gz文件,上传此文件到服务器,解压,进入解压后文件夹的bin目录,执行2个命令分别启动mqnamesrv,mqbroker。

nohup ./mqnamesrv -c ../conf/broker.conf &
nohup ./mqbroker -n localhost:9876 -c ../conf/broker.conf &

如果启动失败,原因可能是

mqnamesrv mqbroker启动时提示内存不够,vim runserver.sh 修改启动内存大小 image1.png

同理,修改vim runbroker.sh

重新启动后,查看nohup.out的日志, image2.png
image3.png
出现进程说明启动成功了。

2: spring 整合rocketmq

我这边搭建的是微服务框架,写成了spring-rocketmq-producer.xml 和spring-rocketmq-consumer.xml

spring-rocketmq-producer.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

<bean id="rocketmqProduct" class="org.apache.rocketmq.client.producer.DefaultMQProducer" init-method="start" destroy-method="shutdown">
<property name="producerGroup" value="${rocketmq.produce.group}"/>
<property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>
<property name="createTopicKey" value="AUTO_CREATE_TOPIC_KEY"/>
</bean>
</beans>

spring-rocketmq-consumer.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="rocketMqConsumerListener" class="com.yfancy.app.notify.service.impl.RocketMqConsumerListener" />
<bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown">
<property name="consumerGroup" value="${rocketmq.consumer.group}"/>
<property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>
<property name="messageListener" ref="rocketMqConsumerListener"/>
<property name="subscription">
<map>
<entry key="test_topic">
<value>*</value>
</entry>
</map>
</property>
</bean>
</beans>

然后测试发送消息,

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class App {

private final static Logger log = LoggerFactory.getLogger(App.class);

public static void main( String[] args ){

try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "spring/spring-rocketmq-producer.xml" });
context.start();
log.info("NotifyService Dubbo Service == context start");
DefaultMQProducer producer = (DefaultMQProducer) context.getBean("rocketmqProduct");
Message msg = new Message("test_topic" ,
"Tag1" ,
("你好,社会人").getBytes(RemotingHelper.DEFAULT_CHARSET) );
producer.getDefaultMQProducerImpl().send(msg);
} catch (Exception e) {
log.error("[hy-blog-service-notify] == application start error:", e);
return;
}

synchronized (App.class) {
while (true) {
try {
App.class.wait();
} catch (InterruptedException e) {
log.error("== synchronized error:", e);
}
}
}
}
}

启动后,如果出现超时的异常,说明端口没开放,需要开放端口,我用的是阿里云的服务器,

image4.png
查看服务器中使用的端口,把9876,10909,10912,10911端口开放外网访问。如果你是本地虚拟机测试,直接关闭防火墙就行。如果开放这些端口还是提示超时异常,进入服务器rocketmq/conf目前修改broker.conf image5.png

指定你的brokerIP1 的ip为外网ip,重新启动即可。

3: 搭建rocketmq的管控台

下载https://github.com/apache/incubator-rocketmq-externals
进入rocketmq-console ,
修改其中application.properties的配置文件,设置rocketmq.config.namesrvAddr=你自己的外网ip:9876

image6.png
执行 mvn clean package -Dmaven.test.skip=true 编译,或生成一个jar包文件(我当时怎么编译都不成功,只好去拉了分支上的代码,才能编译成功)。
,然后编译后就能安心执行那个熟悉的java -jar命令了。

PS:自己在无法发送消息那踩了坑,只要是提示超时,基本是broker没连上namesrv,基本配置成外网ip,把对外的几个端口开放就行。

相关文章

网友评论

      本文标题:Rocketmq 搭建 以及spring整合rocketmq发送

      本文链接:https://www.haomeiwen.com/subject/zikpjqtx.html