美文网首页
Cat消息存储

Cat消息存储

作者: spilledyear | 来源:发表于2019-12-04 23:28 被阅读0次
    1. 消息格式为 应用名-IP-小时正点数-消息递增号 MessageId
    2. 每个 应用 + IP + 整点小时 对应: 一个索引文件 和 一个数据文件
    3. 消息经过编码后,首4字节为该消息的大小,从文件中读消息的时候会用到这个特性

    写消息过程

    1. 获取MessageBlock中的MessageTree个数,进行遍历
    2. 获得每个MessageTree的index(索引递增号) 和 每个MessageTree的size(数据大小)
    3. 设置索引文件的起始位置 索引递增号*6
    4. 将该该消息所对应block在数据文件中的起始地址写到索引文件(4字节)
    5. 将该该消息在block中的偏移量写入索引文件(2字节)
    6. 将block的内容长度写入数据文件
    7. 将block的内容写入dataFile
    // MessageBlockWriter.java
    public synchronized void writeBlock(MessageBlock block) throws IOException {
        // block中消息条数
        int len = block.getBlockSize();
        // block大小
        byte[] data = block.getData();
    
        // 用于在遍历过程中记录每条消息的偏移量,遍历完成之后,blockSize等于block的大小
        int blockSize = 0;
    
        ByteBuffer buffer = ByteBuffer.allocate(4 + data.length);
        buffer.order(ByteOrder.BIG_ENDIAN);
    
        for (int i = 0; i < len; i++) {
            // 消息的递增号
            int seq = block.getIndex(i);
            // 消息的大小
            int size = block.getSize(i);
    
            // m_indexFile.seek(seq * 6L);
            // 该消息在索引文件的起始位置 递增号*6 ,表示每条消息在索引文件中占6个字节大小
            m_indexChannel.position(seq * 6L);
    
            // m_indexFile.writeInt(m_blockAddress);
            // m_indexFile.writeShort(blockSize);
            // 用于记录该消息所对应block在数据文件中的起始地址
            buffer.putInt(m_blockAddress);
            // 用于记录该消息在block中的偏移量
            buffer.putShort((short) blockSize);
            buffer.flip();
            // 写入索引文件
            m_indexChannel.write(buffer);
    
            // 计算下一条消息在该block中的偏移量
            blockSize += size;
    
            buffer.clear();
        }
    
        // m_dataFile.writeInt(data.length);
        // m_dataFile.write(data);
        buffer = ByteBuffer.allocate(4 + data.length);
        buffer.order(ByteOrder.BIG_ENDIAN);
        // 先在数据文件中用4个字节记录 block 的大小
        buffer.putInt(data.length);
        // 再将block的内容写入数据文件
        buffer.put(data);
        buffer.flip();
        m_dataChannel.write(buffer);
    
        // 更新 m_blockAddress 的值,即数据文件下一次写入时的起始位置
        m_blockAddress += data.length + 4;
    }
    

    即数据文件中的存储结构为: 【blockSize(4byte)->blockData】=>【blockSize(4byte)->blockData】

    索引文件的存储结构为: 【blackAddr(4byte)->messageOffsetInBlock(2byte)】 => 【blackAddr(4byte)->messageOffsetInBlock(2byte)】

    读消息过程

    对于真正的文件存储,block在这里其实是一个抽象的概念; 如果是直接以Message为单位进行写文件,那这个 block 和 索引文件中的block偏移量 就没有什么意义了。但实际上消息是以block为单位进行写文件,一个block最大为64K,而一个block中又存在多条消息,所以每条消息在它所属的block中有一个偏移量

    1. 根据 索引递增号从索引文件读前4个字节 找到block的地址
    2. 该地址为起始地址,从数据文件中读取一个int类型数据(4个字节)作为该block的长度
    3. 根据该长度读取整个block的内容到byte数组
    4. 根据 索引递增号从索引文件读后2个字节 找到该消息在该block中的偏移地址
    5. 以偏移地址为起始地址,读取一个int类型数据(4个字节)作为该消息的大小(为什么读4字节?这是在对消息编码时决定的,首4字节表示该消息的大小)
    6. 根据偏移地址 和 上一步获取的int类型数据大小 读取Message
    // MessageBlockReader.java
    private DataInputStream createDataInputStream(byte[] buf) {
        DataInputStream in = null;
    
        try {
            in = new DataInputStream(new SnappyInputStream(new ByteArrayInputStream(buf)));
        } catch (IOException e) {
            try {
                in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(buf)));
            } catch (IOException ioe) {
                Cat.logError(ioe);
            }
        }
        return in;
    }
    
    public byte[] readMessage(int index) throws IOException {
        int blockAddress = 0;
        int blockOffset = 0;
    
        // 索引 在索引文件的起始位置
        m_indexFile.seek(index * 6L);
    
        // 读出4字节,该值代表block在数据文件的起始位置
        blockAddress = m_indexFile.readInt();
        // 读出2字节 该值代表Message在block中的偏移量
        blockOffset = m_indexFile.readShort() & 0xFFFF;
    
        // 从数据文件的 blockAddress 地址开始访问数据
        m_dataFile.seek(blockAddress);
        // 4字节里面存的是block块的长度
        byte[] buf = new byte[m_dataFile.readInt()];
        // 从数据文件中读取整个block到buf数组
        m_dataFile.readFully(buf);
    
        DataInputStream in = createDataInputStream(buf);
    
        if (in != null) {
            try {
                // 跳到block中的偏移量
                in.skip(blockOffset);
                
                // 该值代表消息长度
                int len = in.readInt();
    
                byte[] data = new byte[len];
                
                // 从block中读取Message
                in.readFully(data);
                return data;
            } finally {
                try {
                    in.close();
                } catch (Exception e) {
                    // ignore it
                }
            }
        } else {
            return null;
        }
    }
    

    听说还有V2版本,分 以一级索引和二级索引,可我拉代码没看到呀

    相关文章

      网友评论

          本文标题:Cat消息存储

          本文链接:https://www.haomeiwen.com/subject/ohurgctx.html