https://www.52coding.com.cn/2019/05/05/PyTorch2/ mentions a THCCachingAllocator file, but I searched for a long time and could not find it anywhere in recent PyTorch code; it has evidently been reorganized across versions. Newer versions consolidate the code into c10/cuda/CUDACachingAllocator, as shown below:

(screenshot: the c10/cuda directory, showing CUDACachingAllocator.h and CUDACachingAllocator.cpp)

The CUDACachingAllocator.h header reads as follows:
#ifndef THC_DEVICE_ALLOCATOR_INC
#define THC_DEVICE_ALLOCATOR_INC

#include <c10/cuda/CUDAStream.h>
#include <c10/core/Allocator.h>
#include <c10/cuda/CUDAMacros.h>
#include <c10/util/Registry.h>

#include <array>
#include <mutex>

namespace c10 {

class C10_CUDA_API CUDAOutOfMemoryError : public c10::Error {
  using Error::Error;
};

// The caching allocator will execute every registered callback if it is
// unable to find a block inside the already allocated area.
class C10_CUDA_API FreeMemoryCallback {
 public:
  virtual ~FreeMemoryCallback() {};
  virtual bool Execute() = 0;
};

C10_DECLARE_REGISTRY(FreeCudaMemoryCallbacksRegistry, FreeMemoryCallback);
#define REGISTER_FREE_MEMORY_CALLBACK(name, ...) \
  C10_REGISTER_CLASS(FreeCudaMemoryCallbacksRegistry, name, __VA_ARGS__);

namespace cuda {
namespace CUDACachingAllocator {

struct Stat {
  int64_t current = 0;
  int64_t peak = 0;
  int64_t allocated = 0;
  int64_t freed = 0;
};

enum struct StatType : uint64_t {
  AGGREGATE = 0,
  SMALL_POOL = 1,
  LARGE_POOL = 2,
  NUM_TYPES = 3 // remember to update this whenever a new stat type is added
};

typedef std::array<Stat, static_cast<size_t>(StatType::NUM_TYPES)> StatArray;

// Struct containing memory allocator summary statistics for a device.
struct DeviceStats {
  // COUNT: allocations requested by client code
  StatArray allocation;
  // COUNT: number of allocated segments from cudaMalloc().
  StatArray segment;
  // COUNT: number of active memory blocks (allocated or used by stream)
  StatArray active;
  // COUNT: number of inactive, split memory blocks (unallocated but can't be released via cudaFree)
  StatArray inactive_split;

  // SUM: bytes requested by client code
  StatArray allocated_bytes;
  // SUM: bytes reserved by this memory allocator (both free and used)
  StatArray reserved_bytes;
  // SUM: bytes within active memory blocks
  StatArray active_bytes;
  // SUM: bytes within inactive, split memory blocks
  StatArray inactive_split_bytes;

  // COUNT: total number of failed calls to CUDA malloc necessitating cache flushes.
  int64_t num_alloc_retries = 0;
  // COUNT: total number of OOMs (i.e. failed calls to CUDA after cache flush)
  int64_t num_ooms = 0;
};

// Struct containing info of an allocation block (i.e. a fractional part of a cudaMalloc).
struct BlockInfo {
  int64_t size = 0;
  bool allocated = false;
  bool active = false;
};

// Struct containing info of a memory segment (i.e. one contiguous cudaMalloc).
struct SegmentInfo {
  int64_t device = 0;
  int64_t address = 0;
  int64_t total_size = 0;
  int64_t allocated_size = 0;
  int64_t active_size = 0;
  bool is_large = false;
  std::vector<BlockInfo> blocks;
};

C10_CUDA_API void* raw_alloc(size_t nbytes);
C10_CUDA_API void* raw_alloc_with_stream(size_t nbytes, cudaStream_t stream);
C10_CUDA_API void raw_delete(void* ptr);

C10_CUDA_API Allocator* get();
C10_CUDA_API void init(int device_count);
C10_CUDA_API void setMemoryFraction(double fraction, int device);
C10_CUDA_API void emptyCache();
C10_CUDA_API void cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock);
C10_CUDA_API void* getBaseAllocation(void* ptr, size_t* size);
C10_CUDA_API void recordStream(const DataPtr&, CUDAStream stream);
C10_CUDA_API DeviceStats getDeviceStats(int device);
C10_CUDA_API void resetAccumulatedStats(int device);
C10_CUDA_API void resetPeakStats(int device);
C10_CUDA_API std::vector<SegmentInfo> snapshot();

C10_CUDA_API std::mutex* getFreeMutex();

C10_CUDA_API std::shared_ptr<void> getIpcDevPtr(std::string handle);

} // namespace CUDACachingAllocator
}} // namespace c10::cuda

#endif
These declarations cover everything we need: initialization, allocation, freeing, and so on. Now let's move on to CUDACachingAllocator.cpp.

Surprisingly, it contains three allocators: DeviceCachingAllocator, THCCachingAllocator, and CudaCachingAllocator. The last two are essentially one and the same, though, because CudaCachingAllocator is only a few lines long and the malloc it calls comes from THCCachingAllocator.
// NB: I decided not to fold this into THCCachingAllocator, because the latter
// has a lot more methods and it wasn't altogether clear that they should
// actually be publicly exposed
struct CudaCachingAllocator : public Allocator {
  DataPtr allocate(size_t size) const override {
    int device;
    C10_CUDA_CHECK(cudaGetDevice(&device));
    void* r = nullptr;
    if (forceUncachedAllocator()) {
      C10_CUDA_CHECK(cudaMalloc(&r, size));
      return {r, r, &uncached_delete, Device(DeviceType::CUDA, device)};
    }
    if (size != 0) {
      caching_allocator.malloc(&r, device, size, cuda::getCurrentCUDAStream(device));
    }
    return {r, r, &raw_delete, Device(DeviceType::CUDA, device)};
  }
  DeleterFnPtr raw_deleter() const override {
    return &raw_delete;
  }
};
So our focus will be THCCachingAllocator, which contains the following member functions:

(screenshot: the list of THCCachingAllocator member functions in the source)

First, the init function:
void init(int device_count) {
  int size = device_allocator.size();
  if (size < device_count) {
    device_allocator.resize(device_count);
    for (int i = size; i < device_count; i++) {
      device_allocator[i] = std::unique_ptr<DeviceCachingAllocator>(new DeviceCachingAllocator());
    }
  }
}
init sizes the device_allocator vector according to the device count, guaranteeing one DeviceCachingAllocator per GPU.

The malloc() function

Next comes the malloc function:
/** allocates a block which is safe to use from the provided stream */
void malloc(void** devPtr, int device, size_t size, cudaStream_t stream) {
  TORCH_INTERNAL_ASSERT(
      0 <= device && device < device_allocator.size(),
      "Allocator not initialized for device ",
      device,
      ": did you call init?");
  Block* block = device_allocator[device]->malloc(device, size, stream);
  add_allocated_block(block);
  *devPtr = (void*)block->ptr;
}
This calls the per-device allocator's malloc, then registers the freshly created block via add_allocated_block, which stores it in the allocated_blocks map (pointer to Block) so that the later free(void*) can look it up.
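Before going further it helps to know what a Block actually is. Slightly abridged from the same file (constructors and a couple of fields are omitted here; double-check against your PyTorch version), it looks roughly like this:

struct Block {
  int device;             // gpu
  cudaStream_t stream;    // allocation stream
  stream_set stream_uses; // streams on which the block was used
  size_t size;            // block size in bytes
  BlockPool* pool;        // owning memory pool
  void* ptr;              // memory address
  bool allocated;         // in-use flag
  Block* prev;            // prev block if split from a larger allocation
  Block* next;            // next block if split from a larger allocation
  int event_count;        // number of outstanding CUDA events
  ...
};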
The malloc it delegates to is over 100 lines long and lives in DeviceCachingAllocator; let's work through it step by step, starting from a simplified view of the code:
// All public methods (except the above) acquire the allocator mutex.
// Thus, do not call a public method from another public method.
Block* malloc(int device, size_t size, cudaStream_t stream) {
  std::unique_lock<std::recursive_mutex> lock(mutex);

  // process outstanding cudaEvents
  process_events();

  // round the request up to a multiple of 512 bytes
  size = round_size(size);
  // pick the memory pool (small or large) this request belongs to
  auto& pool = get_pool(size);
  // compute the segment size that would be requested from cudaMalloc
  const size_t alloc_size = get_allocation_size(size);
  // bundle everything the allocation needs (notably size and alloc_size) into params
  AllocParams params(device, size, stream, &pool, alloc_size, stats);
  // stat_types is a bitset with one flag per StatType (AGGREGATE, SMALL_POOL,
  // LARGE_POOL), each set to true or false
  params.stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
  params.stat_types[static_cast<size_t>(get_stat_type_for_pool(pool))] = true;

  bool block_found =
      // Search pool
      // look in the current pool for a cached free block of sufficient size
      get_free_block(params)
      // Trigger callbacks and retry search
      // run the registered FreeMemoryCallbacks, then search again (see below)
      || (trigger_free_memory_callbacks(params) && get_free_block(params))
      // Attempt allocate
      // allocate a brand-new block via cudaMalloc
      || alloc_block(params, false)
      // Free all non-split cached blocks and retry alloc.
      || (free_cached_blocks() && alloc_block(params, true));

  TORCH_INTERNAL_ASSERT((!block_found && params.err != cudaSuccess) || params.block);
  if (!block_found) {
    // ...... omitted for now; the out-of-memory path is analyzed below
  }

  Block* block = params.block;
  Block* remaining = nullptr;
  TORCH_INTERNAL_ASSERT(block);

  const bool already_split = block->is_split();
  // split the block if it is larger than needed
  if (should_split(block, size)) {
    ......
  } else if (already_split) {
    // An already-split block is becoming active
    ......
  }

  block->allocated = true;
  // active_blocks holds the blocks currently in use; insert the new block into that set
  active_blocks.insert(block);

  c10::reportMemoryUsageToProfiler(
      block, block->size, c10::Device(c10::DeviceType::CUDA, device));

  // update, in order: allocation count, allocated bytes, active block count, active bytes
  update_stat_array(stats.allocation, 1, params.stat_types);
  update_stat_array(stats.allocated_bytes, block->size, params.stat_types);
  update_stat_array(stats.active, 1, params.stat_types);
  update_stat_array(stats.active_bytes, block->size, params.stat_types);

  return block;
}
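The round_size helper referenced above rounds every request up to a multiple of 512 bytes; in the versions I checked it is roughly:

static size_t round_size(size_t size) {
  // kMinBlockSize = 512; every request is rounded up to a multiple of it
  if (size < kMinBlockSize) {
    return kMinBlockSize;
  } else {
    return kMinBlockSize * ((size + kMinBlockSize - 1) / kMinBlockSize);
  }
}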
The core of this code, in my view, is the block_found chain, which tries four strategies in order. First, get_free_block(params) searches the matching pool for a cached block at least as large as the requested size; on success the block is handed out and removed from the pool with pool.erase(it). (The pool itself was chosen by auto& pool = get_pool(size), which selects between the two memory pools, small_blocks and large_blocks.)
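get_pool itself is a two-liner; roughly:

BlockPool& get_pool(size_t size) {
  // kSmallSize = 1 MiB: anything up to that goes to the small pool
  if (size <= kSmallSize) {
    return small_blocks;
  } else {
    return large_blocks;
  }
}

Now, get_free_block itself: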
bool get_free_block(AllocParams& p) {
  BlockPool& pool = *p.pool;
  // lower_bound returns the first block >= the requested size
  auto it = pool.lower_bound(&p.search_key);
  // false means no suitable block exists (or the best fit belongs to another stream)
  if (it == pool.end() || (*it)->stream != p.stream())
    return false;
  // the block matches the requested device and stream, so hand it out
  p.block = *it;
  // remove it from the pool, since it is now given to p
  pool.erase(it);
  return true;
}
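A single lower_bound call suffices here because BlockPool is an ordered std::set<Block*> sorted by stream first, then size, then address: the first block at or above the search key either belongs to our stream or no usable block exists for it. The comparator, roughly as it appears in the source (verify against your version):

static bool BlockComparator(const Block* a, const Block* b) {
  // sort by stream, then by size, then by pointer address
  if (a->stream != b->stream) {
    return (uintptr_t)a->stream < (uintptr_t)b->stream;
  }
  if (a->size != b->size) {
    return a->size < b->size;
  }
  return (uintptr_t)a->ptr < (uintptr_t)b->ptr;
}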
Next, alloc_block(params, false) allocates a brand-new segment:
bool alloc_block(AllocParams& p, bool isRetry) {
  size_t size = p.alloc_size;
  void* ptr;

  if (isRetry) {
    stats.num_alloc_retries += 1;
  }

  // the requested size would exceed the configured memory limit
  if (set_fraction && total_allocated_memory + size > allowed_memory_maximum) {
    p.err = cudaErrorMemoryAllocation;
  } else {
    // ask the CUDA driver for fresh memory
    p.err = cudaMalloc(&ptr, size);
  }

  if (p.err != cudaSuccess) {
    if (!isRetry || p.err == cudaErrorMemoryAllocation)
      cudaGetLastError(); // clear CUDA error
    return false;
  }

  // account for the newly allocated memory
  total_allocated_memory += size;
  // wrap the new memory in a Block
  p.block = new Block(p.device(), p.stream(), size, p.pool, (char*)ptr);
  // update the global stats: stats.segment counts cudaMalloc'd segments and
  // stats.reserved_bytes counts bytes held by the allocator; segments +1, reserved +size
  update_stat_array(stats.segment, 1, p.stat_types);
  update_stat_array(stats.reserved_bytes, size, p.stat_types);

  return (p.block != nullptr);
}
The heart of this function is the cudaMalloc call; the rest is the bookkeeping recorded around it.
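The set_fraction / allowed_memory_maximum pair checked above is configured through the setMemoryFraction API declared in the header. On the DeviceCachingAllocator side it amounts to roughly the following (a sketch from the version I read; verify against yours):

/** set memory fraction to limit maximum allocated memory **/
void setMemoryFraction(double fraction) {
  size_t device_free;
  size_t device_total;
  C10_CUDA_CHECK(cudaMemGetInfo(&device_free, &device_total));
  // cap the allocator at fraction * total device memory
  allowed_memory_maximum = static_cast<size_t>(fraction * device_total);
  set_fraction = true;
}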
I was initially unsure what trigger_free_memory_callbacks and free_cached_blocks do. The former simply runs every FreeMemoryCallback registered in the FreeCudaMemoryCallbacksRegistry declared in the header, giving other subsystems a chance to give memory back before the pool search is retried; the latter is analyzed below.
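In the source it is only a few lines; roughly:

bool trigger_free_memory_callbacks(AllocParams& p) {
  bool freed_memory = false;
  // run every callback registered via REGISTER_FREE_MEMORY_CALLBACK
  for (const auto& name : FreeCudaMemoryCallbacksRegistry()->Keys()) {
    freed_memory |=
        FreeCudaMemoryCallbacksRegistry()->Create(name)->Execute();
  }
  return freed_memory;
}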
If a block is found and needs no splitting, it is inserted into active_blocks (the set of memory blocks currently in use) and then returned.

If no suitable space can be found, the allocator calls free_cached_blocks() to release its caches first and then retries the allocation, as in the fourth arm of the chain:
(free_cached_blocks() && alloc_block(params, true));

bool free_cached_blocks() {
  // First ensure that all blocks that can't currently be allocated due to
  // outstanding events are returned to the pool.
  synchronize_and_free_events();

  // Free all non-split cached blocks
  free_blocks(large_blocks);
  free_blocks(small_blocks);
  return true;
}
The allocator first waits for all outstanding events tied to cached blocks to complete.
void synchronize_and_free_events() {
  // Synchronize on outstanding events and then free associated blocks.
  for (auto& e : cuda_events) {
    cudaEvent_t event = e.first;
    Block* block = e.second;

    // wait for the event to complete
    C10_CUDA_CHECK(cudaEventSynchronize(event));
    free_event_internal(event);

    // event_count is the number of outstanding CUDA events on this block;
    // one of them has now completed
    block->event_count--;
    // if that was the last outstanding event, the block can finally be freed
    if (block->event_count == 0) {
      free_block(block);
    }
  }
  // all recorded events have been handled
  cuda_events.clear();
}
Then it frees both block pools outright:
void free_blocks(BlockPool& blocks) {
  // Frees all non-split blocks
  auto it = blocks.begin();
  while (it != blocks.end()) {
    Block* block = *it;
    if (!block->prev && !block->next) {
      C10_CUDA_CHECK(cudaFree((void*)block->ptr));
      total_allocated_memory -= block->size;

      StatTypes stat_types;
      stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
      stat_types[static_cast<size_t>(get_stat_type_for_pool(*(block->pool)))] = true;
      update_stat_array(stats.segment, -1, stat_types);
      update_stat_array(stats.reserved_bytes, -block->size, stat_types);

      auto cur = it;
      ++it;
      blocks.erase(cur);
      delete block;
    } else {
      ++it;
    }
  }
}
Note that only blocks with neither a prev nor a next pointer are released: cudaFree can only return an entire cudaMalloc'd segment to the driver, so split blocks must wait until their neighbors have been merged back. Now, what happens when the pointer can neither be reused nor fresh space allocated? That is the part of malloc we omitted earlier:
if (!block_found) {
  if (params.err == cudaErrorMemoryAllocation) {
    size_t device_free;
    size_t device_total;
    C10_CUDA_CHECK(cudaMemGetInfo(&device_free, &device_total));
    std::string allowed_info;
    if (set_fraction) {
      allowed_info = format_size(allowed_memory_maximum) + " allowed; ";
    }
    stats.num_ooms += 1;

    // "total capacity": total global memory on GPU
    // "allowed": memory is allowed to use, which is set by the fraction.
    // "already allocated": memory allocated by the program using the
    // caching allocator
    // "free": free memory as reported by the CUDA API
    // "cached": memory held by the allocator but not used by the program
    //
    // The "allocated" amount does not include memory allocated outside
    // of the caching allocator, such as memory allocated by other programs
    // or memory held by the driver.
    //
    // The sum of "allocated" + "free" + "cached" may be less than the
    // total capacity due to memory held by the driver and usage by other
    // programs.
    //
    // Note that at this point free_cached_blocks has already returned all
    // possible "cached" memory to the driver. The only remaining "cached"
    // memory is split from a larger block that is partially in-use.
    TORCH_CHECK_WITH(CUDAOutOfMemoryError, false,
        "CUDA out of memory. Tried to allocate ", format_size(alloc_size),
        " (GPU ", device, "; ",
        format_size(device_total), " total capacity; ",
        format_size(stats.allocated_bytes[static_cast<size_t>(StatType::AGGREGATE)].current),
        " already allocated; ",
        format_size(device_free), " free; ",
        allowed_info,
        format_size(stats.reserved_bytes[static_cast<size_t>(StatType::AGGREGATE)].current),
        " reserved in total by PyTorch)");
  } else {
    C10_CUDA_CHECK(params.err);
  }
}
This branch handles the failed allocation. If execution reaches this point, the program is beyond saving and all that remains is to crash, so let's decode exactly what the error message reports. The code may look a little messy, but anyone who writes PyTorch regularly will recognize its output immediately; the original post showed a screenshot of the familiar traceback here.

(screenshot: a typical "CUDA out of memory" error raised during training)

And this TORCH_CHECK_WITH is precisely where that message originates.
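Assembling the format string above, the message has the following shape (the numbers are illustrative, echoing the 564 MiB example from the screenshot):

RuntimeError: CUDA out of memory. Tried to allocate 564.00 MiB (GPU 0; 10.76 GiB total capacity; 8.32 GiB already allocated; 462.06 MiB free; 8.57 GiB reserved in total by PyTorch)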
Here we need to pin down what each part of the message stands for:

- 1 format_size(alloc_size): the "564 MiB" in the screenshot. This value is the final straw, the request that broke the memory budget.
- 2 format_size(device_total): the total memory of the GPU. It comes from cudaMemGetInfo(&device_free, &device_total), which returns the free and total device memory:
* \param free - Returned free memory in bytes
* \param total - Returned total memory in bytes
- 3 format_size(stats.allocated_bytes[static_cast<size_t>(StatType::AGGREGATE)].current), printed as "already allocated": per the comments, the amount currently handed out through the caching allocator. It is maintained by update_stat_array(stats.allocated_bytes, block->size, params.stat_types) inside malloc.
- 4 format_size(device_free): the amount of free memory reported by the CUDA API.
- 5 format_size(stats.reserved_bytes[static_cast<size_t>(StatType::AGGREGATE)].current), printed as "reserved in total by PyTorch": everything the allocator physically holds, both in use and cached. Put simply, this value minus "already allocated" (stats.allocated_bytes) is the memory that has been physically allocated but sits logically unused in the pools:
// SUM: bytes reserved by this memory allocator (both free and used)
StatArray reserved_bytes;
Back to splitting. The allocator first decides whether the block needs to be split at all:

// split the block; this applies both to get_free_block (reusing a cached
// pointer) and to alloc_block (a freshly allocated segment)
if (should_split(block, size)) { ...... }
bool should_split(const Block* block, size_t size) {
  size_t remaining = block->size - size;
  // the small pool holds blocks < 1 MiB
  if (block->pool == &small_blocks) {
    // split if at least 512 bytes would remain
    return remaining >= kMinBlockSize;
  } else if (block->pool == &large_blocks) {
    // the large pool holds blocks >= 1 MiB; split if more than 1 MiB would remain
    return remaining > kSmallSize;
  } else {
    AT_ERROR("should_split: invalid pool");
  }
}
This predicate decides whether to split. My reading is that it matters first for get_free_block(), which reuses a cached block that may be larger than the request, and second for alloc_block(), which allocates p.alloc_size bytes. In the latter case, size_t remaining = block->size - size; is effectively p.alloc_size - round_size(size), where the two are related by alloc_size = get_allocation_size(size). A quick detour into get_allocation_size():
static size_t get_allocation_size(size_t size) {
  // kSmallSize = 1 MiB
  if (size <= kSmallSize) {
    // requests up to 1 MiB get a 2 MiB buffer
    return kSmallBuffer;
  } else if (size < kMinLargeAlloc) {
    // requests under 10 MiB get a 20 MiB buffer
    return kLargeBuffer;
  } else {
    // larger requests are rounded up to a multiple of 2 MiB
    return kRoundLarge * ((size + kRoundLarge - 1) / kRoundLarge);
  }
}
So over-allocation is indeed possible, which is exactly what splitting exists to clean up.
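For reference, the size constants used by these helpers are defined near the top of the file; in the versions I checked they are:

constexpr size_t kMinBlockSize = 512;       // all sizes are rounded to at least 512 bytes
constexpr size_t kSmallSize = 1048576;      // largest "small" allocation is 1 MiB
constexpr size_t kSmallBuffer = 2097152;    // "small" allocations are packed in 2 MiB blocks
constexpr size_t kLargeBuffer = 20971520;   // "large" allocations may be packed in 20 MiB blocks
constexpr size_t kMinLargeAlloc = 10485760; // allocations between 1 and 10 MiB may use kLargeBuffer
constexpr size_t kRoundLarge = 2097152;     // round up large allocations to 2 MiB

Worked through: a 3 MiB request is already a multiple of 512 bytes, lands in the large pool, and, being under kMinLargeAlloc, receives a fresh 20 MiB segment; the 17 MiB remainder exceeds kSmallSize, so should_split fires and the remainder goes back into large_blocks.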
// split the block; this covers the get_free_block case (reusing a cached
// pointer) as well as a freshly allocated segment
if (should_split(block, size)) {
  remaining = block;

  // create a new Block of exactly `size` bytes, not alloc_size (alloc_size is
  // larger than needed, which is precisely why we are splitting).
  // One might ask: why not just allocate a block of the right size up front?
  // The point of allocating coarse segments and splitting them is that the
  // cache can then reuse memory without round-trips to the CUDA driver.
  block = new Block(device, stream, size, &pool, block->ptr);
  // splice the new block into the linked list in front of the original block,
  // which becomes `remaining` and follows the new block
  block->prev = remaining->prev;
  if (block->prev) {
    block->prev->next = block;
  }
  block->next = remaining;
  remaining->prev = block;

  // shrink the remaining block
  remaining->ptr = static_cast<char*>(remaining->ptr) + size;
  remaining->size -= size;
  pool.insert(remaining);

  if (already_split) {
    // An already-split inactive block is being shrunk by size bytes.
    update_stat_array(stats.inactive_split_bytes, -block->size, params.stat_types);
  } else {
    // A new split inactive block is being created from a previously unsplit block,
    // size remaining->size bytes.
    update_stat_array(stats.inactive_split_bytes, remaining->size, params.stat_types);
    update_stat_array(stats.inactive_split, 1, params.stat_types);
  }
} else if (already_split) {
  // An already-split block is becoming active
  update_stat_array(stats.inactive_split_bytes, -block->size, params.stat_types);
  update_stat_array(stats.inactive_split, -1, params.stat_types);
}
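Pictorially, the 3 MiB example above turns one 20 MiB segment into [3 MiB, allocated] <-> [17 MiB, back in the pool], with the prev/next pointers recording the order of the pieces inside the segment so that free_block can later merge them back together.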
The free() function

Now for the free() function. As with allocation, we enter through THCCachingAllocator.
void free(void* ptr) {
  if (!ptr) {
    return;
  }
  Block* block = get_allocated_block(ptr, true /* remove */);
  if (!block) {
    AT_ERROR("invalid device pointer: ", ptr);
  }
  device_allocator[block->device]->free(block);
}
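get_allocated_block is the counterpart of the add_allocated_block we saw on the allocation path: it looks the raw pointer up in the allocated_blocks map and, here, removes the entry. Roughly (again, verify against your PyTorch version):

Block* get_allocated_block(void* ptr, bool remove = false) {
  std::lock_guard<std::mutex> lock(mutex);
  auto it = allocated_blocks.find(ptr);
  if (it == allocated_blocks.end()) {
    return nullptr;
  }
  Block* block = it->second;
  if (remove) {
    // forget the pointer; the Block itself lives on in the device allocator
    allocated_blocks.erase(it);
  }
  return block;
}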
The call device_allocator[block->device]->free(block); then takes us into:
void free(Block* block) {
  std::lock_guard<std::recursive_mutex> lock(mutex);

  block->allocated = false;

  c10::reportMemoryUsageToProfiler(
      block, -block->size, c10::Device(c10::DeviceType::CUDA, block->device));

  // update the global stats
  StatTypes stat_types;
  stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
  stat_types[static_cast<size_t>(get_stat_type_for_pool(*(block->pool)))] = true;
  update_stat_array(stats.allocation, -1, {stat_types});
  update_stat_array(stats.allocated_bytes, -block->size, {stat_types});

  if (!block->stream_uses.empty()) {
    // the block was used on other streams: record events and defer the free
    insert_events(block);
  } else {
    // no outstanding stream uses: return the block to the pool right away
    free_block(block);
  }
}
Let's look at free_block(block) first.
/** moves a block into a pool of cached free blocks */
void free_block(Block* block) {
  TORCH_INTERNAL_ASSERT(!block->allocated && block->event_count == 0);

  size_t original_block_size = block->size;

  auto& pool = *block->pool;
  int64_t net_change_inactive_split_blocks = 0;
  int64_t net_change_inactive_split_size = 0;

  const std::array<Block*, 2> merge_candidates = {block->prev, block->next};
  for (Block* merge_candidate : merge_candidates) {
    // try to coalesce the freed block with its predecessor and its successor
    // (note that both directions are attempted)
    const int64_t subsumed_size = try_merge_blocks(block, merge_candidate, pool);
    if (subsumed_size > 0) {
      net_change_inactive_split_blocks -= 1;
      net_change_inactive_split_size -= subsumed_size;
    }
  }

  // active_blocks holds the pointers currently in use
  active_blocks.erase(block);
  // the pool holds the pointers available for reuse
  pool.insert(block);

  if (block->is_split()) {
    net_change_inactive_split_blocks += 1;
    net_change_inactive_split_size += block->size;
  }

  StatTypes stat_types;
  stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
  stat_types[static_cast<size_t>(get_stat_type_for_pool(*(block->pool)))] = true;
  update_stat_array(stats.inactive_split, net_change_inactive_split_blocks, stat_types);
  update_stat_array(stats.inactive_split_bytes, net_change_inactive_split_size, stat_types);
  update_stat_array(stats.active, -1, stat_types);
  update_stat_array(stats.active_bytes, -original_block_size, stat_types);
}
Note that no actual free happens here. The pair active_blocks.erase(block); pool.insert(block); performs a "fake" release: the block simply moves from the in-use set back into the pool, where malloc can hand the pointer out again.
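free_block also tries to coalesce the freed block with both neighbors through try_merge_blocks. For completeness, that helper roughly reads as follows in the version I checked; a neighbor can only be subsumed if it is itself free and has no outstanding events:

/** combine previously split blocks. returns the size of the subsumed block, or 0 on failure. */
size_t try_merge_blocks(Block* dst, Block* src, BlockPool& pool) {
  if (!src || src->allocated || src->event_count > 0) {
    return 0;
  }
  AT_ASSERT(dst->is_split() && src->is_split());
  if (dst->prev == src) {
    // merging with the predecessor: dst absorbs src from the left
    dst->ptr = src->ptr;
    dst->prev = src->prev;
    if (dst->prev) {
      dst->prev->next = dst;
    }
  } else {
    // merging with the successor: dst absorbs src from the right
    dst->next = src->next;
    if (dst->next) {
      dst->next->prev = dst;
    }
  }
  const size_t subsumed_size = src->size;
  dst->size += subsumed_size;
  pool.erase(src);
  delete src;
  return subsumed_size;
}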
The subtler branch is insert_events(block). Its purpose is a little opaque; my reading is this: the library exposes recordStream() to insert the proper synchronization when an allocation is used across multiple streams, which guarantees a block is not reused until every recorded stream has finished its work. In other words, if several streams have used this block, the free cannot take effect immediately; it has to wait until all of those streams' pending work is done.
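Concretely, insert_events records one CUDA event on every stream that used the block; process_events() (called at the top of malloc) and synchronize_and_free_events() later decrement event_count as those events complete, and the block is only returned to the pool once the count hits zero. Roughly, from the version I read:

void insert_events(Block* block) {
  int prev_device;
  C10_CUDA_CHECK(cudaGetDevice(&prev_device));

  stream_set streams(std::move(block->stream_uses));
  AT_ASSERT(block->stream_uses.empty());
  for (auto& stream : streams) {
    C10_CUDA_CHECK(cudaSetDevice(stream.device_index()));

    // record an event on each stream that touched this block
    cudaEvent_t event = create_event_internal();
    C10_CUDA_CHECK(cudaEventRecord(event, stream.stream()));

    block->event_count++;
    cuda_events.emplace_back(event, block);
  }

  C10_CUDA_CHECK(cudaSetDevice(prev_device));
}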