美文网首页
ConcurrentLinkedHashMap源码分析

ConcurrentLinkedHashMap源码分析

作者: lim快乐_无限 | 来源:发表于2021-04-14 17:59 被阅读0次

    一、简介

    ConcurrentLinkedHashMap是google 开源的线程安全的方便并发的Map, Map利用LRU缓存机制对Map中存储对象进行换入换出管理。采用两套资源控制机制,一套同步机制,使用ConcurrentMap对对象数据进行KV存储,保证多线程并发安全地调用Map资源,而对于存储对象的换入换出管理则采用异步机制,使用Queue buffer存储每次的因对象读写而产生的对象换入换出任务,当遇到读任务超过阈值或写任务时,加锁后,执行buffer中的多个任务,依次对evictionDeque进行节点调整,需要移除的数据,从map中移除。在jdk1.8后官方建议用Caffeine代替

    二、代码解析

    2.1 相关类
    **Weigher **是一个计算每条记录占用存储单元数接口,项目在类Weighers中给了许多针对不同需求的计算方式,如Byte 数组可以通过数组长度计算为存储单元个数,而就一般应用的存储对象,可以直接用SingletonWeigher,每条记录占用一个存储单元。

    @ThreadSafe
    public interface Weigher<V> {
    
      /**
       * Measures an object's weight to determine how many units of capacity that
       * the value consumes. A value must consume a minimum of one unit.
       *
       * @param value the object to weigh
       * @return the object's weight
       */
      int weightOf(V value);
    }
    

    **WeightedValue **是对Value的装饰,包含了Value占用的存储单元个数weight值,以及根据weight值计算状态

    • active 存在于map和队列中
    • retire 从map里删除但队列里还未删除
    • dead map和队列里都删除

    **Node **实现了链接表中Linked Node,便于LinkedDeque的双向索引,是Map以及evictionDeque存储对象。
    **Task **是针对存储对象LRU顺序操作的抽象类,继承自Task的有ReadTask、AddTask、UpdateTask、RemoveTask, 每一个Task有一个根据创建顺序分配的order。

    2.2 ConcurrentLinkedHashMap主要属性

    // 存储数据
    final ConcurrentMap<K, Node<K, V>> data; 
    // 实际存储大小
    final AtomicLong weightedSize;
    // 维护对象换入换出
    final LinkedDeque<Node<K, V>> evictionDeque;
    // 限制存储大小
    final AtomicLong capacity;
    // 回调
    final EvictionListener<K, V> listener;
    

    2.3 主要操作过程
    get 操作,首先从Map中读取,再添加一个addTask用于调整queue中LRU order

    @Override
      public V get(Object key) {
        final Node<K, V> node = data.get(key);
        if (node == null) {
          return null;
        }
        afterRead(node); // 处理LRU 异步调整队列顺序 注意不是立刻调整,当满足32条时才调整
        return node.getValue();
      }
    

    put 操作稍微复杂,需要判断是否只在缺少才插入(putIfAbsent),如果不存在,直接插入,如果存在, 而不是只在不存在的情况下插入,则更新。

     V put(K key, V value, boolean onlyIfAbsent) {
        checkNotNull(key);
        checkNotNull(value);
    
        final int weight = weigher.weightOf(key, value);
        final WeightedValue<V> weightedValue = new WeightedValue<V>(value, weight);
        final Node<K, V> node = new Node<K, V>(key, weightedValue);
    
        for (;;) {
          final Node<K, V> prior = data.putIfAbsent(node.key, node);
          if (prior == null) {
              // 处理LRU 异步添加到队列中
            afterWrite(new AddTask(node, weight));
            return null;
          } else if (onlyIfAbsent) {
            afterRead(prior);
            return prior.getValue();
          }
           // 以下代码是更新时更新权重大小
          for (;;) {
            final WeightedValue<V> oldWeightedValue = prior.get();
            if (!oldWeightedValue.isAlive()) {
              break;
            }
    
            if (prior.compareAndSet(oldWeightedValue, weightedValue)) {
              final int weightedDifference = weight - oldWeightedValue.weight;
              if (weightedDifference == 0) {
                afterRead(prior);
              } else {
                afterWrite(new UpdateTask(prior, weightedDifference));
              }
              return oldWeightedValue.value;
            }
          }
        }
      }
    

    remove

     @Override
      public V remove(Object key) {
        final Node<K, V> node = data.remove(key);
        if (node == null) {
          return null;
        }
        // 设置节点状态 改为 retire
        makeRetired(node);
        // 处理LRU 队列删除节点
        afterWrite(new RemovalTask(node));
        return node.getValue();
      }
    

    2.4 LRU 处理过程

    void afterWrite(Runnable task) {
        // 添加写任务到缓存区
        writeBuffer.add(task);
        // 设置处理状态为必须
        drainStatus.lazySet(REQUIRED);
        // 尝试去添加并调整顺序
        tryToDrainBuffers();
        // 通知
        notifyListener();
      }
    
    void afterRead(Node<K, V> node) {
        final int bufferIndex = readBufferIndex();
        // 把此次查询节点放入缓存区 并返回待调整的数量
        final long writeCount = recordRead(bufferIndex, node);
        // 尝试调整
        drainOnReadIfNeeded(bufferIndex, writeCount);
        notifyListener();
      }
    
    void drainOnReadIfNeeded(int bufferIndex, long writeCount) {
        final long pending = (writeCount - readBufferDrainAtWriteCount[bufferIndex].get());
        // READ_BUFFER_THRESHOLD 为32  当待调整数量大于32时进入调整
        final boolean delayable = (pending < READ_BUFFER_THRESHOLD);
        final DrainStatus status = drainStatus.get();
        if (status.shouldDrainBuffers(delayable)) {
          tryToDrainBuffers();
        }
      }
    
    void tryToDrainBuffers() {
        if (evictionLock.tryLock()) {
          try {
            drainStatus.lazySet(PROCESSING);
            drainBuffers();
          } finally {
            drainStatus.compareAndSet(PROCESSING, IDLE);
            evictionLock.unlock();
          }
        }
      }
    
      @GuardedBy("evictionLock")
      void drainBuffers() {
        // 处理读缓存区
        drainReadBuffers();
        // 处理写缓存区
        drainWriteBuffer();
      }
    
    @GuardedBy("evictionLock")
      void drainReadBuffers() {
        final int start = (int) Thread.currentThread().getId();
        final int end = start + NUMBER_OF_READ_BUFFERS;
        for (int i = start; i < end; i++) {
          drainReadBuffer(i & READ_BUFFERS_MASK);
        }
      }
    
      @GuardedBy("evictionLock")
      void drainReadBuffer(int bufferIndex) {
        final long writeCount = readBufferWriteCount[bufferIndex].get();
        for (int i = 0; i < READ_BUFFER_DRAIN_THRESHOLD; i++) {
          final int index = (int) (readBufferReadCount[bufferIndex] & READ_BUFFER_INDEX_MASK);
          final AtomicReference<Node<K, V>> slot = readBuffers[bufferIndex][index];
          final Node<K, V> node = slot.get();
          if (node == null) {
            break;
          }
          slot.lazySet(null);
          applyRead(node);
          readBufferReadCount[bufferIndex]++;
        }
        readBufferDrainAtWriteCount[bufferIndex].lazySet(writeCount);
      }
    

    2.5 任务实现过程
    AddTask

    final class AddTask implements Runnable {
        final Node<K, V> node;
        final int weight;
    
        AddTask(Node<K, V> node, int weight) {
          this.weight = weight;
          this.node = node;
        }
    
        @Override
        @GuardedBy("evictionLock")
        public void run() {
          weightedSize.lazySet(weightedSize.get() + weight);
    
          // ignore out-of-order write operations
          if (node.get().isAlive()) {
            evictionDeque.add(node);
            // 踢出多余的
            evict();
          }
        }
      }
    
    @GuardedBy("evictionLock")
      void evict() {
        // Attempts to evict entries from the map if it exceeds the maximum
        // capacity. If the eviction fails due to a concurrent removal of the
        // victim, that removal may cancel out the addition that triggered this
        // eviction. The victim is eagerly unlinked before the removal task so
        // that if an eviction is still required then a new victim will be chosen
        // for removal.
        while (hasOverflowed()) {
          final Node<K, V> node = evictionDeque.poll();
    
          // If weighted values are used, then the pending operations will adjust
          // the size to reflect the correct weight
          if (node == null) {
            return;
          }
    
          // Notify the listener only if the entry was evicted
          if (data.remove(node.key, node)) {
            pendingNotifications.add(node);
          }
    
          makeDead(node);
        }
      }
    

    RemovalTask

    final class RemovalTask implements Runnable {
        final Node<K, V> node;
    
        RemovalTask(Node<K, V> node) {
          this.node = node;
        }
    
        @Override
        @GuardedBy("evictionLock")
        public void run() {
          // add may not have been processed yet
          evictionDeque.remove(node);
          makeDead(node);
        }
      }
    

    UpdateTask

    final class UpdateTask implements Runnable {
        final int weightDifference;
        final Node<K, V> node;
    
        public UpdateTask(Node<K, V> node, int weightDifference) {
          this.weightDifference = weightDifference;
          this.node = node;
        }
    
        @Override
        @GuardedBy("evictionLock")
        public void run() {
          weightedSize.lazySet(weightedSize.get() + weightDifference);
          applyRead(node);
          evict();
        }
      }
    

    三、示例

    1. 限制内存大小
    EntryWeigher<K, V> memoryUsageWeigher = new EntryWeigher<K, V>() {
      final MemoryMeter meter = new MemoryMeter();
    
      @Override public int weightOf(K key, V value) {
        long bytes = meter.measure(key) + meter.measure(value);
        return (int) Math.min(bytes, Integer.MAX_VALUE);
      }
    };
    ConcurrentMap<K, V> cache = new ConcurrentLinkedHashMap.Builder<K, V>()
        .maximumWeightedCapacity(1024 * 1024) // 1 MB
        .weigher(memoryUsageWeigher)
        .build();
    
    1. 限制条数
    ConcurrentLinkedHashMap<String, String> map = new ConcurrentLinkedHashMap.Builder<String, String>()
                .maximumWeightedCapacity(2).weigher(Weighers.singleton()).build();
    

    相关文章

      网友评论

          本文标题:ConcurrentLinkedHashMap源码分析

          本文链接:https://www.haomeiwen.com/subject/wpdllltx.html