业务场景:由于项目需求需要对接多个PLC设备,获取PLC设备数据,网络传输得数据获取后直接做业务逻辑处理,可能会对数据库造成一定压力,甚至会导致程序崩溃。
解决方案:
PLC设备数据过来,Netty创建得服务端,处理接收客户端发送得数据,先存入到Redis队列,后台再开启多个线程处理redis队列中得数据。
Service层业务代码
package com.company.netty.service.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
* Created by C.H on 2019/8/23.
*/
@Component
public class RedisService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 获取对应key左侧第一个元素并删除
* @param key
* @return
*/
public Object leftPop(String key){
return stringRedisTemplate.opsForList().leftPop(key);
}
/**
* 右侧插入集合中
* @param key
* @param value
*/
public void rightPush(String key, String value){
stringRedisTemplate.opsForList().rightPush(key, value);
}
}
消费者代码
package com.company.netty.service.consumer;
import com.company.netty.common.Const;
import com.company.netty.service.redis.RedisService;
import com.company.netty.util.dataAnalysis.DataAnalysisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* Created by C.H on 2019/8/24.
*/
@Slf4j
@Component
public class RedisQueueConsumer extends Thread{
@Resource
private RedisService redisService;
@Override
public void run() {
log.info("启动消费者线程处理");
while (true) {
try {
processMessage();
this.sleep(50);//防止CPU空转
} catch (InterruptedException e) {
log.error("消费处理线程异常"+ e.getMessage());
}
}
}
/**
* 消费者处理业务逻辑
*/
private void processMessage() {
Object msg = redisService.leftPop(Const.Redis_LIST_KEY);
if (msg != null) {
log.info("├ [消费客户端存放Redis消息队列]:"+ msg.toString());
//TODO处理业务逻辑,入库更新操作
}
}
}
生产者方法
/**
* 接收客户端发送的消息
*
* @param ctx ChannelHandlerContext
* @param msg 消息
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("\t├ [收到客户端消息类型存放Redis队列]: {} - {}\n", msg.getClass(), msg);
//抽象方法中通过beanLoad获取service注入
PublisherService publisherService = SpringBeanLoader.getSpringBean(PublisherServiceImpl.class);
try{
publisherService.rightPush(Const.Redis_LIST_KEY, msg.toString());
}catch(Exception e){
log.error("存放消息失败" + e.getMessage());
}
}
网友评论