How PalDB Reads Data


Author: 晴天哥_王志 | Published 2018-07-07

    Introduction

    The PalDB reader's multi-level cache

     Before walking through PalDB's read path, it is worth explaining why its read performance is so good. PalDB can be understood as having two levels of cache:

    • The first level is the StorageCache object, an in-heap LRU cache built on LinkedHashMap (a runnable sketch of this pattern follows the StorageCache code below).
    • The second level is the StorageReader object, which maps the store file into memory via mmap.
    public final class ReaderImpl implements StoreReader {
      // Logger
      private final static Logger LOGGER = Logger.getLogger(ReaderImpl.class.getName());
      // Configuration
      private final Configuration config;
      // Buffer
      private final DataInputOutput dataInputOutput = new DataInputOutput();
      // Storage
      private final StorageReader storage;
      // Serialization
      private final StorageSerialization serialization;
      // Cache
      private final StorageCache cache;
      // File
      private final File file;
      // Opened?
      private boolean opened;

      // ... constructor and methods elided ...
    }
    private StorageCache(Configuration config) {
        cache = new LinkedHashMap(config.getInt(Configuration.CACHE_INITIAL_CAPACITY),
            config.getFloat(Configuration.CACHE_LOAD_FACTOR), true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry eldest) {
            boolean res = currentWeight > maxWeight;
            if (res) {
              Object key = eldest.getKey();
              Object value = eldest.getValue();
              currentWeight -= getWeight(key) + getWeight(value) + OVERHEAD;
            }
            return res;
          }
        };
        // ... remaining initialization (maxWeight, etc.) elided ...
    }
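     To make the eviction policy concrete, here is a minimal, self-contained sketch of an access-ordered LinkedHashMap acting as an LRU cache. It uses a plain entry count as its weight, whereas PalDB weighs entries by estimated byte size, so the numbers are purely illustrative:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruSketch {
      public static void main(String[] args) {
        final int maxEntries = 2; // stand-in for PalDB's byte-based maxWeight
        // accessOrder=true reorders entries on get(), which is what makes this LRU
        Map<String, String> cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
            return size() > maxEntries; // PalDB compares currentWeight > maxWeight instead
          }
        };
        cache.put("a", "1");
        cache.put("b", "2");
        cache.get("a");      // touch "a" so "b" becomes the eldest entry
        cache.put("c", "3"); // evicts "b", not "a"
        System.out.println(cache.keySet()); // prints [a, c]
      }
    }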

    PalDB's read process

    The read process at a high level

     PalDB's read path is simple and clear: it is a standard read-through cache lookup (a usage example follows the code below):

    • Check whether the cache already holds the key; on a hit, return the cached value directly.
    • On a miss, read the serialized value from the mmap'ed file via storage.get.
    • Deserialize the bytes, put the result into the cache, and return it.
    public <K> K get(Object key, K defaultValue) {
        checkOpen();
        if (key == null) {
          throw new NullPointerException("The key can't be null");
        }
        // First level: in-heap LRU cache
        K value = cache.get(key);
        if (value == null) {
          try {
            // Second level: mmap-backed storage lookup on a cache miss
            byte[] valueBytes = storage.get(serialization.serializeKey(key));
            if (valueBytes != null) {
              Object v = serialization.deserialize(dataInputOutput.reset(valueBytes));
              cache.put(key, v);
              return (K) v;
            } else {
              return defaultValue;
            }
          } catch (Exception ex) {
            throw new RuntimeException(ex);
          }
        } else if (value == StorageCache.NULL_VALUE) {
          // The cache stores a sentinel for keys known to map to a null value
          return null;
        }
        return value;
      }
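     For reference, this is how the read path above is driven from user code. createReader, get, and close are PalDB's public API; the store file name is hypothetical:

    import java.io.File;
    import com.linkedin.paldb.api.PalDB;
    import com.linkedin.paldb.api.StoreReader;

    public class ReadExample {
      public static void main(String[] args) {
        // "store.paldb" is a hypothetical store produced earlier by a PalDB writer
        StoreReader reader = PalDB.createReader(new File("store.paldb"));
        try {
          String value = reader.get("foo", "default"); // returns "default" on a miss
          System.out.println(value);
        } finally {
          reader.close(); // releases the underlying mmap'ed buffers
        }
      }
    }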
    

    StorageReader's read process

     At the storage level, a read hashes the key to locate its slot. The whole process is:

    • Hash the key and look up the slot count for keys of that length.
    • Probe the index mmap to find the slot holding the key, and read the value's offset from that slot.
    • Use that offset to locate the value in the data mmap and return it.

     When reading the value from the data mmap, the offset is divided by the segment size to select the mapped buffer, and the remainder gives the position within that buffer (see getDataBuffer below).

    public byte[] get(byte[] key)
          throws IOException {
        int keyLength = key.length;
        // Keys are grouped by length; bail out if no key of this length exists
        if (keyLength >= slots.length || keyCounts[keyLength] == 0) {
          return null;
        }
        long hash = (long) hashUtils.hash(key);
        int numSlots = slots[keyLength];
        int slotSize = slotSizes[keyLength];
        int indexOffset = indexOffsets[keyLength];
        long dataOffset = dataOffsets[keyLength];

        // Open addressing: on a collision, linearly probe the next slot
        for (int probe = 0; probe < numSlots; probe++) {
          int slot = (int) ((hash + probe) % numSlots);
          indexBuffer.position(indexOffset + slot * slotSize);
          indexBuffer.get(slotBuffer, 0, slotSize);

          // A slot holds the key bytes followed by the packed value offset; 0 means empty
          long offset = LongPacker.unpackLong(slotBuffer, keyLength);
          if (offset == 0) {
            return null;
          }
          if (isKey(slotBuffer, key)) {
            byte[] value = mMapData ? getMMapBytes(dataOffset + offset) : getDiskBytes(dataOffset + offset);
            return value;
          }
        }
        return null;
      }
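     The probing loop is easier to see on a toy index. This sketch keeps the same slot layout idea (key bytes followed by the value offset, with offset 0 meaning empty) and uses a deliberately naive "hash" (the key itself) to force a collision; all sizes are simplified stand-ins for PalDB's per-key-length metadata:

    import java.nio.ByteBuffer;

    public class ProbeSketch {
      // Toy slot layout: byte 0 = key, byte 1 = value offset (0 means empty)
      static final int NUM_SLOTS = 4, SLOT_SIZE = 2;

      public static void main(String[] args) {
        ByteBuffer index = ByteBuffer.allocate(NUM_SLOTS * SLOT_SIZE); // zero-filled: all slots empty
        put(index, (byte) 6, (byte) 10); // 6 % 4 == 2 -> lands in slot 2
        put(index, (byte) 2, (byte) 20); // 2 % 4 == 2, taken -> probes on to slot 3
        System.out.println(get(index, (byte) 2)); // prints 20
      }

      static void put(ByteBuffer index, byte key, byte offset) {
        for (int probe = 0; probe < NUM_SLOTS; probe++) {
          int pos = ((key + probe) % NUM_SLOTS) * SLOT_SIZE;
          if (index.get(pos + 1) == 0) { // empty slot found
            index.put(pos, key).put(pos + 1, offset);
            return;
          }
        }
        throw new IllegalStateException("index full");
      }

      static int get(ByteBuffer index, byte key) {
        for (int probe = 0; probe < NUM_SLOTS; probe++) {
          int pos = ((key + probe) % NUM_SLOTS) * SLOT_SIZE;
          if (index.get(pos + 1) == 0) return -1; // empty slot: key is absent
          if (index.get(pos) == key) return index.get(pos + 1);
        }
        return -1;
      }
    }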
    
    
    
    //Read the data at the given offset, the data can be spread over multiple data buffers
      private byte[] getMMapBytes(long offset)
          throws IOException {
        //Read the size prefix: a packed (varint) int, which takes at most 5 bytes
        ByteBuffer buf = getDataBuffer(offset);
        int maxLen = (int) Math.min(5, dataSize - offset);
    
        int size;
        if (buf.remaining() >= maxLen) {
          //Continuous read
          int pos = buf.position();
          size = LongPacker.unpackInt(buf);
    
          // Used in case of data is spread over multiple buffers
          offset += buf.position() - pos;
        } else {
          //The size of the data is spread over multiple buffers
          int len = maxLen;
          int off = 0;
          sizeBuffer.reset();
          while (len > 0) {
            buf = getDataBuffer(offset + off);
            int count = Math.min(len, buf.remaining());
            buf.get(sizeBuffer.getBuf(), off, count);
            off += count;
            len -= count;
          }
          size = LongPacker.unpackInt(sizeBuffer);
          offset += sizeBuffer.getPos();
          buf = getDataBuffer(offset);
        }
    
        //Create output bytes
        byte[] res = new byte[size];
    
        //Check if the data is one buffer
        if (buf.remaining() >= size) {
          //Continuous read
          buf.get(res, 0, size);
        } else {
          int len = size;
          int off = 0;
          while (len > 0) {
            buf = getDataBuffer(offset);
            int count = Math.min(len, buf.remaining());
            buf.get(res, off, count);
            offset += count;
            off += count;
            len -= count;
          }
        }
    
        return res;
      }
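     The 5-byte cap on the size prefix comes from variable-length integer encoding: 7 payload bits per byte with the high bit as a continuation flag, so a 32-bit int needs at most ceil(32/7) = 5 bytes. A minimal decoder in that style (an illustration of the idea, not PalDB's actual LongPacker source) looks like this:

    import java.nio.ByteBuffer;

    public class VarintSketch {
      // Decode a varint: 7 data bits per byte, high bit set means "more bytes follow"
      static int unpackInt(ByteBuffer buf) {
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) {
          byte b = buf.get();
          result |= (b & 0x7F) << shift;
          if ((b & 0x80) == 0) {
            return result;
          }
        }
        throw new IllegalArgumentException("Malformed varint");
      }

      public static void main(String[] args) {
        // 300 = 0b1_0010_1100 -> encodes as 0xAC 0x02
        ByteBuffer buf = ByteBuffer.wrap(new byte[]{(byte) 0xAC, 0x02});
        System.out.println(unpackInt(buf)); // prints 300
      }
    }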
    
    
    private ByteBuffer getDataBuffer(long index) {
        // Pick the mapped segment: index / segmentSize selects the buffer,
        // index % segmentSize is the position inside that buffer
        ByteBuffer buf = dataBuffers[(int) (index / segmentSize)];
        buf.position((int) (index % segmentSize));
        return buf;
      }
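     A quick worked example of that arithmetic (the 1 GB segment size is an arbitrary illustration, not PalDB's default):

    public class SegmentMath {
      public static void main(String[] args) {
        long segmentSize = 1L << 30;              // 1 GB per mapped segment (illustrative)
        long offset = (2L << 30) + 1234;          // a global data offset of 2 GB + 1234 bytes
        System.out.println(offset / segmentSize); // buffer index: 2
        System.out.println(offset % segmentSize); // position within that buffer: 1234
      }
    }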
    

    StorageReader's initialization

     StorageReader's initialization is simply the write path in reverse: data is read back exactly the way it was written, in the following order:

    • Read the PalDB metadata, including the index (key) start offset and the data (value) start offset.
    • mmap the index section into memory as a single buffer, which therefore cannot exceed 2GB.
    • mmap the data section into memory, split into segments of at most segmentSize each (see the configuration sketch after the constructor below).
    StorageReader(Configuration configuration, File file)
          throws IOException {
        path = file;
        config = configuration;
        
        //Config
        segmentSize = config.getLong(Configuration.MMAP_SEGMENT_SIZE);
    
        hashUtils = new HashUtils();
    
        // Check valid segmentSize
        if (segmentSize > Integer.MAX_VALUE) {
          throw new IllegalArgumentException(
              "The `" + Configuration.MMAP_SEGMENT_SIZE + "` setting can't be larger than 2GB");
        }
    
        //Open file and read metadata
        long createdAt = 0;
        FormatVersion formatVersion = null;
        FileInputStream inputStream = new FileInputStream(path);
        DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(inputStream));
        try {
          int ignoredBytes = -2;
    
          //Byte mark
          byte[] mark = FormatVersion.getPrefixBytes();
          int found = 0;
          while (found != mark.length) {
            byte b = dataInputStream.readByte();
            if (b == mark[found]) {
              found++;
            } else {
              ignoredBytes += found + 1;
              found = 0;
            }
          }
    
          //Version
          byte[] versionFound = Arrays.copyOf(mark, FormatVersion.getLatestVersion().getBytes().length);
          dataInputStream.readFully(versionFound, mark.length, versionFound.length - mark.length);
    
          formatVersion = FormatVersion.fromBytes(versionFound);
          if (formatVersion == null || !formatVersion.is(FormatVersion.getLatestVersion())) {
            throw new RuntimeException(
                    "Version mismatch, expected was '" + FormatVersion.getLatestVersion() + "' and found '" + formatVersion
                            + "'");
          }
    
          //Time
          createdAt = dataInputStream.readLong();
    
          //Metadata counters
          keyCount = dataInputStream.readInt();
          keyLengthCount = dataInputStream.readInt();
          maxKeyLength = dataInputStream.readInt();
    
          //Read offset counts and keys
          indexOffsets = new int[maxKeyLength + 1];
          dataOffsets = new long[maxKeyLength + 1];
          keyCounts = new int[maxKeyLength + 1];
          slots = new int[maxKeyLength + 1];
          slotSizes = new int[maxKeyLength + 1];
    
          int maxSlotSize = 0;
          for (int i = 0; i < keyLengthCount; i++) {
            int keyLength = dataInputStream.readInt();
    
            keyCounts[keyLength] = dataInputStream.readInt();
            slots[keyLength] = dataInputStream.readInt();
            slotSizes[keyLength] = dataInputStream.readInt();
            indexOffsets[keyLength] = dataInputStream.readInt();
            dataOffsets[keyLength] = dataInputStream.readLong();
    
            maxSlotSize = Math.max(maxSlotSize, slotSizes[keyLength]);
          }
    
          slotBuffer = new byte[maxSlotSize];
    
          //Read serializers
          try {
            Serializers.deserialize(dataInputStream, config.getSerializers());
          } catch (Exception e) {
            // Preserve the cause rather than throwing a bare RuntimeException
            throw new RuntimeException(e);
          }
    
          //Read index and data offset
          indexOffset = dataInputStream.readInt() + ignoredBytes;
          dataOffset = dataInputStream.readLong() + ignoredBytes;
        } finally {
          //Close metadata
          dataInputStream.close();
          inputStream.close();
        }
    
        //Create Mapped file in read-only mode
        mappedFile = new RandomAccessFile(path, "r");
        channel = mappedFile.getChannel();
        long fileSize = path.length();
    
        //Create index buffer
        indexBuffer = channel.map(FileChannel.MapMode.READ_ONLY, indexOffset, dataOffset - indexOffset);
    
        //Create data buffers
        dataSize = fileSize - dataOffset;
    
        //Check if data size fits in memory map limit
        if (!config.getBoolean(Configuration.MMAP_DATA_ENABLED)) {
          //Use classical disk read
          mMapData = false;
          dataBuffers = null;
        } else {
          //Use Mmap
          mMapData = true;
    
          //Build data buffers
          int bufArraySize = (int) (dataSize / segmentSize) + ((dataSize % segmentSize != 0) ? 1 : 0);
          dataBuffers = new MappedByteBuffer[bufArraySize];
          int bufIdx = 0;
          for (long offset = 0; offset < dataSize; offset += segmentSize) {
            long remainingFileSize = dataSize - offset;
            long thisSegmentSize = Math.min(segmentSize, remainingFileSize);
            dataBuffers[bufIdx++] = channel.map(FileChannel.MapMode.READ_ONLY, dataOffset + offset, thisSegmentSize);
          }
        }
      }
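     The segment size and the mmap switch seen above are plain configuration knobs. A sketch of tuning them through PalDB's public Configuration API (the values shown are illustrative, not recommendations):

    import java.io.File;
    import com.linkedin.paldb.api.Configuration;
    import com.linkedin.paldb.api.PalDB;
    import com.linkedin.paldb.api.StoreReader;

    public class ConfigExample {
      public static void main(String[] args) {
        Configuration config = PalDB.newConfiguration();
        // Map the data section in 512 MB segments instead of the default
        config.set(Configuration.MMAP_SEGMENT_SIZE, String.valueOf(512L * 1024 * 1024));
        // Set to "false" to fall back to plain disk reads (getDiskBytes) for the data section
        config.set(Configuration.MMAP_DATA_ENABLED, "true");

        // "store.paldb" is a hypothetical store file
        StoreReader reader = PalDB.createReader(new File("store.paldb"), config);
        reader.close();
      }
    }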
