美文网首页
用rabbitMq解决web高并发的学习笔记

用rabbitMq解决web高并发的学习笔记

作者: DONG999 | 来源:发表于2018-09-30 09:42 被阅读0次
    1. 引入pom.xml
    <!-- https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/
            https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-amqp
             -->
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-amqp</artifactId>
                </dependency>
    
    1. application.yml 配置部分
        #rabbitmq
        rabbitmq:
          host: 10.0.0.2
          port: 5672
          username: springboot
          password: password
          publisher-confirms: true
          publisher-returns: true
          template:
             mandatory: true
          #https://github.com/spring-projects/spring-boot/blob/v2.0.5.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java
          listener:
             concurrency: 2
             #最小消息监听线程数
             max-concurrency: 2
             #最大消息监听线程数
    mybatis:
      mapper-locations: classpath:mapping/*.xml
    ···
    
    1. 创建配置文件
    package com.redis;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    //错误的 import com.rabbitmq.client.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;
    
    /**
     * 
     * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
     * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息的载体,每个消息都会被投到一个或多个队列。
     * 
     * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
     * 
     * Routing Key:路由关键字,
     * 
     * exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
     * 
     * Producer:消息生产者,就是投递消息的程序. Consumer:消息消费者,就是接受消息的程序.
     * Channel:消息通道,在客户端的每个连接里,可建立多个channel.
     *
     * 
     */
    
    @Configuration
    public class RabbitConfig {
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Value("${spring.rabbitmq.host}")
        private String host;
    
        @Value("${spring.rabbitmq.port}")
        private int port;
    
        @Value("${spring.rabbitmq.username}")
        private String username;
    
        @Value("${spring.rabbitmq.password}")
        private String password;
    
        public static final String EXCHANGE_A = "my-mq-exchange_A";
        
        public static final String QUEUE_A = "QUEUE_A";
        
        public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    
    
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setPublisherConfirms(true);
            
            
            return connectionFactory;
        }
    
        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        // 必须是prototype类型
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            return template;
        }
    
        /**
         * * 针对消费者配置 * 
         * 1. 设置交换机类型 * 
         * 2. 将队列绑定到交换机 
         * 
         * FanoutExchange:
         * 将消息分发到所有的绑定队列,无routingkey的概念 
         * 
         * HeadersExchange :通过添加属性key-value匹配
         * DirectExchange:按照routingkey分发到指定队列 
         * TopicExchange:多关键字匹配
         * 
         * 如果需要使用的其他的交换器类型,spring中都已提供实现,所有的交换器均实现org.springframework.amqp.core.AbstractExchange接口。
    常用交换器类型如下:
    
            Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送". 
            即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。
            
            Topic(TopicExchange):按规则转发消息(最灵活)。
            
            Headers(HeadersExchange):设置header attribute参数类型的交换机。
            
            Fanout(FanoutExchange):转发消息到所有绑定队列。
     
         * 
         */
        
        
    
    
        
    
        
        
        @Bean
        public DirectExchange defaultExchange() {
            return new DirectExchange(EXCHANGE_A);
        }
    
        /**
         * 获取队列A
         * 
         * @return
         */
        @Bean
        public Queue queueA() {
            return new Queue(QUEUE_A, true); // 队列持久
        }
    
    
    
        /**
         * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。
         * 
         * @return
         */
        @Bean
        public Binding binding() {
    
            return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
        }
    
        
    
    }
    
    1. Producer
    package com.redis;
    
    import java.util.UUID;
    import org.springframework.amqp.core.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
    
    @Component
    public class MsgProducer implements RabbitTemplate.ConfirmCallback , ReturnCallback{
        
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
        
        // 由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
        private RabbitTemplate rabbitTemplate;
    
        /** * 构造方法注入rabbitTemplate */
        @Autowired
        public MsgProducer(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
            rabbitTemplate.setConfirmCallback(this);
            // rabbitTemplate如果为单例的话,那回调就是最后设置的内容
            
            /**
             * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。
             * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
             */
     
            
            rabbitTemplate.setReturnCallback(this);
        }
        
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败");
            
            System.out.println("消息主体 message : "+message);
            System.out.println("消息主体 message : "+replyCode);
            System.out.println("描述:"+replyText);
            System.out.println("消息使用的交换器 exchange : "+exchange);
            System.out.println("消息使用的路由键 routing : "+routingKey);
     
            
        }
        
         /**
          * rabbitTemplate.send(message);   //发消息,参数类型为org.springframework.amqp.core.Message
            rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送
            rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。
         
          * @param content
          */
    
        public void sendMsg(String content) {
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            
            System.out.println("开始发送消息c : " + content.toLowerCase() + " ,correlationId= " + correlationId);
            String response = rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId).toString();
            System.out.println("结束发送消息c : " + content.toLowerCase());
            System.out.println("消费者响应c : " + response + " 消息处理完成");
     
            //logger.info(" 发送消息TO A:" + content);
            // 把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
            //rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
        }
    
        /** * 回调 */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            logger.info(" 回调id:" + correlationData);
            if (ack) {
                logger.info("消息成功消费");
            } else {
                logger.info("消息消费失败:" + cause);
            }
            
            
        
        }
    
    }
    
    
    1. Receiver
    package com.redis;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = RabbitConfig.QUEUE_A)
    public class MsgReceiver {
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @RabbitHandler
        public void process(String content) {
            System.out.println("接收处理队列A当中的消息: " + content);
        }
    }
    
    
    
    1. unit test
     
    
    import java.util.Date;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import com.redis.MsgProducer;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqTest {
    
        @Autowired
        private MsgProducer sender;
        
        @Test
        public void sendTest() throws Exception {
            //while(true){
                String msg = new Date().toString();
                sender.sendMsg(msg);
                Thread.sleep(6000);
            //}
        } 
    
    }
    
    
    1. 可以测试通过
    image.png
    1. 高并发的访问量, 除了使用nginx/haproxy本身实现外, 尝试了google guava 简单好用, 来控制前端用户请求量, 范例
    package com.test.ratelimit;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import com.google.common.util.concurrent.RateLimiter;
     
     
    
    
    public class ComplexDemo {
        
        private static RateLimiter rateLimiter = RateLimiter.create(10);
        
        private static AtomicInteger suc = new AtomicInteger(0), fail = new AtomicInteger(0);
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            
     
            List<Runnable> tasks = new ArrayList<Runnable>();  
            for (int i = 0; i < 100; i++) {  
                tasks.add(new UserRequest(i));  
            }  
            ExecutorService threadPool = Executors.newCachedThreadPool();  
            for (Runnable runnable : tasks) {
                threadPool.execute(runnable);  
            } 
            
            
        }
        
        
        private static boolean startGo(int i) {
             //基于令牌桶算法的限流实现类
            
            /**
             * 一秒出10个令牌,0.1秒出一个,100个请求进来,假如100个是同时到达, 那么最终只能成交10个,90个都会因为超时而失败。
             *       
             */
            
            /** 
             * tryAcquire(long timeout, TimeUnit unit) 
             * 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话, 
             * 或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) 
             */  
            
            //判断能否在1秒内得到令牌,如果不能则立即返回false,不会阻塞程序  
            if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) {  
                System.out.println("暂时无法获取令牌, 排队失败" + i);  
                fail.getAndIncrement();
                System.out.println("SUC/FAIL=" + suc.get() + "/" + fail.get());  
                return false;
            }  
            if (update() > 0) {  
                System.out.println("成功" + i);  
                suc.getAndIncrement();
                System.out.println("FAIL/SUC=" + fail.get() + "/" + suc.get());  
                return true;
            }  
            
            System.out.println("数据不足,失败");  
            return false;
        }
        
        
        private static int update() {
            return 1;
        }
        
         private static class UserRequest implements Runnable {  
                private int id;  
          
                public UserRequest(int id) {  
                    this.id = id;  
                }  
          
                public void run() {  
                    startGo(id) ;
                }  
            }  
    
    }
    
    测试结果: 
    ...
    成功8
    FAIL/SUC=89/10
    成功7
    FAIL/SUC=89/11
    
    

    总结, rabbit mq, 这里只用Direct。 Topic匹配灵活, 可以用到其他场景。

    REF: http://www.rabbitmq.com/install-rpm.html

    相关文章

      网友评论

          本文标题:用rabbitMq解决web高并发的学习笔记

          本文链接:https://www.haomeiwen.com/subject/fxzvoftx.html