美文网首页
rocketmq部署、入门

rocketmq部署、入门

作者: kafeimao | 来源:发表于2020-12-20 21:17 被阅读0次

    参考:https://www.cnblogs.com/kiwifly/p/11546008.html
    前提:安装了docker

    启动nameserver

    创建nameserver的日志和数据存放路径

    /home/rocketmq/namesrv/logs
    /home/rocketmq/namesrv/store
    

    启动命令

    docker run -d --name rmqnamesrv -p 9876:9876 
    -v /home/rocketmq/namesrv/logs:/opt/logs 
    -v /home/rocketmq/namesrv/store:/opt/store
     rocketmqinc/rocketmq 
    sh mqnamesrv
    

    启动broker

    创建broker的日志和数据存放的路径以及配置

    /home/rocketmq/broker/logs
    /home/rocketmq/broker/store
    /home/rocketmq/broker/conf/broker.conf
    

    在broker.conf中写入配置

    terName=DefaultCluster
    brokerName=broker-a
    brokerId=0
    deleteWhen=04
    fileReservedTime=48
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    brokerIP1= 我的服务器外网IP
    namesrvAddr=我的服务器外网IP:9876
    

    启动命令

    docker run -d --name rmqbroker  -p 10911:10911 -p 10909:10909
     -v  /home/rocketmq/broker/logs:/root/logs 
    -v  /home/rocketmq/broker/store:/root/store 
    -v /home/rocketmq/broker/conf/broker.conf:/opt/rocketmq/conf/broker.conf 
    --link rmqnamesrv:namesrv 
    -e "NAMESRV_ADDR=namesrv:9876" 
    rocketmqinc/rocketmq 
    sh mqbroker -c /opt/rocketmq/conf/broker.conf
    

    启动rocketmq的控制台

    docker run -d -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=我的服务器IP:9876 -Drocketmq.config.isVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng
    

    打开浏览器看看


    image.png

    Java入门测试

    maven依赖

    <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.1.0-incubating</version>
            </dependency>
    

    生产者

    public class Producer {
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("Producer");
            producer.setNamesrvAddr("我的服务器ip:9876");
            try {
                producer.start();
                Message msg = new Message("test-topic","tag","1"."Just for test.".getBytes());
                SendResult result = producer.send(msg);
                System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
            } catch (Exception e) {
                e.printStackTrace();
            }finally{
                producer.shutdown();
            }
        }
    }
    

    消费者

    public class Consumer {
        public static void main(String[] args) {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
            consumer.setNamesrvAddr("我的服务器ip:9876");
            try {
                consumer.subscribe("test-topic", "tag");
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                                                     @Override
                                                     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext Context) {
                                                         Message msg = list.get(0);
                                                         String topic = msg.getTopic();
                                                         System.out.println("topic = " + topic);
                                                         byte[] body = msg.getBody();
                                                         System.out.println("body:  " + new String(body));
                                                         String keys = msg.getKeys();
                                                         System.out.println("keys = " + keys);
                                                         String tags = msg.getTags();
                                                         System.out.println("tags = " + tags);
                                                         System.out.println("-----------------------------------------------");
                                                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                                     }
                                                 }
                );
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    整合springboot

    相关文章

      网友评论

          本文标题:rocketmq部署、入门

          本文链接:https://www.haomeiwen.com/subject/vslcnktx.html