美文网首页
RocketMQ安装与应用

RocketMQ安装与应用

作者: 长孙俊明 | 来源:发表于2020-01-12 16:47 被阅读0次
    1. 前提
      安装jdk1.8+

    2. 安装

    cd /home
    wget http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-source-release.zip
    unzip rocketmq-all-4.6.0-source-release.zip
    mv rocketmq-all-4.6.0-source-release rocketmq-all-4.6.0
    cd rocketmq-all-4.6.0/
    mvn -Prelease-all -DskipTests clean install -U
    cd /home/rocketmq-all-4.6.0/distribution/target/rocketmq-4.6.0/rocketmq-4.6.0
    
    1. 启动
    nohup sh bin/mqnamesrv -c conf/broker.conf &
    nohup sh bin/mqbroker -c conf/broker.conf autoCreateTopicEnable=true &
    
    1. 停止
    sh bin/mqshutdown broker
    sh bin/mqshutdown namesrv
    
    1. 配置
    # cat conf/broker.conf
    # 配置如下
    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    brokerIP1 = 39.108.249.168
    namesrvAddr=39.108.249.168:9876
    autoCreateTopicEnable=true
    aclEnable=true
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    
    # cat conf/plain_acl.yml
    # 配置如下
    globalWhiteRemoteAddresses:
    - 10.10.103.*
    - 192.168.0.*
    - 113.110.233.*
    
    accounts:
    - accessKey: RocketMQ
      secretKey: 12345678
      whiteRemoteAddress:
      admin: false
      defaultTopicPerm: DENY
      defaultGroupPerm: SUB
      topicPerms:
      - topicA=DENY
      - topicB=PUB|SUB
      - topicC=SUB
      groupPerms:
      # the group should convert to retry topic
      - groupA=DENY
      - groupB=PUB|SUB
      - groupC=SUB
    
    - accessKey: rocketmq2
      secretKey: 12345678
      whiteRemoteAddress: 192.168.1.*
      # if it is admin, it could access all resources
      admin: true
    
    1. 代码
    # pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>demo</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
    
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.6.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-acl</artifactId>
                <version>4.6.0</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    
    # 生产者代码
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    import java.util.UUID;
    
    public class AclProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException {
            DefaultMQProducer producer = new DefaultMQProducer("groupA2", getAclRPCHook());
            producer.setNamesrvAddr("39.108.249.168:9876");
            producer.setSendMsgTimeout(1000 * 10);
            producer.setVipChannelEnabled(false);
            producer.start();
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("我发送给你" + UUID.randomUUID().toString()).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.println("消息发送响应信息:"+sendResult.toString());
            producer.shutdown();
        }
    
        static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));
        }
    }
    
    # 消费者代码
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;
    
    import java.util.List;
    
    public class AclConsumer {
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4", getAclRPCHook(),new AllocateMessageQueueAveragely());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("TopicTest", "*");
            consumer.setNamesrvAddr("39.108.249.168:9876");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    
        static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));
        }
    }
    

    相关文章

      网友评论

          本文标题:RocketMQ安装与应用

          本文链接:https://www.haomeiwen.com/subject/nbtwactx.html