美文网首页
AskMe项目 异步队列

AskMe项目 异步队列

作者: 当麻真实 | 来源:发表于2018-02-20 15:16 被阅读0次

    异步队列简单介绍

    队列实现异步可以用单向队列,任务放到队列中,先进先出,或者使用优先队列,按照优先级来选择谁先执行,来防止某一个用户执行大量的请求,如一个用户发送了100个请求,如果用单向队列,其它用户必须要等这个用户的100个请求结束后才能执行,这就不合理,所以可以给第2个请求设置比较低的优先级,这样其他用户的请求也可以被执行

    使用异步队列可以让主线程继续运行,减少请求响应时间和解耦,主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。

    主要编写两个类

    1. EventProducer把事件推到Redis队列中去
    2. EventConsumer把事件从队列中取出来,找到关联的handler,一件一件地去处理

    其他用到了一些辅助类

    1. 枚举型的EventType,用来表示不同的Event类型
    2. EventModel来存放Event中的相关信息
    3. 接口类EventHandler,其中有两个函数,一个是执行Handler,另一个是返回需要使用这个Handler的EventType列表

    具体实现

    1. 枚举型EventType

    public enum EventType {
        LIKE(0),
        COMMENT(1),
        LOGIN(2),
        ASK(3),
        Mail(4);
    
        private int value;
        EventType(int value)
        {
            this.value=value;
        }
        public int getValue()
        {
            return this.value;
        }
    
    }
    

    2.EventModel

    public class EventModel {
        private int userid;//事件执行者,如点赞的人
        private EventType eventype;//事件类型,如点赞
        private  int entitytype;//操作对象类型,如给评论点赞,则为评论类型
        private int entityid;//操作对象ID,如评论的ID
        private int entityownerid;//对象的拥有者,如发表该评论的用户
    
        public int getUserid() {
            return userid;
        }
    
        public EventModel setUserid(int userid) {
            this.userid = userid;
            return this;
        }
    
        public EventType getEventype() {
            return eventype;
        }
    
        public EventModel setEventype(EventType eventype) {
            this.eventype = eventype;
            return this;
        }
    
        public int getEntitytype() {
            return entitytype;
        }
    
        public EventModel setEntitytype(int entitytype) {
            this.entitytype = entitytype;
            return this;
        }
    
        public int getEntityid() {
            return entityid;
        }
    
        public EventModel setEntityid(int entityid) {
            this.entityid = entityid;
            return this;
        }
    
        public int getEntityownerid() {
            return entityownerid;
        }
    
        public EventModel setEntityownerid(int entityownerid) {
            this.entityownerid = entityownerid;
            return this;
        }
    
        public Map<String, String> getMap() {
            return map;
        }
    
        public EventModel setMap(Map<String, String> map) {
            this.map = map;
            return this;
        }
    
        private Map<String,String> map=new HashMap<String, String>();
    
        public EventModel setkeyvalue(String key,String value)
        {
            map.put(key,value);
            return this;
        }
        public String getkeyvalue(String key)
        {
            return map.get(key);
        }
    
    
    }
    

    3. EventHandler

        public interface EventHandler {
        void doHandle(EventModel event);//执行Event的具体操作函数
        List<EventType> getSupportEventType();//返回该handler对那些类型的Event是关心的,即这些EventType进入队列需要运行时,需要调用该Handler
    }
    

    4. EventProducer

    @Service
    public class EventProducer {
        @Autowired
        JedisService jedis;
        //将EVENT加入到redis队列中
        public boolean fireEvent(EventModel event)
        {
            try
            {
                String eventstring= JSONObject.toJSONString(event);
                //将event转换为JSON字符串,取出时Parse回到EventModel类型
                String key= RedisKeyUtil.eventKey;
                jedis.lpush(key,eventstring);
                //将该event加入list中
                return true;
            }
            catch(Exception e)
            {
                return false;
            }
        }
    }
    

    5.EventConsumer

    @Service
    public class EventConsumer implements InitializingBean,ApplicationContextAware{
        @Autowired
        JedisService jedis;
        
        private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class) ;
        
        private Map<EventType,List<EventHandler>> eventConsumerMap=new HashMap<EventType,List<EventHandler>>();
        //一种类型的EventType进来,就寻找这件Event所需要的Handler的列表
        
        private ApplicationContext applicationContext;
        //运行的上下文
        
        @Override
        public void afterPropertiesSet() throws Exception {
            Map<String,EventHandler> beans=applicationContext.getBeansOfType(EventHandler.class);
            //找到所有的EventHandler,注意在具体实现handler的时候一定要加上@Component,否则找不到这个handler
            if(beans!=null)
            {
                for(Map.Entry<String,EventHandler> entry:beans.entrySet())
                {
                    List<EventType> tmp=entry.getValue().getSupportEventType();
                    for(EventType a:tmp)
                    {
                        if(eventConsumerMap.containsKey(a)==false)
                        {
                            eventConsumerMap.put(a,new ArrayList<EventHandler>());
                        }
                            eventConsumerMap.get(a).add(entry.getValue());
                    }
                }
            }
            //初始化Map<EventType,List<EventHandler>>将Event类型和需要用到的EventHandler关联起来
    
            //新建线程,该线程不断地循环,从redis的list中找event,找到了就将Json字符串parse成event,然后执行它所需要的handler
            Thread thread=new Thread(new Runnable() {
                @Override
                public void run() {
                    while(true)
                    {
                        String key= RedisKeyUtil.eventKey;
                        List<String> eventlist=jedis.brpop(0,key);
                        for(String tmp:eventlist)
                        {
                            if(tmp.equals(key))
                                continue;
                            EventModel model= JSON.parseObject(tmp,EventModel.class);
                            if(!eventConsumerMap.containsKey(model.getEventype()))
                            {
                                logger.error("不能识别的事件");
                                continue;
                            }
                                for(EventHandler handler:eventConsumerMap.get(model.getEventype()))
                            {
                                handler.doHandle(model);
                            }
                        }
                    }
                }
            });
            thread.start();
    
        }
    
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext=applicationContext;
        }
    }
    

    解释一下其中的brpop指令:
    假如在指定时间内没有任何元素被弹出,则返回一个 nil 和等待时长(即第一个参数,表示等待time时间后返回nil)。 反之,返回一个含有两个元素的列表,第一个元素是被弹出元素所属的 key (所以下面遍历的时候要把Key过滤掉),第二个元素是被弹出元素的值。

    6.具体的Handler实现 LikeHandler

    @Component
    public class LikeHandler implements EventHandler {
        @Autowired
        UserService userService;
        @Autowired
        MessageService messageService;
        //给被点赞用户发送站内信
        @Override
        public void doHandle(EventModel event) {
            Message msg=new Message();
            msg.setToid(event.getEntityownerid());
            msg.setCreateddate(new Date());
            msg.setFromid(888);
            User user=userService.getuserbyid(event.getUserid());
            msg.setContent("用户"+user.getName()+"赞了您的评论 "+event.getkeyvalue("questionid"));
            msg.setHasread(0);
            msg.setConversationid(event.getEntityownerid(),888);
            messageService.insertMessage(msg);
        }
    
        @Override
        public List<EventType> getSupportEventType() {
            return Arrays.asList(EventType.LIKE);
        }
    }
    

    7. Controller中将任务放入异步队列中

    eventProducer.fireEvent(new EventModel().setEntityid(entity_id).setEntitytype(EntityType.ENTITY_COMMENT).setUserid(user.getId()).setEventype(EventType.LIKE).setkeyvalue("questionid","http://127.0.0.1:8080/question/"+String.valueOf(comment.getEntityid())).setEntityownerid(comment.getUserid()));
    

    总结:

    通过异步队列,如果用户有一个耗时且不需要同步响应的事件时,可以将事件通过事件生产者,将事件推入异步队列中,消费者线程不断从异步队列中取出事件,来执行,适用于Message Service等应用场景

    相关文章

      网友评论

          本文标题:AskMe项目 异步队列

          本文链接:https://www.haomeiwen.com/subject/zfsgtftx.html