美文网首页
java 实现redis消息队列

java 实现redis消息队列

作者: 等等ChEnH | 来源:发表于2019-10-30 17:32 被阅读0次

    业务场景:由于项目需求需要对接多个PLC设备,获取PLC设备数据,网络传输得数据获取后直接做业务逻辑处理,可能会对数据库造成一定压力,甚至会导致程序崩溃。

    解决方案:
    PLC设备数据过来,Netty创建得服务端,处理接收客户端发送得数据,先存入到Redis队列,后台再开启多个线程处理redis队列中得数据。

    Service层业务代码

    package com.company.netty.service.redis;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * Created by C.H on 2019/8/23.
     */
    @Component
    public class RedisService {
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        /**
         * 获取对应key左侧第一个元素并删除
         * @param key
         * @return
         */
        public Object leftPop(String key){
            return stringRedisTemplate.opsForList().leftPop(key);
        }
    
        /**
         * 右侧插入集合中
         * @param key
         * @param value
         */
        public void rightPush(String key, String value){
            stringRedisTemplate.opsForList().rightPush(key, value);
        }
    }
    

    消费者代码

    package com.company.netty.service.consumer;
    
    import com.company.netty.common.Const;
    import com.company.netty.service.redis.RedisService;
    import com.company.netty.util.dataAnalysis.DataAnalysisUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * Created by C.H on 2019/8/24.
     */
    @Slf4j
    @Component
    public class RedisQueueConsumer extends Thread{
    
        @Resource
        private RedisService redisService;
    
        @Override
        public void run() {
            log.info("启动消费者线程处理");
            while (true) {
                try {
                    processMessage();
                    this.sleep(50);//防止CPU空转
                } catch (InterruptedException e) {
                    log.error("消费处理线程异常"+ e.getMessage());
                }
            }
        }
    
        /**
         * 消费者处理业务逻辑
         */
        private void processMessage() {
            Object msg = redisService.leftPop(Const.Redis_LIST_KEY);
            if (msg != null) {
                log.info("├ [消费客户端存放Redis消息队列]:"+ msg.toString());
                //TODO处理业务逻辑,入库更新操作
            
            }
        }
    }
    

    生产者方法

    /**
         * 接收客户端发送的消息
         *
         * @param ctx ChannelHandlerContext
         * @param msg 消息
         */
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("\t├ [收到客户端消息类型存放Redis队列]: {} - {}\n", msg.getClass(), msg);
            //抽象方法中通过beanLoad获取service注入
            PublisherService publisherService = SpringBeanLoader.getSpringBean(PublisherServiceImpl.class);
            try{
              publisherService.rightPush(Const.Redis_LIST_KEY, msg.toString());
            }catch(Exception e){
                log.error("存放消息失败" + e.getMessage());
            }
        }
    

    相关文章

      网友评论

          本文标题:java 实现redis消息队列

          本文链接:https://www.haomeiwen.com/subject/wccavctx.html