PyTorch CUDA Caching Allocator: A Walkthrough of the GPU Memory Pool Functions


Author: CPinging | Published 2021-02-15 13:50

    The post at https://www.52coding.com.cn/2019/05/05/PyTorch2/ refers to a THCCachingAllocator file, but after quite a bit of searching I could not find it anywhere in the latest PyTorch sources; it has evidently been reorganized between versions.

    In recent versions the code has been consolidated into CUDACachingAllocator under c10/cuda.

    The CUDACachingAllocator.h header looks like this:

    #ifndef THC_DEVICE_ALLOCATOR_INC
    #define THC_DEVICE_ALLOCATOR_INC
    
    #include <c10/cuda/CUDAStream.h>
    #include <c10/core/Allocator.h>
    #include <c10/cuda/CUDAMacros.h>
    #include <c10/util/Registry.h>
    
    #include <array>
    #include <mutex>
    
    namespace c10 {
    
    class C10_CUDA_API CUDAOutOfMemoryError : public c10::Error {
      using Error::Error;
    };
    
    // Caching allocator will execute every registered callback if it unable to find
    // block inside of already allocated area.
    class C10_CUDA_API FreeMemoryCallback {
     public:
      virtual ~FreeMemoryCallback() {};
      virtual bool Execute() = 0;
    };
    
    C10_DECLARE_REGISTRY(FreeCudaMemoryCallbacksRegistry, FreeMemoryCallback);
    #define REGISTER_FREE_MEMORY_CALLBACK(name, ...) \
      C10_REGISTER_CLASS(FreeCudaMemoryCallbacksRegistry, name, __VA_ARGS__);
    
    namespace cuda {
    namespace CUDACachingAllocator {
    
    struct Stat {
      int64_t current = 0;
      int64_t peak = 0;
      int64_t allocated = 0;
      int64_t freed = 0;
    };
    
    enum struct StatType : uint64_t {
      AGGREGATE = 0,
      SMALL_POOL = 1,
      LARGE_POOL = 2,
      NUM_TYPES = 3  // remember to update this whenever a new stat type is added
    };
    
    typedef std::array<Stat, static_cast<size_t>(StatType::NUM_TYPES)> StatArray;
    
    // Struct containing memory allocator summary statistics for a device.
    struct DeviceStats {
      // COUNT: allocations requested by client code
      StatArray allocation;
      // COUNT: number of allocated segments from cudaMalloc().
      StatArray segment;
      // COUNT: number of active memory blocks (allocated or used by stream)
      StatArray active;
      // COUNT: number of inactive, split memory blocks (unallocated but can't be released via cudaFree)
      StatArray inactive_split;
    
      // SUM: bytes requested by client code
      StatArray allocated_bytes;
      // SUM: bytes reserved by this memory allocator (both free and used)
      StatArray reserved_bytes;
      // SUM: bytes within active memory blocks
      StatArray active_bytes;
      // SUM: bytes within inactive, split memory blocks
      StatArray inactive_split_bytes;
    
      // COUNT: total number of failed calls to CUDA malloc necessitating cache flushes.
      int64_t num_alloc_retries = 0;
    
      // COUNT: total number of OOMs (i.e. failed calls to CUDA after cache flush)
      int64_t num_ooms = 0;
    };
    
    // Struct containing info of an allocation block (i.e. a fractional part of a cudaMalloc)..
    struct BlockInfo {
      int64_t size = 0;
      bool allocated = false;
      bool active = false;
    };
    
    // Struct containing info of a memory segment (i.e. one contiguous cudaMalloc).
    struct SegmentInfo {
      int64_t device = 0;
      int64_t address = 0;
      int64_t total_size = 0;
      int64_t allocated_size = 0;
      int64_t active_size = 0;
      bool is_large = false;
      std::vector<BlockInfo> blocks;
    };
    
    C10_CUDA_API void* raw_alloc(size_t nbytes);
    C10_CUDA_API void* raw_alloc_with_stream(size_t nbytes, cudaStream_t stream);
    C10_CUDA_API void raw_delete(void* ptr);
    
    C10_CUDA_API Allocator* get();
    C10_CUDA_API void init(int device_count);
    C10_CUDA_API void setMemoryFraction(double fraction, int device);
    C10_CUDA_API void emptyCache();
    C10_CUDA_API void cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock);
    C10_CUDA_API void* getBaseAllocation(void *ptr, size_t *size);
    C10_CUDA_API void recordStream(const DataPtr&, CUDAStream stream);
    C10_CUDA_API DeviceStats getDeviceStats(int device);
    C10_CUDA_API void resetAccumulatedStats(int device);
    C10_CUDA_API void resetPeakStats(int device);
    C10_CUDA_API std::vector<SegmentInfo> snapshot();
    
    C10_CUDA_API std::mutex* getFreeMutex();
    
    C10_CUDA_API std::shared_ptr<void> getIpcDevPtr(std::string handle);
    } // namespace CUDACachingAllocator
    
    }} // namespace c10::cuda
    
    #endif
    
    

    This interface covers everything we need: initialization, allocation, freeing, statistics, and so on.
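
    As a quick orientation, here is a minimal sketch of how this C++ API could be driven directly (not from the original post; an assumption based only on the header above). In a normal PyTorch process init() is invoked during CUDA initialization, so calling it by hand is purely illustrative:

    #include <c10/cuda/CUDACachingAllocator.h>
    
    int main() {
      // Assumption: a standalone program initializes the per-device allocators itself.
      c10::cuda::CUDACachingAllocator::init(/*device_count=*/1);
    
      // Allocate 1 MiB on the current device and current stream.
      void* p = c10::cuda::CUDACachingAllocator::raw_alloc(1 << 20);
    
      // ... launch kernels that use p on the current stream ...
    
      // Return the block to the caching pool (not necessarily back to the driver).
      c10::cuda::CUDACachingAllocator::raw_delete(p);
    
      // Release all unused cached segments back to the driver via cudaFree.
      c10::cuda::CUDACachingAllocator::emptyCache();
      return 0;
    }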

    Let us move on to CUDACachingAllocator.cpp.

    Somewhat surprisingly, it contains three allocators: DeviceCachingAllocator, THCCachingAllocator, and CudaCachingAllocator. The latter two are essentially one layer, because CudaCachingAllocator is only a few lines long and the malloc it calls comes from THCCachingAllocator:

    // NB: I decided not to fold this into THCCachingAllocator, because the latter
    // has a lot more methods and it wasn't altogether clear that they should
    // actually be publicly exposed
    struct CudaCachingAllocator : public Allocator {
      DataPtr allocate(size_t size) const override {
        int device;
        C10_CUDA_CHECK(cudaGetDevice(&device));
        void* r = nullptr;
        if (forceUncachedAllocator()) {
          C10_CUDA_CHECK(cudaMalloc(&r, size));
          return {r, r, &uncached_delete, Device(DeviceType::CUDA, device)};
        }
        if (size != 0) {
          caching_allocator.malloc(&r, device, size, cuda::getCurrentCUDAStream(device));
        }
        return {r, r, &raw_delete, Device(DeviceType::CUDA, device)};
      }
      DeleterFnPtr raw_deleter() const override {
        return &raw_delete;
      }
    };
    

    So let us focus on THCCachingAllocator.

    It contains the following functions.

    First comes the init function:

      void init(int device_count) {
        int size = device_allocator.size();
        if (size < device_count) {
          device_allocator.resize(device_count);
          for (int i = size; i < device_count; i++) {
            device_allocator[i] = std::unique_ptr<DeviceCachingAllocator>(new DeviceCachingAllocator());
          }
        }
      }
    

    It sizes the device_allocator vector to the number of devices, creating one DeviceCachingAllocator per GPU so that every device index has its own allocator.

    The malloc() function

    Next is the malloc function:

      /** allocates a block which is safe to use from the provided stream */
      void malloc(void** devPtr, int device, size_t size, cudaStream_t stream) {
        TORCH_INTERNAL_ASSERT(
            0 <= device && device < device_allocator.size(),
            "Allocator not initialized for device ",
            device,
            ": did you call init?");
        Block* block = device_allocator[device]->malloc(device, size, stream);
        add_allocated_block(block);
        *devPtr = (void*)block->ptr;
      }
    

    It delegates to the per-device allocator's malloc and then registers the newly created block via add_allocated_block.
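
    For reference, add_allocated_block just records a pointer-to-Block mapping so that free(void*) can later look the Block up again; a minimal sketch, consistent with the THCCachingAllocator members of that era:

      std::mutex mutex;  // protects allocated_blocks
      std::unordered_map<void*, Block*> allocated_blocks;
    
      void add_allocated_block(Block* block) {
        std::lock_guard<std::mutex> lock(mutex);
        allocated_blocks[block->ptr] = block;
      }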

    The function it calls is over a hundred lines long, so let us work through it step by step (it lives in DeviceCachingAllocator):

    First, a simplified view of the code:

      // All public methods (except the above) acquire the allocator mutex.
      // Thus, do not call a public method from another public method.
    
      Block* malloc(int device, size_t size, cudaStream_t stream)
      {
        std::unique_lock<std::recursive_mutex> lock(mutex);
    
        // process outstanding cudaEvents
        process_events();
        // round the request up to a multiple of 512 bytes
        size = round_size(size);
        // pick the pool (small_blocks or large_blocks) that serves this size
        auto& pool = get_pool(size);
        // compute the size of the segment that would be requested from cudaMalloc
        const size_t alloc_size = get_allocation_size(size);
        // bundle everything the allocation needs (notably size and alloc_size) into params
        AllocParams params(device, size, stream, &pool, alloc_size, stats);
        // stat_types is a bitset with one flag per StatType (AGGREGATE, SMALL_POOL, LARGE_POOL);
        // mark the ones this allocation should update
        params.stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
        params.stat_types[static_cast<size_t>(get_stat_type_for_pool(pool))] = true;
    
        bool block_found =
          // Search pool
          // search the chosen pool for a cached free block of sufficient size
          get_free_block(params)
          // Trigger callbacks and retry search
          // trigger_free_memory_callbacks runs the callbacks registered in
          // FreeCudaMemoryCallbacksRegistry (declared in the header), then searches again
          || (trigger_free_memory_callbacks(params) && get_free_block(params))
          // Attempt allocate
          // cudaMalloc a brand-new segment
          || alloc_block(params, false)
          // Free all non-split cached blocks and retry alloc.
          || (free_cached_blocks() && alloc_block(params, true));
    
        TORCH_INTERNAL_ASSERT((!block_found && params.err != cudaSuccess) || params.block);
        if (!block_found) {
            ....... omitted for now (the out-of-memory handling, shown below)
        }
    
        Block* block = params.block;
        Block* remaining = nullptr;
        TORCH_INTERNAL_ASSERT(block);
    
        const bool already_split = block->is_split();
        // split the block if it is much larger than the request
        if (should_split(block, size)) {
          ......
        } else if (already_split) {
          // An already-split block is becoming active
          ......
        }
    
        block->allocated = true;
        // active_blocks holds the blocks currently in use; insert the new block into it
        active_blocks.insert(block);
    
        c10::reportMemoryUsageToProfiler(
            block, block->size, c10::Device(c10::DeviceType::CUDA, device));
        // update, in order: allocation count, allocated bytes, active block count, active bytes
        update_stat_array(stats.allocation, 1, params.stat_types);
        update_stat_array(stats.allocated_bytes, block->size, params.stat_types);
        update_stat_array(stats.active, 1, params.stat_types);
        update_stat_array(stats.active_bytes, block->size, params.stat_types);
    
        return block;
      }
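
    The round_size mentioned in the comments rounds every request up to a multiple of 512 bytes (kMinBlockSize). A short sketch consistent with that behavior:

      static size_t round_size(size_t size) {
        if (size < kMinBlockSize) {
          // the minimum allocation is 512 bytes
          return kMinBlockSize;
        } else {
          // otherwise round up to the next multiple of kMinBlockSize (512 bytes)
          return kMinBlockSize * ((size + kMinBlockSize - 1) / kMinBlockSize);
        }
      }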
    

    In my view the heart of this code is the block_found chain, which tries four things in order. The first is get_free_block(params): it searches the pool matching the requested size for a cached block whose size is >= the request and hands it out, calling pool.erase(it) to remove that block from the pool. (The pool itself was chosen earlier by auto& pool = get_pool(size), which selects between the two memory pools, small_blocks and large_blocks.)

      bool get_free_block(AllocParams& p) {
        BlockPool& pool = *p.pool;
        // find the first block whose size is >= the requested size
        auto it = pool.lower_bound(&p.search_key);
        // returning false means no suitable block was found
        // (the pool is exhausted, or the best candidate belongs to another stream)
        if (it == pool.end() || (*it)->stream != p.stream())
          return false;
        // the candidate is on the requested stream, so hand this block out
        p.block = *it;
        // remove it from the pool, since it is now assigned to p
        pool.erase(it);
        return true;
      }
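
    pool.lower_bound(&p.search_key) works because BlockPool is an ordered std::set of Block pointers sorted by (stream, size, address), and search_key is a dummy Block carrying the requested stream and size. A sketch of the ordering, consistent with the PyTorch source of that era:

      typedef bool (*Comparison)(const Block*, const Block*);
      typedef std::set<Block*, Comparison> BlockPool;
    
      static bool BlockComparator(const Block* a, const Block* b) {
        // order blocks by stream first, then by size, then by address
        if (a->stream != b->stream) {
          return (uintptr_t)a->stream < (uintptr_t)b->stream;
        }
        if (a->size != b->size) {
          return a->size < b->size;
        }
        return (uintptr_t)a->ptr < (uintptr_t)b->ptr;
      }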
    

    alloc_block(params, false) allocates a brand-new segment instead:

      bool alloc_block(AllocParams& p, bool isRetry) {
        size_t size = p.alloc_size;
        void* ptr;
    
        if (isRetry) {
          stats.num_alloc_retries += 1;
        }
        // if a memory fraction is set and this request would exceed the allowed maximum, fail
        if (set_fraction && total_allocated_memory + size > allowed_memory_maximum) {
          p.err = cudaErrorMemoryAllocation;
        } else {
          // otherwise actually allocate with cudaMalloc
          p.err = cudaMalloc(&ptr, size);
        }
    
        if (p.err != cudaSuccess) {
          if (!isRetry || p.err == cudaErrorMemoryAllocation)
            cudaGetLastError();  // clear CUDA error
          return false;
        }
        // track the total device memory obtained by this allocator
        total_allocated_memory += size;
        // wrap the newly allocated memory in a Block
        p.block = new Block(p.device(), p.stream(), size, p.pool, (char*)ptr);
        // update the global stats: stats.segment counts cudaMalloc'd segments and
        // stats.reserved_bytes counts bytes reserved from the device, so
        // segment count +1 and reserved bytes +size
        update_stat_array(stats.segment, 1, p.stat_types);
        update_stat_array(stats.reserved_bytes, size, p.stat_types);
    
        return (p.block != nullptr);
      }
    

    The key call here is cudaMalloc; the function also updates a number of statistics along the way.

    As for trigger_free_memory_callbacks, the header already hints at its role: the caching allocator runs every registered FreeMemoryCallback when it cannot find a block in the already allocated area. free_cached_blocks is examined below.
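
    A sketch of what trigger_free_memory_callbacks does, assuming the FreeCudaMemoryCallbacksRegistry declared in the header; every registered callback gets a chance to release memory before the pool is searched again:

      bool trigger_free_memory_callbacks(AllocParams& p) {
        bool freed_memory = false;
        // run every FreeMemoryCallback registered via REGISTER_FREE_MEMORY_CALLBACK
        for (const auto& name : FreeCudaMemoryCallbacksRegistry()->Keys()) {
          freed_memory |=
            FreeCudaMemoryCallbacksRegistry()->Create(name)->Execute();
        }
        return freed_memory;
      }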

    Once a block has been obtained (and split, if needed), it is marked allocated, inserted into active_blocks (the set of blocks currently in use), and returned.

    If no suitable space can be obtained at all, the allocator calls free_cached_blocks() to release its cache back to the driver and then retries the allocation.

    The corresponding step and function are:

    (free_cached_blocks() && alloc_block(params, true));

      bool free_cached_blocks()
      {
        // First ensure that all blocks that can't currently be allocated due to
        // outstanding events are returned to the pool.
        synchronize_and_free_events();
        // Free all non-split cached blocks
        free_blocks(large_blocks);
        free_blocks(small_blocks);
        return true;
      }
    

    It first synchronizes on all outstanding CUDA events and frees the associated blocks:

      void synchronize_and_free_events() {
        // Synchronize on outstanding events and then free associated blocks.
    
        for (auto& e : cuda_events) {
          cudaEvent_t event = e.first;
          Block* block = e.second;
          // block until the event has completed
          C10_CUDA_CHECK(cudaEventSynchronize(event));
          free_event_internal(event);
          // event_count is the number of outstanding CUDA events for this block;
          // one of them has just completed, so decrement it
          block->event_count--;
          // once the last event is done, the block can actually be freed
          if (block->event_count == 0) {
            free_block(block);
          }
        }
    
        cuda_events.clear();
      }
    

    Then it frees both pools (only blocks that are not split, i.e. that have neither a prev nor a next neighbor, can be handed back to the driver):

      void free_blocks(BlockPool& blocks)
      {
        // Frees all non-split blocks
        auto it = blocks.begin();
        while (it != blocks.end()) {
          Block* block = *it;
          if (!block->prev && !block->next) {
            C10_CUDA_CHECK(cudaFree((void*)block->ptr));
            total_allocated_memory -= block->size;
    
            StatTypes stat_types;
            stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
            stat_types[static_cast<size_t>(get_stat_type_for_pool(*(block->pool)))] = true;
            update_stat_array(stats.segment, -1, stat_types);
            update_stat_array(stats.reserved_bytes, -block->size, stat_types);
            auto cur = it;
            ++it;
            blocks.erase(cur);
            delete block;
          } else {
            ++it;
          }
        }
      }
    

    Now, what happens when we can neither reuse a cached pointer nor allocate new space? That is the part elided from the code above:

        if (!block_found) {
          if (params.err == cudaErrorMemoryAllocation) {
            size_t device_free;
            size_t device_total;
            C10_CUDA_CHECK(cudaMemGetInfo(&device_free, &device_total));
            std::string allowed_info;
    
            if (set_fraction) {
              allowed_info = format_size(allowed_memory_maximum) + " allowed; ";
            }
    
            stats.num_ooms += 1;
    
            // "total capacity": total global memory on GPU
            // "allowed": memory is allowed to use, which set by fraction.
            // "already allocated": memory allocated by the program using the
            //                      caching allocator
            // "free": free memory as reported by the CUDA API
            // "cached": memory held by the allocator but not used by the program
            //
            // The "allocated" amount  does not include memory allocated outside
            // of the caching allocator, such as memory allocated by other programs
            // or memory held by the driver.
            //
            // The sum of "allocated" + "free" + "cached" may be less than the
            // total capacity due to memory held by the driver and usage by other
            // programs.
            //
            // Note that at this point free_cached_blocks has already returned all
            // possible "cached" memory to the driver. The only remaining "cached"
            // memory is split from a larger block that is partially in-use.
            TORCH_CHECK_WITH(CUDAOutOfMemoryError, false,
              "CUDA out of memory. Tried to allocate ", format_size(alloc_size),
              " (GPU ", device, "; ",
              format_size(device_total), " total capacity; ",
              format_size(stats.allocated_bytes[static_cast<size_t>(StatType::AGGREGATE)].current),
              " already allocated; ",
              format_size(device_free), " free; ",
              allowed_info,
              format_size(stats.reserved_bytes[static_cast<size_t>(StatType::AGGREGATE)].current),
              " reserved in total by PyTorch)");
          } else {
            C10_CUDA_CHECK(params.err);
          }
        }
    
    

    This branch handles the case where allocation has failed for good; if we get here the program is about to crash, so it is worth decoding exactly what the error message reports.

    The code may look messy, but anyone who writes PyTorch regularly will recognize its output instantly: the familiar "CUDA out of memory. Tried to allocate 564.00 MiB (GPU 0; ... total capacity; ... already allocated; ... free; ... reserved in total by PyTorch)" error, and this is exactly where that message is produced.

    Let us work out what each field means:

    • 1 format_size(alloc_size), the 564 MiB in the message above. It is the final straw: the allocation request that could not be satisfied.
    • 2 format_size(device_total) is the total device memory of the GPU. It comes from cudaMemGetInfo(&device_free, &device_total), which returns the free and total amounts of GPU memory:
     * \param free  - Returned free memory in bytes
     * \param total - Returned total memory in bytes
    
    • 3 format_size(stats.allocated_bytes[static_cast<size_t>(StatType::AGGREGATE)].current), printed before " already allocated": per the comments, the amount of memory the caching allocator has currently handed out to the program.

    This value is maintained by update_stat_array(stats.allocated_bytes, block->size, params.stat_types); in malloc.

    • 4 format_size(device_free) is the amount of free device memory reported by the CUDA API.

    • 5 format_size(stats.reserved_bytes[static_cast<size_t>(StatType::AGGREGATE)].current), printed before " reserved in total by PyTorch": everything PyTorch has actually reserved, both in use and cached. Subtracting the allocated value (stats.allocated_bytes) from it gives the memory that is physically allocated but only parked in the pools, not logically in use. (A small sketch of reading these counters from C++ follows after the snippet below.)

      // SUM: bytes reserved by this memory allocator (both free and used)
      StatArray reserved_bytes;
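
    These counters can also be read programmatically. A minimal sketch, assuming a libtorch build where the header quoted at the top is available (field and enum names are taken from that header):

    #include <c10/cuda/CUDACachingAllocator.h>
    #include <iostream>
    
    void print_device0_memory() {
      using namespace c10::cuda::CUDACachingAllocator;
      DeviceStats stats = getDeviceStats(/*device=*/0);
      const auto agg = static_cast<size_t>(StatType::AGGREGATE);
      // bytes currently handed out to the program ("already allocated")
      std::cout << "allocated: " << stats.allocated_bytes[agg].current << " bytes\n";
      // bytes obtained from cudaMalloc, in use or cached ("reserved in total by PyTorch")
      std::cout << "reserved:  " << stats.reserved_bytes[agg].current << " bytes\n";
      // the difference is memory cached in the pools but not logically in use
      std::cout << "cached:    "
                << (stats.reserved_bytes[agg].current - stats.allocated_bytes[agg].current)
                << " bytes\n";
    }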
    

    On to splitting. First the allocator decides whether a block should be split:

        // split the block; this applies both to get_free_block (reusing a cached pointer)
        // and to alloc_block (a freshly allocated, oversized segment)
        if (should_split(block, size)) {......}
    
      bool should_split(const Block* block, size_t size) {
        size_t remaining = block->size - size;
        // the small pool serves requests of <= 1 MB
        if (block->pool == &small_blocks) {
          // split if the leftover is at least 512 bytes (kMinBlockSize)
          return remaining >= kMinBlockSize;
        // the large pool serves requests of > 1 MB
        } else if (block->pool == &large_blocks) {
          // split if the leftover is larger than 1 MB (kSmallSize)
          return remaining > kSmallSize;
        } else {
          AT_ERROR("should_split: invalid pool");
        }
      }
    

    This function decides whether a split is needed. My reading is that it covers two cases: first get_free_block(), which reuses the cached block behind the it iterator and may return something much larger than the request; and second alloc_block(), which allocates p.alloc_size bytes, so size_t remaining = block->size - size; is effectively p.alloc_size - round_size(size), where alloc_size = get_allocation_size(size). A quick aside on get_allocation_size():

      static size_t get_allocation_size(size_t size) {
        // requests of up to kSmallSize (1 MB)
        if (size <= kSmallSize) {
          // are served from a 2 MB buffer (kSmallBuffer)
          return kSmallBuffer;
        // requests below kMinLargeAlloc (10 MB)
        } else if (size < kMinLargeAlloc) {
          // are served from a 20 MB buffer (kLargeBuffer)
          return kLargeBuffer;
        // requests of 10 MB and above
        } else {
          // are rounded up to a multiple of kRoundLarge (2 MB)
          return kRoundLarge * ((size + kRoundLarge - 1) / kRoundLarge);
        }
      }
    

    So a fair amount of over-allocation is indeed possible.
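
    A concrete example using the constants visible above: a 1.5 MiB request stays at 1.5 MiB after round_size (it is already a multiple of 512 bytes); get_allocation_size returns the 20 MiB kLargeBuffer because 1 MiB < 1.5 MiB < 10 MiB; and should_split then sees remaining = 18.5 MiB > kSmallSize, so the freshly allocated 20 MiB segment is split and the 18.5 MiB remainder goes back into the large pool for later requests.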

        // split the block (this covers both reusing a cached block from get_free_block
        // and trimming a freshly allocated, oversized segment from alloc_block)
        if (should_split(block, size)) {
          remaining = block;
          // create a new Block of exactly size bytes, not alloc_size (which is larger than needed)
          // Why not allocate a right-sized block up front? Presumably because allocating in
          // coarse 2 MB / 20 MB granules reduces cudaMalloc calls, and the leftover can simply
          // be reused from the pool.
          block = new Block(device, stream, size, &pool, block->ptr);
          // splice the new block into the chain: the original block becomes "remaining"
          // and is linked in right after the new block
          block->prev = remaining->prev;
          if (block->prev) {
            block->prev->next = block;
          }
          block->next = remaining;
    
          remaining->prev = block;
          // shrink the remaining block: advance its pointer and reduce its size
          remaining->ptr = static_cast<char*>(remaining->ptr) + size;
          remaining->size -= size;
          pool.insert(remaining);
    
          if (already_split) {
            // An already-split inactive block is being shrunk by size bytes.
            update_stat_array(stats.inactive_split_bytes, -block->size, params.stat_types);
          } else {
            // A new split inactive block is being created from a previously unsplit block,
            // size remaining->size bytes.
            update_stat_array(stats.inactive_split_bytes, remaining->size, params.stat_types);
            update_stat_array(stats.inactive_split, 1, params.stat_types);
          }
        } else if (already_split) {
          // An already-split block is becoming active
          update_stat_array(stats.inactive_split_bytes, -block->size, params.stat_types);
          update_stat_array(stats.inactive_split, -1, params.stat_types);
        }
    

    The free() function

    Now for free(). As with allocation, we enter through THCCachingAllocator:

     void free(void* ptr) {
       if (!ptr) {
         return;
       }
       Block* block = get_allocated_block(ptr, true /* remove */);
       if (!block) {
         AT_ERROR("invalid device pointer: ", ptr);
       }
       device_allocator[block->device]->free(block);
     }
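
    get_allocated_block is the counterpart of add_allocated_block on the malloc path: it looks the pointer up in the allocated_blocks map and, since remove is true here, also drops the entry. A sketch consistent with the call above:

      Block* get_allocated_block(void* ptr, bool remove = false) {
        std::lock_guard<std::mutex> lock(mutex);
        auto it = allocated_blocks.find(ptr);
        if (it == allocated_blocks.end()) {
          return nullptr;              // not a pointer we handed out
        }
        Block* block = it->second;
        if (remove) {
          allocated_blocks.erase(it);  // the caller now owns the Block
        }
        return block;
      }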
    

    From here, device_allocator[block->device]->free(block) takes us into DeviceCachingAllocator::free:

      void free(Block* block)
      {
        std::lock_guard<std::recursive_mutex> lock(mutex);
    
        block->allocated = false;
    
        c10::reportMemoryUsageToProfiler(
            block, -block->size, c10::Device(c10::DeviceType::CUDA, block->device));
        // update the global statistics
        StatTypes stat_types;
        stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
        stat_types[static_cast<size_t>(get_stat_type_for_pool(*(block->pool)))] = true;
        update_stat_array(stats.allocation, -1, {stat_types});
        update_stat_array(stats.allocated_bytes, -block->size, {stat_types});
        // has this block been used on other streams (via recordStream)?
        if (!block->stream_uses.empty()) {
          // yes: record events on those streams and defer the actual free
          insert_events(block);
        } else {
          // no: return the block to the pool right away
          free_block(block);
        }
      }
    

    Let us look at free_block(block) first:

      /** moves a block into a pool of cached free blocks */
      void free_block(Block* block)
      {
        TORCH_INTERNAL_ASSERT(!block->allocated && block->event_count == 0);
    
        size_t original_block_size = block->size;
    
        auto& pool = *block->pool;
        int64_t net_change_inactive_split_blocks = 0;
        int64_t net_change_inactive_split_size = 0;
    
        const std::array<Block*, 2> merge_candidates = {block->prev, block->next};
        for (Block* merge_candidate : merge_candidates) {
          // try to merge the freed block with its free neighbors, both the previous and the next one
          const int64_t subsumed_size = try_merge_blocks(block, merge_candidate, pool);
          if (subsumed_size > 0) {
            net_change_inactive_split_blocks -= 1;
            net_change_inactive_split_size -= subsumed_size;
          }
        }
        // active_blocks holds the pointers currently in use; this block no longer is
        active_blocks.erase(block);
        // the pool holds reusable pointers; make the block available to future mallocs
        pool.insert(block);
    
        if (block->is_split()) {
          net_change_inactive_split_blocks += 1;
          net_change_inactive_split_size += block->size;
        }
    
        StatTypes stat_types;
        stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
        stat_types[static_cast<size_t>(get_stat_type_for_pool(*(block->pool)))] = true;
        update_stat_array(stats.inactive_split, net_change_inactive_split_blocks, stat_types);
        update_stat_array(stats.inactive_split_bytes, net_change_inactive_split_size, stat_types);
        update_stat_array(stats.active, -1, stat_types);
        update_stat_array(stats.active_bytes, -original_block_size, stat_types);
      }
    

    The important thing to note is that no cudaFree happens here: active_blocks.erase(block) and pool.insert(block) perform a purely logical release, parking the still-allocated memory in the pool so that malloc can hand it out again later.
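
    For completeness, a sketch of try_merge_blocks as referenced in the code above, consistent with the PyTorch source of that era: it absorbs a free, event-free neighbor into the block being freed and removes that neighbor from the pool.

      size_t try_merge_blocks(Block* dst, Block* src, BlockPool& pool) {
        // a neighbor can only be absorbed if it exists, is not allocated,
        // and has no outstanding CUDA events
        if (!src || src->allocated || src->event_count > 0) {
          return 0;
        }
        AT_ASSERT(dst->is_split() && src->is_split());
        if (dst->prev == src) {
          // merging with the previous block: dst grows backwards
          dst->ptr = src->ptr;
          dst->prev = src->prev;
          if (dst->prev) {
            dst->prev->next = dst;
          }
        } else {
          // merging with the next block: dst grows forwards
          dst->next = src->next;
          if (dst->next) {
            dst->next->prev = dst;
          }
        }
        const size_t subsumed_size = src->size;
        dst->size += subsumed_size;
        pool.erase(src);
        delete src;
        return subsumed_size;
      }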

    The other branch, insert_events(block), is a little more subtle. My reading is as follows.

    The library provides a recordStream() function (declared in the header above) to insert the right synchronization when an allocation is used across multiple streams; it ensures the block will not be reused until every recorded stream has finished its work.

    In other words, if a block has been used on several streams, its free cannot take effect immediately: the block only goes back to the pool once all of those streams have finished with it.
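
    A sketch of insert_events, consistent with the source of that era: for each stream the block was recorded on, it records a CUDA event on that stream and queues the (event, block) pair; process_events() at the top of malloc later polls these events and calls free_block once they have all completed.

      void insert_events(Block* block) {
        int prev_device;
        C10_CUDA_CHECK(cudaGetDevice(&prev_device));
    
        // take ownership of the set of streams this block was recorded on
        stream_set streams(std::move(block->stream_uses));
        for (auto it = streams.begin(); it != streams.end(); ++it) {
          C10_CUDA_CHECK(cudaSetDevice(it->device_index()));
    
          // record an event on the stream; the block stays "in flight"
          // until this event is observed to have completed
          cudaEvent_t event = create_event_internal();
          C10_CUDA_CHECK(cudaEventRecord(event, it->stream()));
    
          block->event_count++;
          cuda_events.emplace_back(event, block);
        }
    
        C10_CUDA_CHECK(cudaSetDevice(prev_device));
      }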
