美文网首页技术Code
使用redis结合aop提取用户的行为数据

使用redis结合aop提取用户的行为数据

作者: Briseis | 来源:发表于2018-09-16 18:00 被阅读17次

    场景描述

    在项目中有这样一个需求,用户下载app并打开进入首页之前,会让用户选择自己感兴趣的主题分类,后台根据用户的选择提取出用户的行为数据并作出统计,以图形或者表格的形式展现出来,后期就可以根据这些数据做去做一些类似个性化或精准化的推送了,例如在进入豆瓣app首页前提示用户先选择感兴趣的主题内容.


    image.png

    具体实现

    由于项目中主要使用了Spring框架,所以首先想到的最简单的方式就是使用aop切面拦截用户选择主题分类的接口,然后在数据库中建一张单独的表t_user_interest来存储用户的行为数据,由于是统计型数据所以实时性要求不高,允许有一定的延迟,并且为避免频繁的操作数据库,在切面拦截数据后起独立的线程推送到redis队列,后台再起调度任务每隔1小时异步持久化到数据库中.下面来看看实现步骤:
    1.定义aop拦截器拦截方法调用,把用户行为数据推送到redis队列中,其中TaskPo为封装好的与用户相关的行为数据对象.

    /**
     * aop拦截器拦截方法调用
     */
    @Component
    @Aspect
    public class UserInterestAspect {
    
        private final static Logger logger = LoggerFactory.getLogger(UserInterestAspect.class);
    
        private final LinkedBlockingQueue<TaskPo> queue = new LinkedBlockingQueue<>();
    
        private final static ExecutorService pool = Executors.newFixedThreadPool(5);
    
        private final static String QUEUE_NAME = "USER_INTEREST";
    
        /**
         * 前置通知:在目标方法开始之前执行
         * 
         * @param joinPoint
         * @throws Exception
         */
        @Before("execution(* xxx.service.impl.xxxServiceImpl.xxx(..))")
        public void saveRequiresLog(JoinPoint joinPoint) throws Exception {
            try {
                TaskPo taskPo = getTaskPo(joinPoint);
                if (taskPo != null) {
                    queue.add(taskPo);
                    TaskPo taskMsge;
                    while ((taskMsge = queue.poll()) != null) {
                        pool.execute(new PushRedisWorker(QUEUE_NAME, taskMsge));
                    }
                }
            } catch (Exception e) {
                throw e;
            }
        }
    
        /**
         * 解析参数映射为Java对象
         * 
         * @param joinPoint
         * @return
         */
        public TaskPo getTaskPo(JoinPoint joinPoint) {
            Object[] paramsValue = joinPoint.getArgs();
            String[] paramsName = ((CodeSignature) joinPoint.getStaticPart()
                    .getSignature()).getParameterNames();
            JSONObject entityJSON = new JSONObject();
            for (int i = 0; i < paramsName.length; i++) {
                entityJSON.put(paramsName[i], paramsValue[i]);
            }
            if (entityJSON != null && !entityJSON.isEmpty()) {
                TaskPo taskPo = JSONObject.toJavaObject(entityJSON, TaskPo.class);
                taskPo.setOperatorDate(new Date());
                return taskPo;
            }
            return null;
        }
    }
    

    2.接着起一个任务线程接口worker继承Runnable.

    public interface Worker extends Runnable {
    
    }
    

    3.然后创建一个PushRedisWorker类实现Worker接口,并将封装好的数据写到redis队列中.

    public class PushRedisWorker implements Worker {
    
        private String QUEUE_NAME;
    
        private TaskPo taskPo;
    
        public PushRedisWorker(String QUEUE_NAME, TaskPo taskPo) {
            this.QUEUE_NAME = QUEUE_NAME;
            this.taskPo = taskPo;
        }
    
        @Override
        public void run() {
            // TODO Auto-generated method stub
            try (Jedis jedis = JedisUtils.getJedis()) {
                jedis.lpush(QUEUE_NAME, JSON.toJSONString(taskPo));
            }
        }
    }
    

    4.然后再定义一个调度任务器类每隔一段时间间隔轮询redis任务队列,不断的从队列中消费数据,再结合线程池异步发送数据到mysql中.

    /**
     * 任务调度定时到Redis队列中拉取对象持久化到数据库
     */
    @Component
    public class RedisTaskJob {
    
        private final static Logger logger = LoggerFactory.getLogger(RedisTaskJob.class);
    
        private final static String QUEUE_NAME = "USER_INTEREST";
    
        private final static ExecutorService es = Executors.newFixedThreadPool(5);
    
        private volatile static boolean isRun = true;
    
        /**
         * 每隔一小时执行一次
         */
        @Scheduled(cron = "0 0 0/1 * * ? ")
        public void getRedisTask() {
            if (logger.isDebugEnabled()) {
                logger.debug("调度开始");
            }
                    //取出spring受管的业务service实例
            IUserInterestService userInterestService = SpringContextHolder.getBean("userInterestService",IUserInterestService.class);
            try (Jedis jedis = JedisUtils.getJedis()) {
                if (jedis.exists(QUEUE_NAME)) {
                    start();
                }
                while (isRun) {
                    if (!jedis.exists(QUEUE_NAME)) {
                        stop();
                        break;
                    }
                    try {
                        String task = jedis.lpop(QUEUE_NAME);
                        if (StringHelpUtils.isNotBlank(task)) {
                            TaskPo taskPo = JSONObject.toJavaObject(JSON.parseObject(task), TaskPo.class);
                                                    //任务对象转化为实体model
                            UserInterest model = EntityUtils.convert(taskPo,UserInterest.class);
                            es.submit(() -> {
                                                             //执行数据入库操作
                                userInterestService.save(model);
                            });
                        }
                    } catch (Exception e) {
                        stop();
                        throw e;
                    }
                }
            }
        }
    
        public static void stop() {
            isRun = false;
        }
    
        public static void start() {
            isRun = true;
        }
    }
    

    5.最后统计mysql中的数据并生成报表.

    相关文章

      网友评论

        本文标题:使用redis结合aop提取用户的行为数据

        本文链接:https://www.haomeiwen.com/subject/fokiuftx.html