美文网首页技术分享
使用Redis+Lua脚本实现抢红包并异步持久化到数据库

使用Redis+Lua脚本实现抢红包并异步持久化到数据库

作者: isuntong | 来源:发表于2019-11-08 15:03 被阅读0次

概述

上面三篇是使用的MySql数据库来作为数据的载体数据最终会将数据保存到磁盘中,而Redis使用的是内存,内存的速度比磁盘速度肯定要快很多.

对于使用 Redis实现抢红包,首先需要知道的是Redis的功能不如数据库强大,事务也不是很完整.因此要保证数据的正确性数据的正确性可以通过严格的验证得以保证。

而 Redis的 Lua 语言是原子性的,且功能更为强大,所以优先选择使用Lua语言来实现抢红包。

但是无论如何对于数据而言,在 Redis 当中存储,始终都不是长久之计 , 因为 Redis并非一个长久储存数据的地方,更多的时候只是为了提供更为快速的缓存,所以当红包金额为 0 或者红包超时的时候(超时操作可以使用定时机制实,这里暂不讨论), 会将红包数据保存到数据库中, 这样才能够保证数据的安全性和严格性。

所以这次我们将使用Redis + lua脚本来实现抢红包的功能。

实现步骤

xml方式配置redis

applicationContext-redis.xml

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd 
        http://www.springframework.org/schema/mvc 
        http://www.springframework.org/schema/mvc/spring-mvc-3.2.xsd 
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-3.2.xsd 
        http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.2.xsd 
        http://www.springframework.org/schema/tx 
        http://www.springframework.org/schema/tx/spring-tx-3.2.xsd ">

<!-- 加载配置文件 -->
    <!-- redis连接池配置-->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig" >
        <!--最大空闲数
        <property name="maxIdle" value="${redis.maxIdlesuntong}" />-->
        <!--连接池的最大数据库连接数  -->
        <property name="maxTotal" value="${redis.maxTotal}" />
        <!--最大建立连接等待时间-->
        <property name="maxWaitMillis" value="${redis.maxWaitMillis}" />
    </bean >


    <!--redis连接工厂 -->
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">
        <property name="poolConfig" ref="jedisPoolConfig"></property>
        <!--IP地址 -->
        <property name="hostName" value="${redis.hostName}"></property>
        <!--端口号  -->
        <property name="port" value="${redis.port}"></property>
        <!--如果Redis设置有密码,没有的话就注释掉下面的配置  
        <property name="password" value="${redis.password}" /> -->
    </bean>

    <!--redis操作模版,使用该对象可以操作redis  -->
    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate" >
        <property name="connectionFactory" ref="jedisConnectionFactory" />
        <!--如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can't cast to String!!  -->
        <property name="keySerializer" >
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
        </property>
        <property name="valueSerializer" >
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
        </property>
        <property name="hashKeySerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
        </property>
        <property name="hashValueSerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
        </property>
        <!--开启事务  -->
        <property name="enableTransactionSupport" value="true"></property>
    </bean >
    

</beans>

注意这里我们使用StringRedisSerializer(简单的字符串序列化)序列化方式处理value,因为我们把数据以字符串形式存到了redis的linkedlist中,开始我将其配置为了GenericJackson2JsonRedisSerializer(类似Jackson2JsonRedisSerializer,但使用时构造函数不用特定的类参考以上序列化,自定义序列化类; ),但是java代码中获取list之后其中的json值无法解析,整整搞了两天才发现错误地方。

同时我们导入properties文件要和dao中导入的properties写在一起,不然无法导入

<context:property-placeholder location="classpath:db.properties,classpath:redis.properties" ignore-unresolvable="true" />

redis.properties

#ip地址
redis.hostName=192.168.79.128
#端口号
redis.port=6379
#如果有密码
redis.password=123456  


#最大空闲数
redis.maxIdlesuntong=50  
#控制一个pool可分配多少个jedis实例,用来替换上面的redis.maxActive,如果是jedis 2.4以后用该属性
redis.maxTotal=100  
#最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。
redis.maxWaitMillis=1000  

没有写出所有配置,有的使用默认即可

application-async.xml

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd 
        http://www.springframework.org/schema/mvc 
        http://www.springframework.org/schema/mvc/spring-mvc-3.2.xsd 
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-3.2.xsd 
        http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.2.xsd 
        http://www.springframework.org/schema/tx 
        http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
        http://www.springframework.org/schema/task 
        http://www.springframework.org/schema/task/spring-task.xsd">

    <task:annotation-driven executor="myExecutor"/>
    <task:executor id="myExecutor" pool-size="5-10" queue-capacity="200" keep-alive="180" rejection-policy="ABORT"/>

</beans>

补充方式二:注解方式配置redis

首先在类 RootConfig 上创建一个 RedisTemplate 对象,并将其装载到 Spring IoC 容器中。

/**
     * 创建一个 RedisTemplate 对象
     */
    @Bean(name = "redisTemplate")
    public RedisTemplate initRedisTemplate() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        // 最大空闲数
        poolConfig.setMaxIdle(50);
        // 最大连接数
        poolConfig.setMaxTotal(100);
        // 最大等待毫秒数
        poolConfig.setMaxWaitMillis(20000);
        // 创建Jedis链接工厂
        JedisConnectionFactory connectionFactory = new JedisConnectionFactory(poolConfig);
        connectionFactory.setHostName("192.168.31.66");
        connectionFactory.setPort(6379);
        // 调用后初始化方法,没有它将抛出异常
        connectionFactory.afterPropertiesSet();
        // 自定Redis序列化器
        RedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();
        RedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // 定义RedisTemplate,并设置连接工厂
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(connectionFactory);
        // 设置序列化器
        redisTemplate.setDefaultSerializer(stringRedisSerializer);
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setHashValueSerializer(stringRedisSerializer);
        return redisTemplate;
    }

这样 RedisTemplate 就可以在 Spring 上下文中使用了。
注意, JedisConnectionFactory对象在最后的时候需要自行调用 afterPropertiesSet 方法,它实现了 lnitializingBean 接 口。 如果将其配置在 Spring IoC 容器中, Spring 会自动调用它,但是这里我们是自行创建的, 因此需要自行调用,否则在运用的时候会抛出异常。

lua脚本和异步持久化功能的开发

Redis 并不是一个严格的事务,而且事务的功能也是有限的 。 加上 Redis 本身的命令也比较有限,功能性不强,为了增强功能性,还可以使用 Lua 语言。
Redis 中的 Lua 语言是一种原子性的操作,可以保证数据的一致性 。

依据这个原理可以避免超发现象,完成抢红包的功能,而且对于性能而言, Redis 会比数据库快得多。

第一次运行 Lua 脚本的时候,先在 Redis 中编译和缓存脚本,这样就可以得到一个 SHA1字符串,之后通过 SHAl 字符串和参数就能调用 Lua 脚本了

--缓存抢红包列表信息列表 key
local listKey = 'red_packet_list_'..KEYS[1]  
--当前被抢红包 key
local redPacket = 'red_packet_'..KEYS[1] 
--获取当前红包库存
local stock = tonumber(redis.call('hget', redPacket, 'stock')) 
--没有库存,返回为 0 
if stock <= 0 then 
    return 0 
end 
--库存减 1
stock = stock-1
--保存当前库存
redis.call('hset', redPacket, 'stock', tostring(stock)) 
--往链表中加入当前红包信息
redis.call('rpush', listKey, ARGV[1])  
--如果是最后一个红包,则返回 2 ,表示抢红包已经结束,需要将列表中的数据保存到数据库中
if stock == 0 then 
    return 2 
end  
--如果并非最后一个红包,则返回 l ,表示抢红包成功
return 1

流程:

  • 判断是否存在可抢的库存,如果己经没有可抢夺 的红包,则返回为 0,结束流程
  • 有可抢夺的红包,对于红包的库存减1 ,然后重新设置库存
  • 将抢红包数据保存到 Redis 的链表当中,链表的 key 为 red_packet_list_ {id}
  • 如果当前库存为 0 ,那么返回 2,这说明可以触发数据库对 Redis 链表数据的保存,链表的 key 为 red_packet_ list_ {id},它将保存抢红包的用户名和抢的时间
  • 如果当前库存不为 0 ,那么将返回 1,这说明抢红包信息保存成功。

当返回为 2 的时候,说明红包己经没有库存,会触发数据库对链表数据的保存, 这是一个大数据量的保存。为了不影响最后一次抢红包的响应,在实际的操作中往往会考虑使用 JMS 消息发送到别的服务器进行操作,我们这里选择一种简单的方式来实现,去创建一条新的线程去运行保存 Redis 链表数据到数据库。

Service层

RedisRedPacketService.java

package com.redpacket.ssm.service;

public interface RedisRedPacketService {

    /**
     * 保存redis抢红包列表
     * @param redPacketId --抢红包编号
     * @param unitAmount -- 红包金额
     */
    public void saveUserRedPacketByRedis(Integer redPacketId, Double unitAmount);
}

RedisRedPacketServiceImpl.java

package com.redpacket.ssm.service.impl;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;

import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.redpacket.ssm.po.TUserRedPacket;
import com.redpacket.ssm.service.RedisRedPacketService;

public class RedisRedPacketServiceImpl implements RedisRedPacketService {

    private static final String PREFIX = "red_packet_list_";
    // 每次取出1000条,避免一次取出消耗太多内存
    private static final int TIME_SIZE = 1000;

    
    @Autowired
    private RedisTemplate<String, String> redisTemplate; // RedisTemplate

    @Autowired
    private DataSource dataSource; // 数据源
    
    @Override
    // 开启新线程运行
    @Async
    public void saveUserRedPacketByRedis(Integer redPacketId, Double unitAmount) {
        
        // TODO 自动生成的方法存根
        System.err.println("开始保存数据");
        Long start = System.currentTimeMillis();
        // 获取列表操作
        BoundListOperations<String,String> ops = redisTemplate.boundListOps(PREFIX + redPacketId);
        
        Long size = ops.size();
        Long times = size % TIME_SIZE == 0 ? size / TIME_SIZE : size / TIME_SIZE + 1;
        int count = 0;
        List<TUserRedPacket> userRedPacketList = new ArrayList<TUserRedPacket>(TIME_SIZE);

        for (int i = 0; i < times; i++) {
            // 获取至多TIME_SIZE个抢红包信息
            List<String> userIdList = null;
            if (i == 0) {
                userIdList = ops.range(i * TIME_SIZE, (i + 1) * TIME_SIZE);
            } else {
                userIdList = ops.range(i * TIME_SIZE + 1, (i + 1) * TIME_SIZE);
            }
            userRedPacketList.clear();
            // 保存红包信息
            for (int j = 0; j < userIdList.size(); j++) {
                String args = userIdList.get(j).toString();
                String[] arr = args.split(" ");
                String userIdStr = arr[0];
                String timeStr = arr[1];
                Integer userId = Integer.parseInt(userIdStr);
                Long time = Long.parseLong(timeStr);
                // 生成抢红包信息
                TUserRedPacket userRedPacket = new TUserRedPacket();
                userRedPacket.setRedPacketId(redPacketId);
                userRedPacket.setUserId(userId);
                userRedPacket.setAmount(new BigDecimal(unitAmount));
                userRedPacket.setGrabTime(new Timestamp(time));
                userRedPacket.setNote("grapRedPacket " + redPacketId);
                userRedPacketList.add(userRedPacket);
            }
            // 插入抢红包信息
            count += executeBatch(userRedPacketList);
        }
        // 删除Redis列表
        redisTemplate.delete(PREFIX + redPacketId);
        Long end = System.currentTimeMillis();
        System.err.println("保存数据结束,耗时" + (end - start) + "毫秒,共" + count + "条记录被保存。");
        
    }
    
    /**
     * 使用JDBC批量处理Redis缓存数据.
     * 
     * @param userRedPacketList
     *            -- 抢红包列表
     * @return 抢红包插入数量.
     */
    private int executeBatch(List<TUserRedPacket> userRedPacketList) {
        Connection conn = null;
        Statement stmt = null;
        int[] count = null;
        try {
            conn = dataSource.getConnection();
            conn.setAutoCommit(false);
            stmt = conn.createStatement();
            for (TUserRedPacket userRedPacket : userRedPacketList) {
                String sql1 = "update T_RED_PACKET set stock = stock-1 where id=" + userRedPacket.getRedPacketId();
                DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String sql2 = "insert into T_USER_RED_PACKET(red_packet_id, user_id, " + "amount, grab_time, note)"
                        + " values (" + userRedPacket.getRedPacketId() + ", " + userRedPacket.getUserId() + ", "
                        + userRedPacket.getAmount() + "," + "'" + df.format(userRedPacket.getGrabTime()) + "'," + "'"
                        + userRedPacket.getNote() + "')";
                stmt.addBatch(sql1);
                stmt.addBatch(sql2);
            }
            // 执行批量
            count = stmt.executeBatch();
            // 提交事务
            conn.commit();
        } catch (SQLException e) {
            /********* 错误处理逻辑 ********/
            throw new RuntimeException("抢红包批量执行程序错误");
        } finally {
            try {
                if (conn != null && !conn.isClosed()) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        // 返回插入抢红包数据记录
        return count.length / 2;
    }

}

UserRedPacketServiceImpl.java

新增方法

@Autowired
        private RedisTemplate redisTemplate;

        @Autowired
        private RedisRedPacketService redisRedPacketService;

        // Lua脚本
        String script = "local listKey = 'red_packet_list_'..KEYS[1] \n" 
                + "local redPacket = 'red_packet_'..KEYS[1] \n"
                + "local stock = tonumber(redis.call('hget', redPacket, 'stock')) \n" 
                + "if stock <= 0 then return 0 end \n"
                + "stock = stock -1 \n" 
                + "redis.call('hset', redPacket, 'stock', tostring(stock)) \n"
                + "redis.call('rpush', listKey, ARGV[1]) \n" 
                + "if stock == 0 then return 2 end \n" 
                + "return 1 \n";

        // 在缓存LUA脚本后,使用该变量保存Redis返回的32位的SHA1编码,使用它去执行缓存的LUA脚本[加入这句话]
        String sha1 = null;
        
        @Override
        public Long grapRedPacketByRedis(Integer redPacketId, Integer userId) {
            
                // 当前抢红包用户和日期信息
                String args = userId + " " + System.currentTimeMillis();
                long result = 0;
                // 获取底层Redis操作对象
                Jedis jedis = (Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection();
                try {
                    // 如果脚本没有加载过,那么进行加载,这样就会返回一个sha1编码
                    if (sha1 == null) {
                        sha1 = jedis.scriptLoad(script);
                    }
                    // 执行脚本,返回结果
                    Object res = jedis.evalsha(sha1, 1, redPacketId + "", args);
                    result = (Long) res;
                    // 返回2时为最后一个红包,此时将抢红包信息通过异步保存到数据库中
                    if (result == 2) {
                        // 获取单个小红包金额
                        String unitAmountStr = jedis.hget("red_packet_" + redPacketId, "unit_amount");
                        // 触发保存数据库操作
                        Double unitAmount = Double.parseDouble(unitAmountStr);
                        
                        redisRedPacketService.saveUserRedPacketByRedis(redPacketId, unitAmount);
                    }
                } finally {
                    // 确保jedis顺利关闭
                    if (jedis != null && jedis.isConnected()) {
                        jedis.close();
                    }
                }
                return result;
            
        }

注解@Async 表示让 Spring 自动创建另外一条线程去运行它,这样它便不在抢最后一个红包的线程之内。因为这个方法是一个较长时间的方法,如果在同一个线程内,那么对于最后抢红包的用户需要等待的时间太长,用户体验不好
这里是每次取出 1 000 个抢红包的信息,之所以这样做是为了避免取出 的数据过大 , 导致JVM 消耗过多的内存影响系统性能。

对于大批量的数据操作,这是我们在实际操作中要注意的,最后还会删除 Redis保存的链表信息,这样就帮助 Redis 释放内存了

对于数据库的保存 ,这里采用了 JDBC的批量处理,每 1000 条批量保存一次,使用批量有助于性能的提高。

其xml配置在上面已挂出,如果使用注解配置的话这个时候要在原有的基础上改写配置类 WebConfig

@EnableAsync
public class WebConfig extends AsyncConfigurerSupport { 
    ....
    ....
    ....
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.initialize();
        return taskExecutor;
    }
}

使用@EnableAsync 表明支持异步调用,而我们实现了接口 AsyncConfigurerSupport 的getAsyncExecutor 方法,它是获取一个任务池,当在 Spring 环境中遇到注解@Async就会启动这个任务池的一条线程去运行对应的方法,这样便能执行异步了。

我们还使用了保存脚本返回 的 SHAl 字符串 ,所以只会发送一次脚本到 Redis 服务器,之后只传输 SHAl 字符串和参数到 Redis 就能执行脚本 了, 当脚本返回为 2 的时候, 表示此时所有的红包都已经被抢光了 ,那么就会触发 redisRedPacketService 的 saveUserRedPacketByRedis 方法。由于在 saveU serRedPacketByRedis 加入注解@Async , 所以 Spring 会创建一条新的线程去运行它 , 这样就不会影响最后抢一个红包用户 的响应时间了 。

Controller层新增路由方法

@RequestMapping(value = "/grapRedPacketByRedis")
    @ResponseBody
    public Map<String, Object> grapRedPacketByRedis(Integer redPacketId, Integer userId) {
        Map<String, Object> resultMap = new HashMap<String, Object>();
        long result = userRedPacketService.grapRedPacketByRedis(redPacketId, userId);
        boolean flag = result > 0;
        resultMap.put("result", flag);
        resultMap.put("message", flag ? "抢红包成功" : "抢红包失败");
        return resultMap;
    }

启动linux上的redis

cd /usr/local/redis
./bin/redis-server ./redis.conf
./bin/redis-cli

构造模拟数据,测试

HMSET red_packet_1 stock 20000 unit_amount 10

初始化了一个编号为1 的大红包,其中库存为 2 万个,每个 10 元. 需要保证数据库的红包表内也有对应的记录才可以。

新建一个jsp

repacketByRedis.jsp

<%@ page language="java" contentType="text/html; charset=UTF-8"
         pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
        <title>参数</title>
        <!-- 加载Query文件-->
        <script type="text/javascript" src="https://code.jquery.com/jquery-3.2.0.js">
        </script>
        <script type="text/javascript">
            $(document).ready(function () {
              //模拟30000个异步请求,进行并发
              var max = 30000;
              for (var i = 1; i <= max; i++) {
                  //jQuery的post请求,请注意这是异步请求
                  $.post({
                      //请求抢id为1的红包
                      //根据自己请求修改对应的url和大红包编号
                      url: "${pageContext.request.contextPath }/userRedPacket/grapRedPacketByRedis.action?redPacketId=1&userId=" + i,
                      //成功后的方法
                      success: function (result) {
                          
                          console.log("OK")
                      }
                  });
              }
          });
        </script>
    </head>
    <body>
    
    haha
    
    </body>
</html>

测试

http://localhost:8080/ssm_redpacket/grapByRedis.jsp

直接用start.bat就不用启动eclipse了,我得电脑快炸了

结合前几篇的数据统计,使用Redis的方式数据一致性也得到了保证且性能远远高于乐观锁和悲观锁的方式。

总结

redis方法还有几个小问题没有解决,比如如果数据写入不成功,虽然会抛出异常,但是在列表中存储的记录信息不会删除,下次再尝试时会有翻倍的记录。同时redis编程难度远大于前几种方式,所以不是不用,要更努力的掌握redis精髓,写出更高质量的代码

相关文章

网友评论

    本文标题:使用Redis+Lua脚本实现抢红包并异步持久化到数据库

    本文链接:https://www.haomeiwen.com/subject/alvvbctx.html