美文网首页
数据写入日志延迟入库方案

数据写入日志延迟入库方案

作者: DramaKing | 来源:发表于2019-03-14 17:28 被阅读0次

如果对你有帮助, 请点击💖喜欢💖鼓励一下我这个臭弟弟

需求描述:

由于项目部署环境网络不太稳定, 可能会导致和MongoDB的连接断开, 期间的呼叫记录会丢失. 需要想办法解决这个问题.

方案思路:

记录先写入文件中, 从文件中读取记录插入到数据库中. 如果数据库连接正常, 正常入库; 如果连接断开的话... 那就等它连接恢复了再插入, 这期间的记录全部写在文件里, 不会丢失.

方案选型:

  1. 文件选择log4j日志文件, 配置为按天生成日志文件, 如果不是当天的日志文件且已经读取入库完成, 则删除节省磁盘空间
  2. 读取文件的流选择RandomAccessFile, 可以随机读写文件, 能够从上次读到的地方继续读取文件内容

方案实施:

  1. 原来代码中直接对数据库的操作[查询, 插入, 更新], 都替换为在日志中写信息的操作. 此处定义了一个实体类转换为json写入到日志文件中, 实体类如下:
    public class RecordLogObject {
        public static final int SAVE = 0; // 插入, 更新
        public static final int QUERY = 1; // 查询

        private int type; // 操作类型
        private BirdnestCallRecord callRecord; // 数据
        
        public RecordLogObject(int type, BirdnestCallRecord callRecord) {
            this.type = type;
            this.callRecord = callRecord;
        }
        
        public int getType() {
            return type;
        }
        
        public void setType(int type) {
            this.type = type;
        }
        
        public BirdnestCallRecord getCallRecord() {
            return callRecord;
        }
        
        public void setCallRecord(BirdnestCallRecord callRecord) {
            this.callRecord = callRecord;
        }
    }

之前的数据库操作是

callRecordRepository.save(callRecord);

替换之后就是

Gson gson = new Gson(); 
logger.info(gson.toJson(new RecordLogObject(SAVE, callRecord)));
  1. 设置定时任务, 定时从日志中读取数据插入数据库; 读取的偏移量记录在单独的文件中, 这样即使项目挂掉, 再次启动时也能保证不会重复读不会少读.
    定时任务代码如下:
// 每300ms读取呼叫记录日志文件进行入库
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    logReader.process();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, 0, 300, TimeUnit.MILLISECONDS);

日志文件的处理类, 代码如下:

public class CallLogReader {
    // 日志文件目录
    @Value("${callRecord.log.dir}")
    private String logDirectory;
    // 偏移文件名称
    private String offsetFileName = "offset";
    // 偏移文件路径
    private String offsetFilePath;
    // 日志文件前缀
    private String logPrefix = "log";
    // 非当日日志名称格式
    private String fileNameReg = "^" + logPrefix + ".\\d{8}$";

    @PostConstruct
    private void initLogOffsetPath() {
        offsetFilePath = logDirectory + offsetFileName;
    }

    private Logger logger = LoggerFactory.getLogger(CallLogReader.class);

    @Autowired
    private MongoTemplate mongoTemplate;

    public void process() throws IOException {
        String curName;
        long curPoint;

        // 查看日志目录下是否有日志文件
        File[] logFiles = getFiles();
        if (logFiles.length < 1) {
            // 没有日志文件直接返回
            return;
        } else if (logFiles.length == 1){
            // 直接读取偏移文件
            curName = logFiles[0].getName();
            File offsetFile = new File(offsetFilePath);
            if (!offsetFile.exists()) {
                curPoint = 0;
            } else {
                String[] offsetInfo = getOffset(offsetFilePath);
                curPoint = Long.valueOf(offsetInfo[1]);
            }
        } else {
            // 有多个日志文件, 此时一定有偏移文件, 先处理旧的, 再处理新的
            File oldestFile = getOldestFile(logFiles);
            String[] offset = getOffset(offsetFilePath);
            curPoint = Long.valueOf(offset[1]);
            if (curPoint < oldestFile.length()) {
                // 继续读
                curName = oldestFile.getName();
            } else {
                // 删除最老的日志文件, 选择次最老的日志文件从头读取
                oldestFile.delete();
                File newOldestFile = getOldestFile(getFiles());
                curName = newOldestFile.getName();
                curPoint = 0;
            }
        }

        // 从偏移处开始读取记录入库
        String curFilePath = logDirectory + curName;
        RandomAccessFile randomAccessFile = null;
        try {
            randomAccessFile = new RandomAccessFile(curFilePath, "r");
        } catch (FileNotFoundException e) {
            logger.error("读取文件异常", e);
        }

        long fileLength = randomAccessFile.length();
        if (curPoint < fileLength) {
            randomAccessFile.seek(curPoint);
        } else {
            // 如果文件已读完且不是当日文件, 则删除
            randomAccessFile.close();
            if (Pattern.matches(fileNameReg, curName)) {
                File file = new File(curFilePath);
                file.delete();
                logger.warn("删除" + curFilePath);
            }
            return;
        }

        Gson gson = new Gson();
        //得到行
        String result = randomAccessFile.readLine();
        //如果内容不为空
        if (StringUtils.isNotBlank(result)) {
            RecordLogObject recordLog = gson.fromJson(result, RecordLogObject.class);
            BirdnestCallRecord callRecord = recordLog.getCallRecord();
            try {
                // 逻辑处理
            } catch (Exception e) {
                logger.error("处理日志文件异常", e);
                randomAccessFile.close();
                return;
            }
        }

        logger.info(curName + ", " + curPoint + "---操作成功");
        // 更新偏移文件
        updateOffset(offsetFilePath, curName, randomAccessFile.getFilePointer());
        logger.info(curName + ", " + curPoint + "---更新偏移文件成功");
        randomAccessFile.close();
        logger.info(curName + ", " + curPoint + "---关闭读取文件流");
    }

    /**
     * 获取日志目录下的所有文件
     * @return
     */
    private File[] getFiles() {
        File dictFile = new File(logDirectory);
        return dictFile.listFiles(new FileFilter() {
            public boolean accept(File pathname) {
                return pathname.isFile() && pathname.getName().startsWith(logPrefix);
            }
        });
    }

    /**
     * 从File数组中查找出最早创建的文件, log永远是最新的
     * @param files
     * @return
     */
    private File getOldestFile(File[] files){
        if (files.length == 1) {
            return files[0];
        }
        Arrays.sort(files, new Comparator<File>() {
            @Override
            public int compare(File o1, File o2) {
                return o2.getName().compareTo(o1.getName());
            }
        });
        return files[files.length -2];
    }

    /**
     * 获取偏移信息
     * @param offsetFilePath
     * @return [文件名, 偏移量]
     * @throws IOException
     */
    private String[] getOffset(String offsetFilePath) throws IOException {
        FileReader fileReader = new FileReader(offsetFilePath);
        StringBuilder stringBuilder = new StringBuilder();
        char[] bytesChar = new char[100];
        fileReader.read(bytesChar);
        fileReader.close();
        for (char c : bytesChar) {
            stringBuilder.append(c);
        }
        String offset = stringBuilder.toString().trim();
        return offset.split(",");
    }

    /**
     * 更新偏移文件
     * @param offsetFilePath 偏移文件
     * @param curName 当前读取的文件名
     * @param filePointer 偏移量
     * @throws IOException
     */
    private void updateOffset(String offsetFilePath, String curName, long filePointer) throws IOException {
        FileWriter fileWriter = new FileWriter(offsetFilePath);
        fileWriter.write(curName + "," + filePointer);
        fileWriter.flush();
        fileWriter.close();
    }
}
  1. 日志配置如下:
        <RollingFile name="callLogAppener" fileName="callLog/log"
                     filePattern="callLog/log.%d{yyyyMMdd}"
                     append="true">
            <!-- 输出格式 -->
            <PatternLayout
                    pattern="%msg%n" />
            <!-- 设置策略 -->
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" modulate="true" />
            </Policies>
        </RollingFile>

        <logger name="com.bjnangle.birdnest.engine.functions.CallRecordHandler">
            <AppenderRef ref="callLogAppener"/>
        </logger>        

方案效果

数据先记录到日志中;
数据库连接断开后重连可以实现记录入库;
读取完的过时日志可以正常删除.

如果对你有帮助, 请点击💖喜欢💖鼓励一下我这个臭弟弟

相关文章

网友评论

      本文标题:数据写入日志延迟入库方案

      本文链接:https://www.haomeiwen.com/subject/uuaqmqtx.html