美文网首页
RocketMQ 快速开始quickstart && borke

RocketMQ 快速开始quickstart && borke

作者: 严重思想跑偏患者 | 来源:发表于2019-04-08 15:55 被阅读0次

    producer

    public class Producer1 {
    
        public static void main(String[] agrs) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
    
            DefaultMQProducer producer = new
                    DefaultMQProducer("TopicTest3-produceGroup1",ACLClient.getAclRPCHook());
            // Specify name server addresses.
            producer.setNamesrvAddr("10.1.54.46:9876");
            producer.setInstanceName("producer 1");
            //Launch the instance.
            producer.start();
            
            for (int i = 0; i < 10; i++) {
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest3" /* Topic */,
                        "abc" /* Tag */,"OrderID188",
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                msg.putUserProperty("coal", String.valueOf(i));
                //Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            //Shut down once the producer instance is not longer in use.
            producer.shutdown();
    
        }
    }
    

    consumer

    public class AclConsumer {
    
        public static void main(String[] agrs) throws MQClientException {
            //指定Group和ACL
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup5", ACLClient.getAclRPCHook(), new AllocateMessageQueueAveragely());
    
            consumer.setNamesrvAddr("localhost:9876");
            consumer.setInstanceName("consumer 5");
    
            //集群订阅(MessageModel.CLUSTERING)
            //广播订阅(MessageModel.BROADCASTING)
            consumer.setMessageModel(MessageModel.BROADCASTING);
    
            //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
            //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
            //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            //可以修改每次消费消息的数量,默认设置是每次消费一条
            consumer.setConsumeMessageBatchMaxSize(10);
    
    
            //设置consumer所订阅的Topic和Tag,*代表全部的Tag MessageSelector.byTag
            consumer.subscribe("TopicTest3", MessageSelector.bySql("coal between 2 and 7"));
    
            //注册消费的监听
            consumer.registerMessageListener(MessageListener.getInstance());
    
            consumer.start();
            System.out.println("consumer 2 is started");
    
        }
    }
    

    ACLClient

    public class ACLClient {
    
        private static final String ACL_ACCESS_KEY = "RocketMQ2";
        private static final String ACL_SECRET_KEY = "12345678";
    
        static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
        }
    
        private static final String TEST_ACL_ACCESS_KEY = "testKey";
        private static final String TEST_ACL_SECRET_KEY = "12345678";
    
        static RPCHook getTestAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials(TEST_ACL_ACCESS_KEY,TEST_ACL_SECRET_KEY));
        }
    
    
        private static final String ERROR_ACL_ACCESS_KEY = "RocketMQ333";
        private static final String ERROR_ACL_SECRET_KEY = "12345673333";
    
        static RPCHook getErrorAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials(ERROR_ACL_ACCESS_KEY,ERROR_ACL_SECRET_KEY));
        }
    }
    

    Consumer 的 MessageListener

    public class MessageListener implements MessageListenerConcurrently {
    
        private static MessageListener instance = null;
    
        public static MessageListener getInstance() {
            if (instance == null) {
                synchronized (MessageListener.class) {
                    if (instance == null) {
                        instance = new MessageListener();
                    }
                }
            }
            return instance;
        }
    
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            for (MessageExt messageExt : msgs) {
                String messageBody = new String(messageExt.getBody());
                System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(
                        new Date()) + "消费响应: msgBody : " + messageBody);//输出消息内容
            }
            //ACK
            //CONSUME_SUCCESS 消费成功
            //RECONSUME_LATER 消费失败,需要稍后重新消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    
        private MessageListener() {
        }
    }
    

    Borker配置文件介绍

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    
    #  brokerClusterName = DefaultCluster
    #  brokerName = broker-a
    #  brokerId = 0
    #  deleteWhen = 04
    #  fileReservedTime = 48
    #  brokerRole = ASYNC_MASTER
    #  flushDiskType = ASYNC_FLUSH
    
    # 所属集群名字 
    brokerClusterName = default-rocketmq-cluster
    # true后可以使用SQL92
    enablePropertyFilter = true
    #broker名字,注意此处不同的配置文件填写的不一样 
    brokerName = broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId = 0 
    #nameServer地址,分号分割
    brokerIP1 = 192.168.6.46
    namesrvAddr = 192.168.6.46:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
    defaultTopicQueueNums = 4 
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 
    autoCreateTopicEnable = true 
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
    autoCreateSubscriptionGroup = true 
    #Broker 对外服务的监听端口 
    listenPort = 10911 
    #删除文件时间点,默认凌晨 4点 
    deleteWhen = 04 
    #文件保留时间,默认 48 小时 
    fileReservedTime = 120 
    #commitLog每个文件的大小默认1G 
    mapedFileSizeCommitLog = 1073741824 
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整 
    mapedFileSizeConsumeQueue = 300000 
    #destroyMapedFileIntervalForcibly=120000 
    #redeleteHangedFileInterval=120000 
    #检测物理文件磁盘空间 
    diskMaxUsedSpaceRatio = 88 
    #存储路径 
    storePathRootDir = D:\RocketMQ\target 
    #commitLog 存储路径 
    storePathCommitLog = D:\RocketMQ\target\commitLog 
    #消费队列存储路径存储路径 
    storePathConsumeQueue = D:\RocketMQ\target\consumequeue 
    #消息索引存储路径 
    storePathIndex = D:\RocketMQ\target\index 
    #checkpoint 文件存储路径 
    storeCheckpoint = D:\RocketMQ\target\checkpoint 
    #Broker 的角色 
    #- ASYNC_MASTER 异步复制Master 
    #- SYNC_MASTER 同步双写Master 
    #- SLAVE brokerRole=ASYNC_MASTER 
    #刷盘方式 
    #- ASYNC_FLUSH 异步刷盘 
    #- SYNC_FLUSH 同步刷盘 
    flushDiskType = ASYNC_FLUSH 
    #checkTransactionMessageEnable=false 
    #abort 文件存储路径 
    abortFile = D:\RocketMQ\target\abort 
    #限制的消息大小 
    maxMessageSize = 65536 
    #flushCommitLogLeastPages=4 
    #flushConsumeQueueLeastPages=2 
    #flushCommitLogThoroughInterval=10000 
    #flushConsumeQueueThoroughInterval=60000
    #acl控制
    aclEnable=true
    

    相关文章

      网友评论

          本文标题:RocketMQ 快速开始quickstart && borke

          本文链接:https://www.haomeiwen.com/subject/gecpiqtx.html