美文网首页
(转载)spring-boot+mysql+Redis+rabb

(转载)spring-boot+mysql+Redis+rabb

作者: 7b7d23d16ab5 | 来源:发表于2020-01-31 21:32 被阅读0次

    (转载)spring-boot+mysql+Redis+rabbitMQ队列+多线程模拟并发-实现请求并发下的商城秒杀系统

    原文链接:https://blog.csdn.net/zhangli_wei0403/article/details/84940636

    原创 、那些 最后发布于2018-12-10 18:28:34 阅读数 4122 收藏
    展开
    写在前面:

    这几天无聊看了一下rabbitMQ队列,想着结合Redis的递减功能做一个商城秒杀系统(支持并发)。自己弄了一天半的时间总算差不多懂了(ps:只是运行过程和逻辑,至于rabbitMQ的原理还没怎么深入研究,以后有时间在看吧),现在把我的逻辑和代码分享出来,供各位瞅瞅。如果哪里有问题或不对的地方还请各位大佬在下方不吝赐教。废话不说进入主题,还是从下面的一张图开始吧(画的不咋地,各位将就看下吧)~~~

    因我最近工作有点忙,可能会顾不到给大家发邮箱了,所以给大家个地址,自己去下载吧!源码下载
    

    首先,在使用rabbitMQ之前需要先在本地安装,因为rabbit是基于Erlang的,所以需要先下载安装Erlang,具体的步骤请点击这个链接:https://www.cnblogs.com/ericli-ericli/p/5902270.html

    笔者就是按照这个步骤安装的,所以一步步来肯定没问题,需要注意的是安装成功后记得给账号分配权限,我就是因为没注意权限才导致好长时间项目不能启动,看我下面的截图注意红框中的信息,这是分配后的,如果为分配读写权限会是另一种黄色的背景颜色。

    好了,进入项目阶段,因为我的项目是基于spring-boot做的,所以只需要在pom文件加个jar就行,,,在下图

    然后是项目的application配置文件加入rabbitMQ的连接信息,在下图,很好理解我就不多说了,填入自己rabbit安装成功后设置的账号信息就行。

    因为rabbitMQ队列的模式有好几个,当前使用的为Direct模式,此模式类似于一对一的关系(放入队列的时候指定队列名称,消费当前队列消息时,使用@RabbitListener注解指定获取某个名称的队列)。

    具体的实现主要为三个步骤:

    1.定义一个任意名称的队列,注意类及方法上的注解

    package com.rabbitmq.config;
     
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
     
    @Configuration
    public class RabbitMQConfig {
     
        static final String QUEUE = "product_secondsKill";
        
        /**
         * Direct模式
         * @return
         */
        @Bean
        public Queue directQueue() {
            // 第一个参数是队列名字, 第二个参数是指是否持久化
            return new Queue(QUEUE, true);
        }
    }
    

    2.消息入队方法,我是在service层调用的,具体怎么使用你就爱谁谁吧

    package com.rabbitmq.config;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
     
    @Component
    public class Sender {
     
        private final static Logger log = LoggerFactory.getLogger(Sender.class);
        
        @Autowired
        AmqpTemplate amqpTemplate;
     
            int i = 0;
        
        /**
         * 消息入队-在需要的时候调用即可
         * @param msg此为商品的id,根据此id在消费消息时验证Redis中商品的库存是否充足
         */
        public void sendDirectQueue(String msg) {
            log.info(">>>>>>>>>>>>>>秒杀请求已发送,商品ID为:"+msg);
            try {
                //第一个参数是指要发送到哪个队列里面, 第二个参数是指要发送的内容
                amqpTemplate.convertAndSend(RabbitMQConfig.QUEUE, msg);
                //此处为了记录并发请求下,请求的次数及消息传递的次数
                log.info("发送请求>>>>>>>>>>>>>"+i++);
            } catch (AmqpException e) {
                log.error("请求发送异常:"+e.getMessage());
                e.printStackTrace();
            }
        }
    }
    

    3.消息出列(消费消息)的方法,相关逻辑再次执行

    package com.rabbitmq.config;
     
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
     
    import com.business.dao.product.ProductMapper;
    import com.enums.ErrorCodeEnum;
    import com.excetion.ServerBizException;
    import com.rabbitmq.client.Channel;
    import com.util.redis.RedisStringUtils;
     
    @Component
    public class Receiver {
     
        private final static Logger log = LoggerFactory.getLogger(Receiver.class);
        
        @Autowired
        RedisStringUtils redisStringUtils;
        
        @Autowired
        ProductMapper productMapper;
     
            int i = 0;
        
        /**
         * 消息出列(消费消息)方法-和消息入列没有直接的调用关系
         * 是通过注解中指定的名称进行的关联
         * @param msg-传递进来的数据
         * @param channel-注意,注意,注意这两个参数
         * @param message-注意,注意,注意这两个参数
         * @throws IOException
         */
        @RabbitListener(queues = RabbitMQConfig.QUEUE)
        public void receiverDirectQueue(String msg,Channel channel, Message message) throws IOException{
            log.info(">>>>>>>>>>>>>>>>>接收到秒杀请求,商品ID为:"+msg+"检查Redis中库存是否为0");
            try {
                long num = redisStringUtils.decr(msg);
                if(num < 0) {
                    /**
                     * 此处不能判断等于0,因为当商品库存为1时,Redis执行递减返回为0
                     * 如果判断为0商品最后不能卖完也就是当库存为1时此处就抛异常了
                     */
                    throw new ServerBizException(ErrorCodeEnum.PRODUCT_INVENTORY_IS_NULL);
                }
                log.info("接收时>>>>>>>>>>>"+i++);
                Map map = new HashMap<>();
                map.put("id", msg);
                map.put("Quantity", num);
                //根据商品的id和库存同步数据到MySQL
                if(productMapper.updateQuantityByPid(map) == 0) {
                    throw new ServerBizException("同步到商品表异常!");
                }
            } catch (Exception e) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                e.printStackTrace();
            }
        }
    }
    

    值得注意的是上面代码方法上需要注意的两个参数,笔者就是因为一开始不懂没有做这些逻辑导致此处出现错误,错误为:当此方法出现异常(比如:并发进来两个秒杀请求而库存中只有一件商品时,判断Redis中库存会进入死循环,一直抛出异常信息直到停止项目),解决为捕获异常并且在catch下面加入此代码,此处的作用请看下图

    此段代码的作用为:当消费消息出现异常时,我们需要取消确认,这时我们可以使用 Channel 的 basicReject 方法。其中第一个参数为指定 delivery tag,第二个参数说明如何处理这个失败消息:true为将该消息重新放入队列头,false为忽略该消息。

    好了,以上就是全部的实现流程,接下来介绍一下我项目中的全部实现:

    1.我使用了spring的监听器ApplicationListener,当项目初始化完成的时候自动扫描数据库中的商品信息,商品ID为key,库存为value,如果还需要其他限制逻辑(比如:秒杀的开始,结束时间和支持秒杀的某个时间段)自己去实现就行,我这里没做其他逻辑。下面为具体代码

    package com.rabbitmq;
     
    import java.util.Iterator;
    import java.util.List;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.annotation.Scope;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.stereotype.Service;
     
    import com.business.dao.product.ProductMapper;
    import com.business.model.product.Product;
    import com.util.redis.RedisStringUtils;
    @Service
    @Scope("singleton")
    public class ApplicationInitListener implements ApplicationListener<ContextRefreshedEvent>{
     
        private static final Logger logger = LoggerFactory.getLogger(ApplicationInitListener.class);
        
        @Autowired
        ProductMapper productMapper;
        
        @Autowired
        RedisStringUtils redisStringUtils;
        
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if(event.getApplicationContext().getParent() == null) {
                logger.info(">>>>>>>>>>>>项目初始化完成,执行监听器中逻辑");
                //mapper中的sql,返回全部上架(支持秒杀)的商品集合
                List<Product> list = productMapper.selectTimerTask();
                Iterator<Product> it = list.iterator();
                while(it.hasNext()) {
                    Product p = it.next();
                    logger.info("商品ID:"+p.getId()+"商品库存:"+p.getpQuantity());
                    try {
                        redisStringUtils.set(String.valueOf(p.getId()), String.valueOf(p.getpQuantity()));
                    } catch (Exception e) {
                        logger.error("当前商品ID:"+p.getId()+"库存:"+p.getpQuantity()+"放入Redis缓存异常<<<<<<<<<<<<<<<<<<<<");
                        e.printStackTrace();
                    }
                }
            }
        }
     
    }
    

    上边代码很简洁也有相关的注释,所以不再废话,下面就贴一下从请求进入到返回的全部逻辑,先来controller的代码,方法中只是调用了service中的方法,不多说

            /**
         * 秒杀入口
         * @param response
         * @param Pid-商品id,做检查库存使用
         * @param Uid-用户id,做订单和用户关联使用(比如生成成功秒杀商品的用户订单表)
         *               我这里没做多余的逻辑,只看了相关情况的返回结果,有需要的可以自己去实现
         */
        @RequestMapping(value = "secondsKill", method = RequestMethod.POST)
        public void secondsKill(HttpServletResponse response, String Pid, Integer Uid) {
            try {
                //模拟发送100次请求,库存设置为少于100查看结果,此100次请求为顺序请求(未并发)
                //for(int i=0; i<100; i++) {
                    boolean result = productService.secondsKill(Pid, String.valueOf(Uid));
                    if(result) {
                        ResponseUtil.renderSuccessJson(response, "success", result);
                    }
                //}
            } catch (ServerBizException e) {
                ResponseUtil.renderFailJson(response, e.getErrCode());
                e.printStackTrace();
            }
        }
    

    接下来是service的方法,其实逻辑也很简单,看注释吧。

            /**
         * 商品秒杀
         * @param Pid
         * @param Uid
         * @return
         * @throws ServerBizException
         */
            int i = 0;
        public boolean secondsKill(String Pid, String Uid) throws ServerBizException {
            boolean result = true;
                //根据商品id获取Redis中的库存数量
                String num = redisStringUtils.get(Pid).toString();
                System.out.println("redis>>>>>>>>>>"+num);
                if(new Long(num) <= 0) {
                    result = false;
                    throw new ServerBizException(ErrorCodeEnum.PRODUCT_INVENTORY_IS_NULL);
                }
                //消息入队,调用相关方法
                sender.sendDirectQueue(Pid);
                //只为验证请求及发送消息次数
                System.out.println("service>>>>>>>>>>"+i++);
                return result;
        }
    

    ,好了暂时就这么多吧,还有一个模拟多线程请求的方法,在下边

    public static void main(String[] args) {
            // 运用java工具类中线程池
            ExecutorService pool = Executors.newCachedThreadPool();
            for (int i = 0; i < 5; i++) { // 开启五个线程
                String url = "http://localhost:8080/product/secondsKill";
                Map<String, Object> paramStr = getHttpParamStr();
                pool.execute(new ServiceThreadTest(url, paramStr));
            }
        }
     
        public static Map<String, Object> getHttpParamStr() {
            Map<String, Object> param = new HashMap<String, Object>();
            param.put("Pid", "1");
            param.put("Uid", "10");
            return param;
        }
    

    下面咱们测试一下,首先在商品表中设置库存,我先设置10个吧。

    然后我们启动项目,看有没有执行监听器中的逻辑,自动把商品库存放入Redis

    项目启动完成,我们往上翻看下控制台信息

    可以看到,相关的信息已经输出的控制台了,下面我们运行下多线程测试类中的main方法,在看下,由于我这个项目做了非ajax请求的验证,所以我们需要在全局拦截器中加入秒杀请求的地址到白名单,下面图为设置白名单和运行main方法测试的结果

    ,接下来我们运行一下测试的main方法,看下图

    那么我们看下数据库中的库存是否 正常呢,

    因为我开启了5个线程模拟并发,所以也就是并行发起了5次秒杀请求,下面是数据库中的信息

    OK,库存正常。并且接口正常返回了信息

    现在库存还有5个,那么我们使用7个线程同时秒杀会怎样呢,前五个请求应该是成功的,但是后两个会不会也成功呢,如果也成功就证明我们这个验证逻辑有问题了,我们来试一下吧。

    这次我们开启7个线程

    接着看一下数据库中的库存是否为0了

    OK,也没问题,那看一下请求的返回信息是啥呢?

    可以看到7次都成功的返回了,有同学会问了,不是应该前五个成功返回,后两个返回秒杀失败吗?这里就要跟各位同学解释一下了,因为这7个请求是并发的,所以在进入队列前就是一个请求,在出队时才会执行七次逻辑,咱们还是用图片说吧,好理解些,看下图

    所以控制台应该是前5个请求为成功秒杀到商品,后俩个请求抛出两次异常 ,我们看一下是不是这样呢?

    OK,没问题,确实是按照这样的逻辑执行的。那么在库存为0的情况下,再次并发请求会怎么样呢?看下图

    这次我还启用5个线程同时请求

    这次是service层中的方法抛出了异常,是因为在入队前也做了检查库存的操作

    最后我们在看下这次给我们返回了什么信息,是异常还是跟上次一样返回成功信息?

    OKOKOK,这次是返回了库存不足的异常信息,好了一切正常,就先写到这里吧,已经超过下班时间了。。。
    写在最后:

    这次写的很着急,所有可能有点潦草,如果有哪里不是很明白的同学,欢迎在下方回复,如果有大佬看到有不对的地方也欢迎在下方批评指正。那就这样吧,人家都下班了- -!

    点赞 7
    收藏
    分享
    

    ————————————————
    版权声明:本文为CSDN博主「、那些」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/zhangli_wei0403/article/details/84940636

    相关文章

      网友评论

          本文标题:(转载)spring-boot+mysql+Redis+rabb

          本文链接:https://www.haomeiwen.com/subject/fgdnthtx.html