美文网首页java 设计
基于Redis集群的通用缓存架构--项目介绍

基于Redis集群的通用缓存架构--项目介绍

作者: 十六线程序员 | 来源:发表于2020-01-18 13:47 被阅读0次

    写这篇博客的目的呢就是想好好总结并做一个介绍,如果哪里说不好或者有误恳请大家指点,谢谢...
    项目地址https://github.com/XMUTLZY/api-cache

    Redis环境

    这里主要使用cluster模式搭建,楼主去阿里云买了台学生机,搭了个3个主节点和3个从节点的集群环境,这里就不具体介绍了,后续可以考虑好好总结下这个环境搭建~

    主项目介绍

    项目结构

    项目采用springboot+maven搭建


    项目结构.png
    • constants包:存放一些项目用到的常量
    • document包:存放和Mongodb映射起来的实体类
    • http包: http包.png

      如图,包括request、response和vo实体

    • repository包:存放直接和Redis进行操作的类
    • service包:业务层,具体业务都在这里处理了
    • web包:
      annotation包——>存放自定义的注解
      aop包——>利用Spring AOP特性,定义具体的aop实现类
      config包——>系统用到的一些配置,比如Redis、Mongodb等
      interceptor包——>拦截器配置
      utils包——>存放一些方便开发的方法,一般就是一些经常要用到的方法给它抽出来
    运行效果

    二话不说,先试下运行效果。我们知道Redis支持五大数据类型,我们以String类型为例,执行put操作即增加缓存数据。


    运行.png

    这里传入三个参数,其中member表示项目的唯一表示,因为既然要实现一个通用的缓存架构,那么就要有个字段来标识不同项目,避免key冲突;再传入一个键值对即可。再去实际环境测试下是否成功,结果如下图。


    测试.png

    注意到这里实际的key值并不是我们传入的参数key,而是member:key,这样做的目的我们看个图你们就清楚了

    redis数据缓存结构.png
    这个就是redis缓存架构的思想,我们只需要调用该接口并传入指定参数就可;之后就是在代码复用性和优化上下功夫了。接下来看下具体代码实现和一些细节处理~
    代码实现

    SpringBoot是传统的MVC模式开发框架,这次使用String类型的获取缓存数据这个方法。
    1、Controller

    @Controller
    @RequestMapping(value = "/cache/string")
    public class StringCacheController {
        @Autowired
        private StringCacheService stringCacheService;
    
        /**
         * 获取缓存数据
         * @Params: member、key
         */
        @RequestMapping(value = "/get", method = RequestMethod.POST)
        @ResponseBody
        @KeyRequired
        public BaseResponse stringGet(@RequestBody CacheRequest cacheRequest) {
            return stringCacheService.get(cacheRequest);
        }
    }
    

    这里自定义了@KeyRequired注解,用于判断输入的member(用于区分不同项目)和key字段是否符合要求,如下图。

    @Target({ElementType.METHOD})//标识该注解标记在方法上
    @Retention(RetentionPolicy.RUNTIME)//运行时注解
    public @interface KeyRequired {
    }
    

    定义好注解后,需要配合拦截器进行处理,首先添加拦截器。

    @Configuration
    public class InterceptorConfig implements WebMvcConfigurer {
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(keyCheckInterceptor());
        }
    
        @Bean
        public KeyCheckInterceptor keyCheckInterceptor() {
            return new KeyCheckInterceptor();
        }
    }
    
    /**
     * Created by Jake.lin on 2019/12/09
     * @Tips: 判断key值是否存在  拦截器
     */
    public class KeyCheckInterceptor extends HandlerInterceptorAdapter {
        @Resource
        private KeyService keyService;
        public static final Integer MEMBER_BE_NULL = 431;
        public static final Integer KEY_BE_NULL = 432;
        public static final Integer KEY_NO_EXISTS = 433;
        public static final String CHARSET_ENCODING = "UTF-8";
    
        /*
         * 注意:这里我们使用了拦截器对请求进行处理已经获取到请求体,后续就会出现request body miss的情况
         */
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            if (!(handler instanceof HandlerMethod)) { // 如果注解不标记在方法上,则不进行拦截
                return true;
            }
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Method method = handlerMethod.getMethod();
            if (method.getAnnotation(KeyRequired.class) != null) {//判断方法上是否有该注解
                JSONObject jsonObject = SystemUtils.getRequestBody(request);// 这一步目的就是为了获取请求体
                String member = (String) jsonObject.get("member");
                String key = (String) jsonObject.get("key");
                if (!StringUtils.hasText(member)) {//member为空
                    buildHttpServletResponse(response, MEMBER_BE_NULL, "{\"status\":" + MEMBER_BE_NULL + ",\n\"message\":\"the member can't no be null.\"}");
                    return false;
                } else if (!StringUtils.hasText(key)) {//key为空
                    buildHttpServletResponse(response, KEY_BE_NULL, "{\"status\":" + KEY_BE_NULL + ",\n\"message\":\"the key can't no be null.\"}");
                    return false;
                } else {//(member:key)不存在
                    List<String> keyList = Arrays.asList(SystemUtils.buildKey(member, key));
                    CacheRequest cacheRequest = new CacheRequest();
                    cacheRequest.setMemberKeyList(keyList);
                    if (!keyService.isExistsByKeyList(cacheRequest).get(keyList.get(0))) {//这里做这么多的目的就是想知道该(member:key)是否存在
                        buildHttpServletResponse(response, KEY_NO_EXISTS, "{\"status\":" + KEY_NO_EXISTS + ",\n\"message\":\"the key(member+key) no exist.\"}");
                        return false;
                    }
                }
            }
            return true;
        }
    
        private void buildHttpServletResponse(HttpServletResponse response, Integer statusCode, String message) throws IOException {
            response.setContentType(MediaType.APPLICATION_JSON_VALUE);
            response.setCharacterEncoding(CHARSET_ENCODING);
            response.setStatus(statusCode);
            response.getWriter().write(message);
            response.getWriter().close();
        }
    }
    

    上面代码就通过注解实现了对member和key进行处理的逻辑,其中getRequestBody()和buildKey()是自己定义的方法

    public class SystemUtils {
        /**
         * @Tips: convert key to standard format
         */
        public static String buildKey(String member, String key) {
            return member + ":" + key;
        }
    
        /**
         * @Iips: build error response
         */
        public static void buildErrorResponse(BaseResponse baseResponse) {
            baseResponse.setStatus(BaseResponse.FAILD_STATUS);
            baseResponse.setStatusCode(BaseResponse.FAILD_CODE);
            baseResponse.setMessage("system error." + SystemUtils.dateToFormat(new Date()));
        }
    
        /**
         * @Tips: get the request body by HttpServletRequest
         */
        public static JSONObject getRequestBody(HttpServletRequest request) throws IOException {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(request.getInputStream()));
            String str = "";
            String wholeStr = "";
            while ((str=bufferedReader.readLine()) != null) {
                wholeStr += str;
            }
            return JSONObject.parseObject(wholeStr);
        }
    }
    

    2、Service

    @Service
    public class StringCacheService {
        @Autowired
        private StringCacheRepository stringCacheRepository;
    
        @Override
        public CacheResponse get(CacheRequest cacheRequest) {
            CacheResponse cacheResponse = stringCacheRepository.get(cacheRequest);
            if (!StringUtils.hasText(cacheResponse.getValue())) {
                cacheResponse.setMessage("no find value.");
            }
            return cacheResponse;
        }
    }
    

    3、Repository
    这里直接和Redis操作,来看代码。

    @Repository
    public class StringCacheRepository {
        @Autowired
        private JedisCluster jedisCluster;
    
        public CacheResponse get(CacheRequest cacheRequest) {
            CacheResponse cacheResponse = new CacheResponse();
            try {
                Object value = jedisCluster.get(SystemUtils.buildKey(cacheRequest.getMember(), cacheRequest.getKey()));
                cacheResponse.setValue(value.toString());
            } catch (Exception e) {
                SystemUtils.buildErrorResponse(cacheResponse);
            }
            return cacheResponse;
        }
    }
    

    这里引入了JedisCluster类,那肯定要先配置和加载该类

    @Configuration
    @ConditionalOnClass({JedisCluster.class})
    public class RedisConfig {
        @Value("${spring.redis.cluster.nodes}")
        private String clusterNodes;
        @Value("${spring.redis.timeout}")
        private int timeout;
        @Value("${spring.redis.jedis.pool.max-idle}")
        private int maxIdle;
        @Value("${spring.redis.jedis.pool.max-wait}")
        private int maxWaitMillis;
    
        @Bean
        public JedisCluster getJedisCluster() {
            String[] cNodes = clusterNodes.split(",");
            Set<HostAndPort> nodes = new HashSet<>();
            for(String node : cNodes) {
                String[] hp = node.split(":");
                nodes.add(new HostAndPort(hp[0], Integer.parseInt(hp[1])));
            }
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxIdle(maxIdle);
            jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
            return new JedisCluster(nodes, jedisPoolConfig);
        }
    }
    

    上面就是Redis支持的String类型的其中一个操作,Redis支持五大数据类型(String、List、Set、Hash和ZSort(有序集合)),对应我们就得有五个Controller进行处理,还有对Key值操作也需要新建一个Controller,更多操作可以看Jedis的APl https://blog.csdn.net/zhangguanghui002/article/details/78770071

    功能优化

    1、piplined管道功能
    Redis cluster集群模式并不支持管道模式

    image.png
    个人理解:从代码角度来说,Piplined对象是从JedisPool中获取的,Cluster模式多少个主节点就有多少个JedisPool对象,存入的key值并不能保证都是存在于同一个节点中,因此无法实现管道功能。
    针对这个原因,思考下,能否对属于同一个节点的key使用同一个Piplined对象,三个节点我们就需要获取三个Piplined对象。但是由于JedisCluster并没有把每个主节点对应的JedisPool对象暴露给我们,我们也就不能获取到Piplined对象了。这里的解决方案我参考了https://www.jianshu.com/p/54a754c85f81这位大神的写法,讲得很清楚。
    2、Mongdb+AOP监控数据请求
    先写个Service来做记录服务
    @Service
    public class RecordService {
        @Autowired
        private MongoTemplate mongoTemplate;
    
        @Transactional
        public RedisLogDoc insert(RecordRequest recordRequest) {
            RedisLogDoc redisLogDoc = new RedisLogDoc(); 
            BeanUtils.copyProperties(recordRequest, redisLogDoc);
            return mongoTemplate.save(redisLogDoc);
        }
    }
    

    再就是定义aop实现请求监控了

    @Component
    @Aspect
    public class RecordControllerRequestAop {
        @Autowired
        private RecordService recordService;
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        @Before("pointCut()")//切入点之前
        public void Before() {
            logger.info("before controller init");
        }
    
        @Pointcut("execution(* sch.xmut.jake.cache.apicache.web.controller..*.*(..))")//切入点
        public void pointCut() {
            logger.info("pointCut controller init");
        }
    
        @AfterReturning(returning = "response", value = "pointCut()")//执行完切入点之后
        public void afterRunning(JoinPoint joinPoint, Object response) {
            logger.info("afterRunning init");
            BaseResponse baseResponse = (BaseResponse) response; // 切入点方法的返回值
            CacheRequest cacheRequest = (CacheRequest) joinPoint.getArgs()[0];// request参数,根据实际需求写
            HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
            RecordRequest recordRequest = buildRecordReuqest(request, cacheRequest);// build一个参数传入记录的service
            recordRequest.setMethod(joinPoint.getSignature().getName());
            if (BaseResponse.SUCCESS_CODE == baseResponse.getStatusCode()) {
                recordService.insert(recordRequest);
            } else {
                logger.warn("response error, skip controller record");
            }
            logger.info("complete controller record");
        }
    
        private RecordRequest buildRecordReuqest(HttpServletRequest request, CacheRequest cacheRequest) {
            RecordRequest recordRequest = new RecordRequest();
            recordRequest.setUrl(request.getRequestURL().toString());
            recordRequest.setContentType(request.getContentType());
            recordRequest.setMethodType(request.getMethod());
            recordRequest.setParams(JSONObject.toJSONString(cacheRequest));
            recordRequest.setProjectMember(cacheRequest.getMember());
            recordRequest.setRecordType(CacheConstans.RECORD_TYPE_CONTROLLER);
            recordRequest.setCreateTime(SystemUtils.dateToFormat(new Date()));
            return recordRequest;
        }
    }
    

    效果如下


    Mongodb.png

    其中record_type字段是后续加上去的,因为当我使用Dubbo+Zookeeper作为调用框架之后,需要把service暴露出来,这时候对该项目的调用就只是实现对service的调用了,并不会走接口。传统的Rest或者httpClient调用还是从Controller作为入口。

    3、Dubbo+Zookeeper集成
    环境搭建:我也只是简单的在服务器上搭建了单机的环境,先将Zookeeper部署下来,再去获取Dubbon-admin的war包,记得修改Dubbo的配置,把注册中心改为Zookeeper的服务器地址,在把war包放入tomcat,启动之后就可以了。在这个过程中,楼主遇到了很多问题...比如zookeeper启动成功,但是状态显示缺提示没启动,后来发现是8080端口被占用了...具体搭建楼主也没有做研究,就只是了解~~后续还需加强学习...

    搭建好了先引入两个包

    <!-- zookeeper客户端 -->
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.7</version>
                <exclusions><!-- 包冲突 -->
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/com.gitee.reger/spring-boot-starter-dubbo -->
            <dependency>
                <groupId>com.gitee.reger</groupId>
                <artifactId>spring-boot-starter-dubbo</artifactId>
                <version>1.1.3</version>
            </dependency>
    

    dubbo和zookeeper配置

    #dubbo+zookeeper
    spring.dubbo.application.name=api-cache-provider
    spring.dubbo.base-package=sch.xmut.jake.cache.apicache.service //需要暴露的服务所在的包
    spring.dubbo.registry.address=your ip
    spring.dubbo.registry.port=2181
    spring.dubbo.protocol.name=dubbo
    spring.dubbo.protocol.port=20890
    spring.dubbo.provider.timeout=5000
    

    最后再把service包下的@Service注解改成Dubbo包下的注解

    import com.alibaba.dubbo.config.annotation.Service;
    

    总结

    楼主是个小白实习生...写这篇博客只是做个总结,你们可以以怀疑的态度来看这篇博客,中间可能会有哪里说法不对或者写的有误,我不一定都是对的,请大家指教... 后续我会对这次用到的技术进行系统的学习,谢谢~

    相关文章

      网友评论

        本文标题:基于Redis集群的通用缓存架构--项目介绍

        本文链接:https://www.haomeiwen.com/subject/mdchzctx.html